Hive GenericUDF 的一个简单样例


=Start=

缘由:

之前用基础的 UDF 接口写过UDF程序,但没有写过复杂点的 GenericUDF ,有这个想法但一直没有实际去写,一来没有直接的动力——(简单的用SQL实现,复杂点的用UDF实现,为什么还要用GenericUDF来写?),二来没有简单清晰的文档和代码(不像UDF那样,只需要继承UDF,然后实现evaluate()方法就行了,之前搜到的中文资料不多,也可能是没有用对关键字)。

但前段时间在跑数据的时候发现用了UDF和没用UDF的SQL运行时间相差数倍,让我在想是不是基础的 UDF 接口性能不好导致(因为在最开始搜资料的时候大概看到过这样的描述)?因此最近又花了一些时间简单学习了一下 GenericUDF 的写法,在此做个记录,方便以后参考。

正文:

参考解答:

如果你想实际测试/验证你写的UDF对性能损耗是不是很大(或者准确点说是区分因为UDF里增加的逻辑导致的性能损耗还是单单因为使用了UDF增加的性能损耗),可以先用一个基本没什么特殊功能的UDF做一个基准,然后再对比加上其它逻辑的UDF之后的查询耗时有没有明显增加。

UDF 和 GenericUDF 的简单对比
UDF
开发起来比较简单,只需要继承UDF,然后实现evaluate()方法就行。 Easier to develop
因为使用了反射,所以性能比较低。 Lower performance due to use of reflection
不支持一些非基本类型的参数。 Doesn't accept some non-primitive parameters like struct

GenericUDF
开发起来会复杂一点,相比UDF多了2个要实现的方法:initialize 和 getDisplayString,它们设置了一个 ObjectInspector 并当错误发生时会展示一些信息。 A little more difficult to develop
性能比UDF更好,因为使用延迟和短路执行。 Better performance because use of lazy evaluation and short-circuiting
支持所有非原语参数作为输入参数和返回类型。 Supports all non-primitive parameters as input parameters and return types
GenericUDF 要实现的方法介绍
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
复杂的 GenericUDF 可以处理 Map、List、Set 类型。

# GenericUDF
GenericUDF 实现比较复杂,需要先继承 GenericUDF。这个 API 需要操作 Object Inspectors,并且要对接收的参数类型和数量进行检查。GenericUDF 需要实现以下三个方法:

abstract ObjectInspector initialize(ObjectInspector[] arguments);
# Initialize this GenericUDF. This will be called once and only once per GenericUDF instance.
# 这个方法只调用一次,并且在evaluate()方法之前调用。该方法接受的参数是一个 ObjectInspectors 数组。该方法检查接受正确的参数类型和参数个数。

abstract Object evaluate(GenericUDF.DeferredObject[] arguments);
# Evaluate the GenericUDF with the arguments.
# 这个方法类似 UDF 的 evaluate() 方法。它处理真实的参数,并返回最终结果。

abstract String getDisplayString(String[] children);
# Get the String to be displayed in explain.
# 这个方法用于当实现的 GenericUDF 出错的时候,打印出提示信息。而提示信息就是你实现该方法最后返回的字符串。
用 GenericUDF 实现的提取SQL字符串中的limit限制数值
package com.ixyzero.hive.udf.utils;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @author ixyzero
 * Created on 2023-04-15
 */

@Description(name = "get_limit_size",
        value = "_FUNC_(str) - Get limit size from *search* sql string.",
        extended = "The value is returned as a string, or NULL if the argument was NULL.\n"
                + "Example:\n"
                + "  > SELECT _FUNC_('select * from tbl where col1 = 11 LIMIT 1000;');\n"
                + "  '1000'\n")

public class getSqlLimitSizeGUDF extends GenericUDF {

    private static final Pattern commentaryPattern = Pattern.compile("/\\\\*.*?\\\\*/|--.*?", Pattern.DOTALL);
    private static final Pattern limitPattern = Pattern.compile("\\s+limit\\s+(\\d+)\\s*(,|offset)?\\s*(\\d*)\\s*", Pattern.CASE_INSENSITIVE);

