Overview
By default, Hive uses TextInputFormat, where each line is one record. Within each record (one line), fields are separated by ^A (Ctrl-A) by default.
Sometimes, however, we face multi-line, structured documents that need to be loaded into Hive. In that case, we have to customize the InputFormat, the OutputFormat, and the SerDe.
Let's first clarify how these three relate to each other, quoting the Hive documentation directly:
SerDe is a short name for "Serializer and Deserializer."
Hive uses SerDe (and FileFormat) to read and write table rows.
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row object
Row object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files
To summarize: when Hive reads a file on HDFS, it proceeds as follows:
(1) Call the InputFormat to split the file into separate documents, each document being one row (Row).
(2) Call the SerDe's Deserializer to split a row into its fields.
When Hive executes an INSERT and writes rows to files, it mainly calls the OutputFormat and the SerDe's Serializer, in the reverse order of reading.
In this article, we will implement a custom InputFormat, OutputFormat, and SerDe so that Hive can work with the following document format:
<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>
As shown above, each document is enclosed between a <DOC> line and a </DOC> line.
Custom InputFormat
Hive's InputFormat is based on the corresponding Hadoop classes. Note that it uses the old mapred API.
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class DocFileInputFormat extends TextInputFormat implements
        JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new DocRecordReader(job, (FileSplit) split);
    }
}
In this implementation, we omit details such as compression and decompression; if you need them, refer to Hadoop's own implementation.
The InputFormat above does little more than implement the interface; the business logic that actually splits documents is in DocRecordReader.
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class DocRecordReader implements RecordReader<LongWritable, Text> {

    // Underlying line-oriented reader
    private LineRecordReader reader;

    // Reused key / value for the current line
    private LongWritable lineKey = null;
    private Text lineValue = null;

    // Document-related state
    private StringBuilder sb = new StringBuilder();
    private boolean inDoc = false;
    private final String DOC_START = "<DOC>";
    private final String DOC_END = "</DOC>";

    public DocRecordReader(JobConf job, FileSplit split) throws IOException {
        reader = new LineRecordReader(job, split);
        lineKey = reader.createKey();
        lineValue = reader.createValue();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (true) {
            // Read the next line; stop at the end of the split
            if (!reader.next(lineKey, lineValue)) {
                break;
            }
            if (!inDoc) {
                // Not inside a document: check for <DOC>
                if (lineValue.toString().startsWith(DOC_START)) {
                    // Start a new document
                    inDoc = true;
                    // Clear the buffer
                    sb.delete(0, sb.length());
                }
            } else {
                // Inside a document: check for </DOC>
                if (lineValue.toString().startsWith(DOC_END)) {
                    // Document finished
                    inDoc = false;
                    // Emit the buffered document as one record
                    key.set(key.get() + 1);
                    value.set(sb.toString());
                    return true;
                } else {
                    if (sb.length() != 0) {
                        sb.append("\n");
                    }
                    sb.append(lineValue.toString());
                }
            }
        }
        return false;
    }

    @Override
    public float getProgress() throws IOException {
        return reader.getProgress();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable(0);
    }

    @Override
    public Text createValue() {
        return new Text("");
    }

    @Override
    public long getPos() throws IOException {
        return reader.getPos();
    }
}
The code above uses a LineRecordReader to read the split line by line. To save memory, lineKey and lineValue are reused across calls.
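If you want to sanity-check the record reader outside of Hive, a small driver like the sketch below can run it over a local file. This is only a minimal illustration under my own assumptions: the DocRecordReaderSmokeTest class and the path /tmp/doc_sample.txt are hypothetical and not part of the original code.

package com.coder4.hive;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class DocRecordReaderSmokeTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical local file containing <DOC> ... </DOC> blocks
        Path path = new Path("/tmp/doc_sample.txt");
        JobConf job = new JobConf();
        long len = path.getFileSystem(job).getFileStatus(path).getLen();

        // One split covering the whole file
        FileSplit split = new FileSplit(path, 0, len, (String[]) null);
        DocRecordReader reader = new DocRecordReader(job, split);

        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            // Each value is the body of one document, without the markers
            System.out.println("doc #" + key.get() + ":\n" + value + "\n");
        }
        reader.close();
    }
}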
Custom OutputFormat
The OutputFormat is responsible for writing. Note that here you can no longer just copy the corresponding Hadoop interface: you must implement HiveOutputFormat.
package com.coder4.hive;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

@SuppressWarnings({ "rawtypes" })
public class DocFileOutputFormat<K extends WritableComparable, V extends Writable>
        extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {

    @Override
    public RecordWriter getHiveRecordWriter(JobConf job, Path outPath,
            Class<? extends Writable> valueClass, boolean isCompressed,
            Properties tableProperties, Progressable progress)
            throws IOException {
        FileSystem fs = outPath.getFileSystem(job);
        FSDataOutputStream out = fs.create(outPath);
        return new DocRecordWriter(out);
    }
}
Similarly, the business logic lives in the following RecordWriter:
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;

public class DocRecordWriter implements RecordWriter {

    private FSDataOutputStream out;
    private final String DOC_START = "<DOC>";
    private final String DOC_END = "</DOC>";

    public DocRecordWriter(FSDataOutputStream o) {
        this.out = o;
    }

    @Override
    public void close(boolean abort) throws IOException {
        out.flush();
        out.close();
    }

    @Override
    public void write(Writable wr) throws IOException {
        // Wrap each row with <DOC> ... </DOC> markers
        write(DOC_START);
        write("\n");
        write(wr.toString());
        write("\n");
        write(DOC_END);
        write("\n");
    }

    private void write(String str) throws IOException {
        // Use the byte length, not the character length, so multi-byte
        // characters are written correctly
        byte[] bytes = str.getBytes();
        out.write(bytes, 0, bytes.length);
    }
}
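To see what this writer actually emits, the following sketch (my own illustration, not from the original article) writes two rows to a local file; each row comes out wrapped between a <DOC> line and a </DOC> line. The class name and the path /tmp/doc_out.txt are hypothetical.

package com.coder4.hive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class DocRecordWriterExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        FSDataOutputStream out = fs.create(new Path("/tmp/doc_out.txt"));

        DocRecordWriter writer = new DocRecordWriter(out);
        // Each Writable is serialized via toString(), one document per row
        writer.write(new Text("id=1\nname=a"));
        writer.write(new Text("id=2\nname=b"));
        writer.close(false);
    }
}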
Custom SerDe or UDF?
After customizing the InputFormat and OutputFormat, we have already split each input split into multiple rows (documents).
Next, we need to split each row into fields. At this point, we have two options:
(1) Write a UDF that splits a row into key-value pairs and returns them as a Map<K, V>. In this case, the table only needs a single STRING column.
(2) Implement a SerDe that converts a row directly into the table's columns.
Let's look at the UDF approach first. It works well in scenarios such as JSON parsing, where the field names are not fixed (or change frequently).
package com.coder4.hive;

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;

public class DocToMap extends UDF {
    public Map<String, String> evaluate(String s) {
        return Doc.deserialize(s);
    }
}
Here, Doc.deserialize is just a custom helper method; it does not need to override anything or implement any interface.
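The article does not show the Doc class itself. Assuming the key=value-per-line format above, its deserialize helper might look roughly like this (a sketch of my own, not the original implementation):

package com.coder4.hive;

import java.util.HashMap;
import java.util.Map;

public class Doc {
    // Hypothetical helper: parse "key=value" lines into a map
    public static Map<String, String> deserialize(String doc) {
        Map<String, String> kv = new HashMap<String, String>();
        if (doc == null) {
            return kv;
        }
        for (String line : doc.split("\n")) {
            String[] pair = line.split("=", 2);
            if (pair.length == 2) {
                kv.put(pair[0], pair[1]);
            }
        }
        return kv;
    }
}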
With the UDF in place, it is used as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
doc STRING
)
STORED AS
INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;
add jar /xxxxxxxx/hive-test.jar;
CREATE TEMPORARY FUNCTION doc_to_map AS 'com.coder4.hive.DocToMap';
SELECT
raw['id'],
raw['name']
FROM
(
SELECT
doc_to_map(doc) raw
FROM
test_table
) t;
Custom SerDe
If you choose to implement a custom SerDe, things are slightly more involved.
The implementation below mainly follows this blog post, plus the official source code:
http://blog.cloudera.com/blog/2012/12/how-to-use-a-serde-in-apache-hive/
package com.coder4.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MySerDe extends AbstractSerDe {

    // Table schema, read from the table properties
    private List<String> columnNames = null;
    private List<TypeInfo> columnTypes = null;
    private ObjectInspector objectInspector = null;

    // Separators
    private String nullString = null;
    private String lineSep = null;
    private String kvSep = null;

    @Override
    public void initialize(Configuration conf, Properties tbl)
            throws SerDeException {
        // Read separators
        lineSep = "\n";
        kvSep = "=";
        nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");

        // Read column names
        String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
        if (columnNameProp != null && columnNameProp.length() > 0) {
            columnNames = Arrays.asList(columnNameProp.split(","));
        } else {
            columnNames = new ArrayList<String>();
        }

        // Read column types, defaulting to string
        String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
        if (columnTypeProp == null) {
            String[] types = new String[columnNames.size()];
            Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
            columnTypeProp = StringUtils.join(types, ":");
        }
        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);

        // Check that column names and types match
        if (columnTypes.size() != columnNames.size()) {
            throw new SerDeException("len(columnNames) != len(columnTypes)");
        }

        // Create ObjectInspectors from the type information for each column
        List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
        ObjectInspector oi;
        for (int c = 0; c < columnNames.size(); c++) {
            oi = TypeInfoUtils
                    .getStandardJavaObjectInspectorFromTypeInfo(columnTypes.get(c));
            columnOIs.add(oi);
        }
        objectInspector = ObjectInspectorFactory
                .getStandardStructObjectInspector(columnNames, columnOIs);
    }

    @Override
    public Object deserialize(Writable wr) throws SerDeException {
        // Split the document into key-value pairs
        if (wr == null) {
            return null;
        }
        Map<String, String> kvMap = new HashMap<String, String>();
        Text text = (Text) wr;
        for (String kv : text.toString().split(lineSep)) {
            String[] pair = kv.split(kvSep);
            if (pair.length == 2) {
                kvMap.put(pair[0], pair[1]);
            }
        }

        // Fill the row according to the column names and column types
        ArrayList<Object> row = new ArrayList<Object>();
        String colName = null;
        TypeInfo typeInfo = null;
        Object obj = null;
        for (int i = 0; i < columnNames.size(); i++) {
            colName = columnNames.get(i);
            typeInfo = columnTypes.get(i);
            obj = null;
            if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveTypeInfo pTypeInfo = (PrimitiveTypeInfo) typeInfo;
                switch (pTypeInfo.getPrimitiveCategory()) {
                case STRING:
                    obj = StringUtils.defaultString(kvMap.get(colName), "");
                    break;
                case INT:
                    // The Java ObjectInspector for int expects an Integer
                    try {
                        obj = Integer.parseInt(kvMap.get(colName));
                    } catch (Exception e) {
                        // leave the field as null
                    }
                    break;
                case LONG:
                    try {
                        obj = Long.parseLong(kvMap.get(colName));
                    } catch (Exception e) {
                        // leave the field as null
                    }
                    break;
                default:
                    break;
                }
            }
            row.add(obj);
        }
        return row;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return objectInspector;
    }

    @Override
    public SerDeStats getSerDeStats() {
        // Not supported yet
        return null;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public Writable serialize(Object arg0, ObjectInspector arg1)
            throws SerDeException {
        // Not supported yet
        return null;
    }
}
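Before wiring the SerDe into a table, you can exercise it on its own. The sketch below is my own illustration rather than part of the original code: it hand-builds the Properties object (column names and types) that Hive would normally derive from the table definition, then deserializes one document body.

package com.coder4.hive;

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.io.Text;

public class MySerDeExample {
    public static void main(String[] args) throws Exception {
        // Mimic the table properties Hive would pass in
        Properties tbl = new Properties();
        tbl.setProperty(Constants.LIST_COLUMNS, "id,name");
        tbl.setProperty(Constants.LIST_COLUMN_TYPES, "bigint:string");

        MySerDe serde = new MySerDe();
        serde.initialize(new Configuration(), tbl);

        // One document body, as produced by DocRecordReader
        Object row = serde.deserialize(new Text("id=1\nname=a"));
        System.out.println(row); // expected: [1, a]
    }
}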
The final Hive table definition is:
add jar /xxxxxxxx/hive-test.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
id BIGINT,
name STRING
)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS
INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;
Our custom SerDe turns each document (one <DOC> ... </DOC> block) into the table's columns.
Testing the result
First, we placed a file in the HDFS directory /user/heyuan.lhy/doc/ with the following content:
<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>
With the table schema defined as above, let's run a SELECT:
SELECT * FROM test_table;
OK
1 a
2 b
3 c
4 d
As you can see, the id and name fields are parsed out correctly.
Since our SerDe does not implement the serialize method, writing is not supported.
If writing is needed, it can be done with the UDF + Map approach instead.
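For completeness, a write path inside the SerDe would essentially be the inverse of deserialize: join each column back into key=value lines. The helper below is only a rough sketch of that idea (the class and method names are hypothetical and not part of the original code); the original article deliberately leaves serialize unimplemented.

package com.coder4.hive;

import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;

public class DocSerializeSketch {
    // Turn a row object back into the key=value-per-line document body.
    // A serialize() implementation inside MySerDe would follow the same idea.
    public static Text serializeRow(Object row, ObjectInspector oi,
            List<String> columnNames) {
        StructObjectInspector soi = (StructObjectInspector) oi;
        List<? extends StructField> fields = soi.getAllStructFieldRefs();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            Object fieldData = soi.getStructFieldData(row, fields.get(i));
            if (i > 0) {
                sb.append("\n");
            }
            sb.append(columnNames.get(i)).append("=")
                    .append(fieldData == null ? "" : fieldData.toString());
        }
        return new Text(sb.toString());
    }
}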
This article is reposted from coder4; original post: http://www.coder4.com/archives/4031.