Kylin实战(一):用Hive UDTF平铺内嵌类型字段

众所周知,Apache Kylin 是基于星型模型设计的,并不支持雪花模型(更新:在最近发布的2.0版本中增加了对雪花模型的支持)。然而在大多数情况下,Kylin的存储引擎是Hive。不同于普通RDMS,Hive支持复杂数据类型,包括Array,Map和Struct。复杂数据类型的字段又可以称为内嵌对象(nested object), 某种程度上属于雪花模型。如果导入带有复杂数据类型字段的Hive表,Kylin会在校验表元信息的时候报错。

Kylin Error

这意味着我们需要先对该表做平铺(flatten)处理,再将结果导入Kylin。此时就会用到Hive的UDTF。UDTF(User Defined Table Generating Functions)用于将一行输入转化为多行或者多列,将一个map字段分拆为多行或者多列自然不在话下。

根据官方文档,Hive内置了explode()函数可以拆分Map字段。不过explode()是按列拆,也就是拆成key和value两行,而我们希望的是将Map每个Key对应的属性拆为一列,这样我们就可以在Kylin中直接根据列名来建模。因此我们需要自定义UDTF来完成平铺,另外由于这个函数涉及列名,所以是和Map字段紧耦合的,对于不同的Map要字段要定义不同的UDTF。

假设Hive表的一个Map类型的字段,Key可能为task、cash、gold、online。要将该字段平铺,可以设计如下的UDTF:

package me.whitewood.bigdata.hive.udtf;

/**
 * Hive UTDF Demo
 * Created by Whitewood on 2017/8/12.
 */
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.lazy.LazyMap;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlattenMap extends GenericUDTF{
    @Override
    public void close() throws HiveException {}

    /**
     * 初始化函数,用于检验参数类型,并返回UDTF的输出字段名和数据类型
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector structObjectInspector)
            throws UDFArgumentException {
        List<String> fieldNames = new ArrayList<>(4);
        List<ObjectInspector> fieldOIs = new ArrayList<>(4);
        fieldNames.add("task");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("cash");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        fieldNames.add("online");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        fieldNames.add("gold");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        Object input = args[0];
        if(!(input instanceof LazyMap)){
            throw new HiveException("flattenMap function should only be applied to map columns");
        }
        //这里要注意,Hive的Map字段是LazyMap,其中的Key、Value也是LazyString。
        //若不手动将其转为String,直接用String类型的Key去Map取值会总是为null。
        Map<Object,Object> map = ((LazyMap) input).getMap();
        Map<String,String> stringMap = new HashMap<>(4);
        for(Map.Entry entry: map.entrySet()){//lazy string
            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        forward(new Object[]{
            stringMap.getOrDefault("task", null),
            Integer.parseInt(stringMap.getOrDefault("cash", "0")),
            Integer.parseInt(stringMap.getOrDefault("gold", "0")),
            Integer.parseInt(stringMap.getOrDefault("online", "0")),
        });
    }
}

开发好UDTF之后将其打包为Kylin-UDTF-1.0-SNAPSHOT.jar,放到HDFS某个路径下:
hdfs dfs -put /home/whitewood/hive/udtf/Kylin-UDTF-1.0-SNAPSHOT.jar hdfs://masters/user/hive/udf

将它注册到Hive的permenant function中:
CREATE FUNCTION flattenMa AS 'me.whitewood.bigdata.hive.udtf.FlattenMap' using jar 'hdfs://masters/user/hive/udf/Kylin-UDTF-1.0-SNAPSHOT.jar';

之后就可以在SQL中使用自定义的UDTF:
SELECT flattenMap(taskMap) FROM tbl_task LIMIT 10;

参考文献:
1.【Kylin实战】Hive复杂数据类型与视图

本文是原创文章,转载请注明:时间与精神的小屋 - Kylin实战(一):用Hive UDTF平铺内嵌类型字段