    private ObjectInspector[] argOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 1. 检查参数个数,有问题抛出异常即可
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException("get_limit_size() only take 1 parameter");
        }

        // 2. 检查参数类型,有问题抛出异常即可
        ObjectInspector objectInspector = objectInspectors[0];
        if ( !(objectInspector instanceof StringObjectInspector) ) {
            throw new UDFArgumentException("the first argument must be a string");
        }

        argOIs = objectInspectors;
        // 3. 返回一个 ObjectInspector 类型的变量,但好像也没有其它地方接收并处理这个变量,所以直接返回null也行
        // return null;
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // 下面3行的作用其实就是从通过initialize检查的参数中提取并转换成Java编程中常见的数据类型方便后面的开发
        Object tmp = deferredObjects[0].get();
        StringObjectInspector strObjectInspector = (StringObjectInspector) argOIs[0]; // ok,这一步容易出错,因为不清楚细节
        // StringObjectInspector strObjectInspector = (StringObjectInspector) deferredObjects[0]; // ClassCastException
        String sql = strObjectInspector.getPrimitiveJavaObject(tmp);

        if (sql == null || sql.trim().isEmpty()) {
            return null;
        }
        sql = commentaryPattern.matcher(sql).replaceAll("").toLowerCase();
        String ret = null;

        try {
            Matcher m = limitPattern.matcher(sql);
            if (m.find()) {
                if (m.group(2) == null) {
                    ret = m.group(1);
                } else if (m.group(2).equalsIgnoreCase("offset")) {
                    ret = m.group(1);
                } else {
                    ret = m.group(3);
                }
            } else {
                return null;
            }
        } catch (Exception e) {
            // e.printStackTrace();
            return null;
        }

        return ret;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "get_limit_size(sql)";
    }

    public static void main(String[] args) {
        // 因为继承自 GenericUDF 和 UDF 不太一样,多了2个要 override 的函数,所以更改成常规类再测试需要变更的地方比较多
        // 这里就不测试具体的代码逻辑了,因为在其它的地方验证过,这里主要是简单跑通一下 GenericUDF 编写的流程,真的是太麻烦
        // 麻烦在于变量的类型是 ObjectInspector/DeferredObject 这种,在用常规Java代码处理之前需要先进行类型转换
        // 对于转换方法不熟悉的人来说真的是很容易踩坑和崩溃——本身逻辑不复杂,但在应付这些类型转换上面还真的是非常麻烦和复杂
    }
    // 除了直接可见的多了2个要继承的函数 initialize/getDisplayString 之外
    // 我个人通过上面这一段代码的编写和测试,发现最大的问题其实是在 ObjectInspector 这种新出现的变量类型的检查判断和内容提取
    // 最最直接的就是在上面 evaluate 函数开始的那3行里面,一开始完全想不到它的入参只是交给getPrimitiveJavaObject 提取类型的信息
    // 真正的内容需要用一个全局变量从 initialize 函数那里赋值,然后经过一些类型转换才能进行后面的常规编码操作
    // 不过这种坑踩过一两次之后好像也就不是大问题了,也许和我当前没有写过更复杂的UDF有关,后面如果需要的话找专门的数据开发同学来做会更合适
}
参考链接:

Hive UDF的编写
https://ixyzero.com/blog/archives/4782.html

提取SQL中的limit限制数值
https://ixyzero.com/blog/archives/5428.html

Hive UDF 简介 #nice
https://www.hadoopdoc.com/hive/hive-udf-intro

How to write a Hive UDF
http://mark.thegrovers.ca/tech-blog/how-to-write-a-hive-udf

Using Hive Advanced User Defined Functions with Generic and Complex Data Types
https://www.bmc.com/blogs/using-hive-advanced-user-defined-functions-with-generic-and-complex-data-types/

Class GenericUDF
https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.2/hive_javadocs/ql/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.html

GenericUDFStringToMap.java #可以参考用于后面学习复杂的GenericUDF时变量类型转换和处理的逻辑
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFStringToMap.java

GenericUDFInstr.java
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInstr.java

Hive之ObjectInspector详解
https://blog.csdn.net/weixin_42167895/article/details/108314139

DeveloperGuide
https://cwiki.apache.org/confluence/display/hive/developerguide#DeveloperGuide-UDFsandUDAFs-howtoaddnewUDFsandUDAFs

=END=


