InputFormat, OutputFormat, and SerDe in Hive

Overview

By default, Hive uses TextInputFormat, in which each line is one record. Within a record (one line), fields are separated by ^A (Ctrl-A, \001) by default.
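
As a rough illustration of this default layout, the plain-Java sketch below splits one text line into fields on ^A (the control character U+0001). The class and method names are hypothetical and are not part of Hive; inside Hive this work is done by the default SerDe.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Hive class: mimics how a ^A-delimited line maps to fields.
final class CtrlASplitDemo {
  // ^A is the control character with code point 1
  static final String FIELD_SEP = "\u0001";

  static List<String> splitFields(String line) {
    // A limit of -1 keeps trailing empty fields instead of dropping them
    return Arrays.asList(line.split(FIELD_SEP, -1));
  }

  public static void main(String[] args) {
    String line = "1" + FIELD_SEP + "a" + FIELD_SEP + "";
    System.out.println(splitFields(line).size() + " fields");
  }
}
```

The trailing empty field is kept on purpose, so that the number of fields stays equal to the number of columns even when the last column is empty.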

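Sometimes, however, we have multi-line, structured documents that need to be loaded into Hive for processing. In that case we need a custom InputFormat and OutputFormat, and possibly a custom SerDe.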

First, let's clarify how the three relate to each other, quoting the official Hive documentation directly:

SerDe is a short name for "Serializer and Deserializer."
Hive uses SerDe (and FileFormat) to read and write table rows.
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object
Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files

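To summarize, when Hive reads a file on HDFS, it proceeds as follows:

(1) It calls the InputFormat to cut the file into separate documents. Each document becomes one row.
(2) It calls the SerDe's Deserializer to split each row into fields.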

When Hive executes an INSERT and writes rows to a file, it calls the SerDe's Serializer and then the OutputFormat, the reverse of the read order.
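
The two directions can be modeled with a toy example. The sketch below is not Hive code: it assumes a made-up format (one record per line, comma-separated fields) and hypothetical method names, purely to show which stage is responsible for what on the read path and on the write path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the four stages; all names are hypothetical, none are Hive APIs.
final class PipelineSketch {
  // "InputFormat" stage: cut the file content into records (one per line here)
  static List<String> toRecords(String fileContent) {
    List<String> records = new ArrayList<String>();
    for (String line : fileContent.split("\n")) {
      if (!line.isEmpty()) {
        records.add(line);
      }
    }
    return records;
  }

  // "Deserializer" stage: cut one record into the fields of a row
  static List<String> toRow(String record) {
    return Arrays.asList(record.split(",", -1));
  }

  // "Serializer" stage: join the fields of a row back into one record
  static String toRecord(List<String> row) {
    return String.join(",", row);
  }

  // "OutputFormat" stage: lay the records out as file content
  static String toFile(List<String> records) {
    StringBuilder sb = new StringBuilder();
    for (String r : records) {
      sb.append(r).append("\n");
    }
    return sb.toString();
  }
}
```

Reading is toRecords followed by toRow; writing is toRecord followed by toFile, which is why the two paths mirror each other.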

In this article we customize the InputFormat, OutputFormat, and SerDe so that Hive can work with the following custom document format:

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

As shown above, each document is delimited by <DOC> and </DOC>, and each line inside a document has the form key=value.

Custom InputFormat

Hive's InputFormat comes from the corresponding part of Hadoop. Note that it uses the old mapred API (org.apache.hadoop.mapred).

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class DocFileInputFormat extends TextInputFormat implements
    JobConfigurable {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    // DocRecordReader does not handle a <DOC> block that straddles a split
    // boundary, so read each file as a single split
    return false;
  }

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    reporter.setStatus(split.toString());
    return new DocRecordReader(job, (FileSplit) split);
  }
}

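This implementation omits details such as compression and decompression; if you need them, refer to the official Hadoop implementation.

The InputFormat above merely implements the interface. The business logic that splits the input into documents lives in DocRecordReader.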

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class DocRecordReader implements RecordReader<LongWritable, Text> {

  // Reader
  private LineRecordReader reader;
  // Reused key (byte offset) and value (text) of the current line
  private LongWritable lineKey = null;
  private Text lineValue = null;
  // Doc related
  private StringBuilder sb = new StringBuilder();
  private boolean inDoc = false;
  private final String DOC_START = "<DOC>";
  private final String DOC_END = "</DOC>";

  public DocRecordReader(JobConf job, FileSplit split) throws IOException {
    reader = new LineRecordReader(job, split);
    lineKey = reader.createKey();
    lineValue = reader.createValue();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }

  @Override
  public boolean next(LongWritable key, Text value) throws IOException {
    while (true) {
      // get current line
      if (!reader.next(lineKey, lineValue)) {
        break;
      }
      if (!inDoc) {
        // not in doc, check if <doc>
        if (lineValue.toString().startsWith(DOC_START)) {
          // reset doc status
          inDoc = true;
          // clean buff
          sb.delete(0, sb.length());
        }
      } else {
        // indoc, check if </doc>
        if (lineValue.toString().startsWith(DOC_END)) {
          // reset doc status
          inDoc = false;
          // set kv and return
          key.set(key.get() + 1);
          value.set(sb.toString());
          return true;
        } else {
          if (sb.length() != 0) {
            sb.append("\n");
          }
          sb.append(lineValue.toString());
        }
      }
    }
    return false;
  }

  @Override
  public float getProgress() throws IOException {
    return reader.getProgress();
  }

  @Override
  public LongWritable createKey() {
    return new LongWritable(0);
  }

  @Override
  public Text createValue() {
    return new Text("");
  }

  @Override
  public long getPos() throws IOException {
    return reader.getPos();
  }

}

The code above uses LineRecordReader to read each line of the split. To save memory, lineKey and lineValue are reused across calls.
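
The <DOC> state machine in next() can be checked without a Hadoop cluster. The sketch below restates the same logic over a plain list of lines; the class DocSplitterSketch and its method are hypothetical and exist only for quick local testing, they are not part of the article's jar.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free restatement of the <DOC> state machine.
final class DocSplitterSketch {
  static List<String> splitDocs(List<String> lines) {
    List<String> docs = new ArrayList<String>();
    StringBuilder sb = new StringBuilder();
    boolean inDoc = false;
    for (String line : lines) {
      if (!inDoc) {
        // Outside a document: only an opening tag matters
        if (line.startsWith("<DOC>")) {
          inDoc = true;
          sb.setLength(0);
        }
      } else if (line.startsWith("</DOC>")) {
        // Closing tag: emit the buffered body as one record
        inDoc = false;
        docs.add(sb.toString());
      } else {
        // Body line: append, separating lines with a newline
        if (sb.length() != 0) {
          sb.append("\n");
        }
        sb.append(line);
      }
    }
    // A document without a closing tag is dropped, as in the reader above
    return docs;
  }
}
```

Each returned string is the body of one document with its key=value lines joined by newlines, which is the value a SerDe or UDF would then receive.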

Custom OutputFormat

OutputFormat handles writing. Note that copying the corresponding Hadoop interface is no longer enough here: the class must implement HiveOutputFormat.

package com.coder4.hive;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

@SuppressWarnings({ "rawtypes" })
public class DocFileOutputFormat<K extends WritableComparable, V extends Writable>
    extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {

  public RecordWriter getHiveRecordWriter(JobConf job, Path outPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress)
      throws IOException {
    FileSystem fs = outPath.getFileSystem(job);
    FSDataOutputStream out = fs.create(outPath);

    return new DocRecordWriter(out);
  }
}

Similarly, the business logic lives in the following RecordWriter:

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;

public class DocRecordWriter implements RecordWriter {

  private FSDataOutputStream out;
  private final String DOC_START = "<DOC>";
  private final String DOC_END = "</DOC>";

  public DocRecordWriter(FSDataOutputStream o) {
    this.out = o;
  }

  @Override
  public void close(boolean abort) throws IOException {
    out.flush();
    out.close();
  }

  @Override
  public void write(Writable wr) throws IOException {
    write(DOC_START);
    write("\n");
    write(wr.toString());
    write("\n");
    write(DOC_END);
    write("\n");
  }

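  private void write(String str) throws IOException {
    // Use the encoded byte length, not str.length(): they differ for non-ASCII text
    byte[] bytes = str.getBytes("UTF-8");
    out.write(bytes, 0, bytes.length);
  }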

}
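
The framing done by the writer can be tried without HDFS by targeting any OutputStream. In the sketch below, the class DocFramingSketch and its methods are hypothetical stand-ins. It encodes to UTF-8 and writes the length of the byte array, since a string's character count and its encoded byte count differ for non-ASCII text.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the record writer's framing logic.
final class DocFramingSketch {
  // Wrap one document body in <DOC> ... </DOC> and write it as UTF-8 bytes
  static void writeDoc(OutputStream out, String body) throws IOException {
    String framed = "<DOC>\n" + body + "\n</DOC>\n";
    byte[] bytes = framed.getBytes(StandardCharsets.UTF_8);
    out.write(bytes, 0, bytes.length);
  }

  // Frame several bodies into an in-memory buffer and return the result as text
  static String frame(String... bodies) {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try {
      for (String body : bodies) {
        writeDoc(buf, body);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(buf.toByteArray(), StandardCharsets.UTF_8);
  }
}
```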

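Custom SerDe or UDF?

With the custom InputFormat and OutputFormat in place, each split is already broken into multiple rows (documents).

Next, each row has to be split into fields. There are two technical options:

(1) Write a UDF that splits a row into key-value pairs and returns them as a Map<K, V>. The table then only needs a single STRING column.
(2) Implement a SerDe that converts a row directly into the table's columns.

Let's look at the UDF approach first. It suits use cases such as JSON parsing, where field names are not known in advance (or change often).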

package com.coder4.hive;

import java.util.Map;
import org.apache.hadoop.hive.ql.exec.UDF;

public class DocToMap extends UDF {
  public Map<String, String> evaluate(String s) {
    return Doc.deserialize(s);
  }
}

Doc.deserialize here is just a custom method; it does not need to override anything or implement an interface.

Usage:
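
The source of Doc is not shown in the article. As an assumption about what it might look like, here is a minimal sketch that parses the key=value lines of one document into a map; the real helper may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Assumed shape of the Doc helper; the original implementation is not shown.
final class Doc {
  static Map<String, String> deserialize(String s) {
    // LinkedHashMap keeps keys in the order they appear in the document
    Map<String, String> kv = new LinkedHashMap<String, String>();
    if (s == null) {
      return kv;
    }
    for (String line : s.split("\n")) {
      // Split on the first '=' only, so values may themselves contain '='
      int eq = line.indexOf('=');
      if (eq > 0) {
        kv.put(line.substring(0, eq), line.substring(eq + 1));
      }
    }
    return kv;
  }
}
```

Lines without an '=' (or with an empty key) are silently skipped in this sketch; whether that is the right policy depends on how strict the input format needs to be.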

add jar /xxxxxxxx/hive-test.jar;

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  doc STRING
)
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;

CREATE TEMPORARY FUNCTION doc_to_map AS 'com.coder4.hive.DocToMap';

SELECT
    raw['id'],
    raw['name']
FROM
(
    SELECT 
        doc_to_map(doc) raw
    FROM
        test_table
) t;

Custom SerDe

Writing a custom SerDe takes slightly more work.

This implementation is mainly based on a blog post and the official source code:

http://svn.apache.org/repos/asf/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java

http://blog.cloudera.com/blog/2012/12/how-to-use-a-serde-in-apache-hive/

package com.coder4.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MySerDe extends AbstractSerDe {

  // params
  private List<String> columnNames = null;
  private List<TypeInfo> columnTypes = null;
  private ObjectInspector objectInspector = null;
  // separators
  private String nullString = null;
  private String lineSep = null;
  private String kvSep = null;

  @Override
  public void initialize(Configuration conf, Properties tbl)
      throws SerDeException {
    // Read sep
    lineSep = "\n";
    kvSep = "=";
    nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");

    // Read Column Names
    String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
    if (columnNameProp != null && columnNameProp.length() > 0) {
      columnNames = Arrays.asList(columnNameProp.split(","));
    } else {
      columnNames = new ArrayList<String>();
    }

    // Read Column Types
    String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
    // default all string
    if (columnTypeProp == null) {
      String[] types = new String[columnNames.size()];
      Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
      columnTypeProp = StringUtils.join(types, ":");
    }
    columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);

    // Check column and types equals
    if (columnTypes.size() != columnNames.size()) {
      throw new SerDeException("len(columnNames) != len(columnTypes)");
    }

    // Create ObjectInspectors from the type information for each column
    List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
    ObjectInspector oi;
    for (int c = 0; c < columnNames.size(); c++) {
      oi = TypeInfoUtils
          .getStandardJavaObjectInspectorFromTypeInfo(columnTypes
              .get(c));
      columnOIs.add(oi);
    }
    objectInspector = ObjectInspectorFactory
        .getStandardStructObjectInspector(columnNames, columnOIs);

  }

  @Override
  public Object deserialize(Writable wr) throws SerDeException {
    // Split to kv pair
    if (wr == null)
      return null;
    Map<String, String> kvMap = new HashMap<String, String>();
    Text text = (Text) wr;
    for (String kv : text.toString().split(lineSep)) {
      String[] pair = kv.split(kvSep, 2); // limit 2 keeps '=' inside values
      if (pair.length == 2) {
        kvMap.put(pair[0], pair[1]);
      }
    }

    // Set according to col_names and col_types
    ArrayList<Object> row = new ArrayList<Object>();
    String colName = null;
    TypeInfo type_info = null;
    Object obj = null;
    for (int i = 0; i < columnNames.size(); i++) {
      colName = columnNames.get(i);
      type_info = columnTypes.get(i);
      obj = null;
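      if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
        PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
        switch (p_type_info.getPrimitiveCategory()) {
        case STRING:
          obj = StringUtils.defaultString(kvMap.get(colName), "");
          break;
        case LONG:
          try {
            obj = Long.parseLong(kvMap.get(colName));
          } catch (NumberFormatException e) {
            // missing or malformed value: leave the field NULL
          }
          break;
        case INT:
          // the Java int inspector expects an Integer, not a Long
          try {
            obj = Integer.parseInt(kvMap.get(colName));
          } catch (NumberFormatException e) {
            // missing or malformed value: leave the field NULL
          }
          break;
        default:
          // other types are not handled; the field stays NULL
          break;
        }
      }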
      row.add(obj);
    }

    return row;
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    return objectInspector;
  }

  @Override
  public SerDeStats getSerDeStats() {
    // Not supported yet
    return null;
  }

  @Override
  public Class<? extends Writable> getSerializedClass() {
    // Serialized rows would be emitted as Text
    return Text.class;
  }

  @Override
  public Writable serialize(Object arg0, ObjectInspector arg1)
      throws SerDeException {
    // Not supported yet
    return null;
  }

}

The final Hive table definition is:

add jar /xxxxxxxx/hive-test.jar;

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id BIGINT,
  name STRING
)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;

Our custom SerDe splits the document inside each <DOC> block on k=v; values whose keys are named id and name are placed into the corresponding columns.
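
The column-mapping step can be illustrated independently of Hive's ObjectInspector machinery. The sketch below is a simplification under stated assumptions: RowMapperSketch is a hypothetical class, and column types are passed as plain strings ("bigint", "string") rather than TypeInfo objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hive-free illustration of mapping key=value pairs onto columns.
final class RowMapperSketch {
  static List<Object> toRow(String doc, List<String> names, List<String> types) {
    // Step 1: parse the document body into a key -> value map
    Map<String, String> kv = new HashMap<String, String>();
    for (String line : doc.split("\n")) {
      String[] pair = line.split("=", 2);
      if (pair.length == 2) {
        kv.put(pair[0], pair[1]);
      }
    }
    // Step 2: emit one value per column, in column order
    List<Object> row = new ArrayList<Object>();
    for (int i = 0; i < names.size(); i++) {
      String raw = kv.get(names.get(i));
      Object value = null;
      if ("string".equals(types.get(i))) {
        value = (raw == null) ? "" : raw;
      } else if ("bigint".equals(types.get(i)) && raw != null) {
        try {
          value = Long.valueOf(raw);
        } catch (NumberFormatException e) {
          value = null; // unparsable numbers become NULL
        }
      }
      row.add(value);
    }
    return row;
  }
}
```

Keys in the document that do not match any column are ignored, and columns with no matching key get an empty string or NULL, mirroring the behavior of the deserialize method above.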

Test results

First, we placed a file with the following content in the HDFS directory /user/heyuan.lhy/doc/:

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

With the table schema defined as in the custom SerDe section above, we run a SELECT:

SELECT * FROM test_table;
OK
1       a
2       b
3       c
4       d

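As you can see, the id and name fields are parsed out separately.

Because our SerDe does not implement the serialize method, writing is not supported.

If you need it, the UDF + Map approach can be used instead.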
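
For readers who want to go further, the core of a serialize step would be the inverse of deserialization: turning column names and values back into key=value lines. The sketch below shows only that string-building part in plain Java. RowSerializerSketch is a hypothetical class; a real SerDe would additionally use the ObjectInspector passed to serialize to extract the field values and would return the result wrapped in a Text, both of which are omitted here.

```java
import java.util.List;

// Hypothetical sketch of the string-building part of a serialize step.
final class RowSerializerSketch {
  static String toDoc(List<String> names, List<Object> row) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < names.size(); i++) {
      Object v = row.get(i);
      if (v == null) {
        continue; // NULL columns are simply left out of the document
      }
      if (sb.length() != 0) {
        sb.append("\n");
      }
      sb.append(names.get(i)).append("=").append(v);
    }
    return sb.toString();
  }
}
```

The output has no <DOC> tags because, in the design above, adding them is the job of the RecordWriter.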


This article is reposted from coder4. Original link: http://www.coder4.com/archives/4031