《 “Hive GenericUDF 的一个简单样例” 》 有 4 条评论

  1. Apache Hivemall
    https://incubator.apache.org/projects/hivemall.html
    `
    Hivemall is a library for machine learning implemented as Hive UDFs/UDAFs/UDTFs. Hivemall runs on Hadoop-based data processing frameworks, specifically on Apache Hive, Apache Spark, and Apache Pig, that support Hive UDFs as an extension mechanism.

    Hivemall是一个用于机器学习的库(提供了一系列机器学习相关的功能,如回归、分类、推荐、聚类、特征工程等等),实现成Hive UDFs/UDAFs/UDTFs的形式。Hivemall运行在基于Hadoop的数据处理框架上,特别是在支持Hive UDF作为扩展机制的Apache Hive、Apache Spark和Apache Pig上。

    — 显示当前Hive中有多少函数可用
    show functions;

    — 查看当前使用的hivemall版本
    SELECT hivemall.hivemall_version();

    — 显示函数的描述信息
    desc function hivemall.array_intersect;
    /*
    hivemall.array_intersect(array x1, array x2, ..) – Returns an intersect of given arrays
    */

    — 显示函数的扩展描述信息
    desc function extend hivemall.array_intersect;
    /*
    hivemall.array_intersect(array x1, array x2, ..) – Returns an intersect of given arrays
    Function class:hivemall.tools.array.ArrayIntersectUDF
    Function type:PERSISTENT
    Resource:viewfs:///path/to/hivemall-all-0.5.0-incubating.jar
    */
    `

  2. Hive UDF’s for the data warehouse
    https://github.com/klout/brickhouse
    https://github.com/klout/brickhouse/tree/master/src/main/java/brickhouse/udf
    `
    Brickhouse 是一个Hive的UDF集合,用于提高开发人员的生产力,以及Hive查询的可扩展性和健壮性。Brickhouse 涵盖了广泛的功能,分别放在以下几个包里面:
    * collect – 各种用于处理map和array的“collect”实现
    * json – 在Hive原生结构和json字符串之间进行转换

    * bloom – 围绕Hadoop BloomFilter实现的UDF包装。
    * sketch – 一个KMV sketch集的实现,用于大型数据集的估算。
    * sanity – 用于在生产环境中实现健康检查和管理Hive的工具。
    * hbase – 实验性的UDF,作为集成Hive和HBase的另一种方式。

    Brickhouse is a collection of UDF’s for Hive to improve developer productivity, and the scalability and robustness of Hive queries.
    Brickhouse covers a wide range of functionality, grouped in the following packages.
    * collect – An implementaion of “collect” and various utilities for dealing with maps and arrays.
    * json – Translate between Hive structures and JSON strings
    * sketch – An implementation of KMV sketch sets, for reach estimation of large datasets.
    * bloom – UDF wrappers around the Hadoop BloomFilter implementation.
    * sanity – Tools for implementing sanity checks and managing Hive in a production environment.
    * hbase – Experimental UDFs for an alternative way to integrate Hive with HBase.

    UDFs related to collect and other list/map manipulation

    collect – Similar to Ruby’s collect method; Aggregates values into a list or map.

    combine — Take multiple lists or maps, and return a list or map with all the elements

    combine_unique — Similar to combine, but only return unique elements.

    collect_max – Similar to collect, but save only the keys containing the max 20 values.

    join_array — Opposite of split UDF, take an array of strings and concatenate with a separator

    union_map — UDAF to merge multiple maps.

    union_max — UDAF to merge multiple maps, but retain only the keys having the max 20 values.

    map_key_values — From a map, generate an array of structs containing “key”, and “value” fields, for easier explosion and manipulation

    index_of — Return the location in a list of a certain value.

    array_index — workaround for accessing arrays for non-const indexes ( can’t use [] !!!)

    map_index — array_index for maps

    cast_array — cast an array of one type to an array of another type; ie from array to array

    cast_map — cast_array for maps

    intersect_array — return elements which are all contained in multiple arrays

    map_filter_keys — return the values of a map for a given set of keys.

    set_diff — the difference of two sets

    truncate_array — Truncate an array to a fixed size

    conditional_emit — UDTF to emit rows based upon an array of boolean expressions

    numeric_range — UDTF to emit rows for a range of integers

    multiday_count — produce sums for date-related records across multiple date ranges (ie. produce 1-day. 7-day and 30-day counts in one pass.)
    `

  3. hive array_except
    https://juejin.cn/s/hive%20array_except
    `
    # Hive 官方并没有提供 array_except 这个函数,需要自己写UDF来实现。但是在 GitHub 仓库里有一个 GenericUDFArrayExcept 的UDF样例可以参考一下。

    Hive ARRAY_EXCEPT 是 Hive 中的一个内置函数,它可以在两个数组之间执行差集操作。它返回一个新的数组,其中包含了第一个数组中不存在于第二个数组中的元素。

    语法如下:
    ARRAY array_except(ARRAY a1, ARRAY a2)

    其中,a1 和 a2 是需要进行差集操作的两个数组,而 T 是数组元素的数据类型。

    示例:
    select array_except(array(1, 2, 3), array(2, 3, 4)) as result;

    结果:
    [1]
    `

    hive udf函数 array_except 实现
    https://blog.csdn.net/qq_35515661/article/details/130161544

    数组函数和运算符
    https://www.alibabacloud.com/help/zh/sls/user-guide/array-functions-and-operators

    复杂类型函数
    https://help.aliyun.com/zh/maxcompute/user-guide/complex-type-functions#section-e0m-o6l-r0k

    spark sql 函数 array_except(arr1,arr2)能否确保arr1中原有元素的顺序
    https://blog.csdn.net/qq_35515661/article/details/130141316

    GenericUDFArrayExcept.java
    https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArrayExcept.java#L38

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注