当前位置: 首页 > news >正文

Hive自定义函数——简单使用

        在 Hadoop 生态系统中,特别是在 Hive 和其他 SQL-on-Hadoop 工具中,UDF(用户自定义函数),UDAF(用户自定义聚合函数),以及 UDTF(用户自定义表生成函数)允许用户定义自定义的函数逻辑,以适应特定的业务需求。这些自定义函数帮助扩展 Hive 的功能,在数据处理和分析中变得非常灵活。下面分别介绍这三种函数及其优劣势、适用场景,并给出具体的示例。

1. UDF(User Defined Function) - 用户自定义函数

定义

UDF 是一种用户自定义的单行函数,它将输入的一行数据进行处理并返回一个结果。例如,可以将 UDF 用来执行列的转换或简单的计算。

优势
  • 简单、灵活:UDF 通常处理单行输入并返回单个结果,非常适合执行简单的列级别的操作。
  • 易于实现:编写和注册 UDF 相对容易,只需实现一个特定的方法。
  • 性能较好:因为它只对单行数据操作,所以性能通常较好。
劣势
  • 只能处理一行数据,无法聚合多行数据。
  • UDF 的逻辑相对简单,不能实现复杂的表操作或数据拆分。
适用场景
  • 适用于单列或多列的简单数据转换或处理。
  • 常用于数据清洗、格式化、字符串处理等场景。
示例

假设需要一个 UDF 函数来将字符串转为大写:

public class UpperCaseUDF extends UDF {public String evaluate(String input) {return input == null ? null : input.toUpperCase();}
}

使用场景:

SELECT UPPERCASE(name) FROM employees;

将 employees 表中的 name 列转为大写。

2. UDAF(User Defined Aggregation Function) - 用户自定义聚合函数

定义

UDAF 是一种用户自定义的聚合函数,它处理多行数据,并返回一个聚合后的结果。类似于 SQL 中的 SUMAVG 等聚合函数。

优势
  • 能够聚合多行数据,适合处理需要计算汇总值、平均值、最大值、最小值等操作的场景。
  • 提供了灵活的多行数据处理能力,可以自定义复杂的聚合逻辑。
劣势
  • 实现较为复杂:相比 UDF,编写 UDAF 需要更多的步骤和逻辑处理,如分阶段的聚合和合并操作。
  • 聚合操作需要在不同阶段维护状态,因此可能会消耗更多内存和计算资源。
适用场景
  • 适合需要聚合多行数据的场景,如汇总计算、求平均、最大最小值等。
  • 适用于自定义复杂的统计分析,如百分位数、标准差等。
示例

假设需要计算员工工资的方差,可以编写一个自定义 UDAF。

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfoBase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfoImpl;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.PrimitiveDoubleObjectInspector;
import org.apache.hadoop.io.DoubleWritable;public class VarianceUDAF extends AbstractGenericUDAFResolver {@Overridepublic GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws HiveException {return new VarianceEvaluator();}public static class VarianceEvaluator extends GenericUDAFEvaluator {// 聚合状态类,用来保存聚合过程中间结果public static class VarianceBuffer implements UDAFEvaluator {private long count;      // 数据点个数private double sum;      // 数据和private double sumOfSquares; // 数据平方和public VarianceBuffer() {init();}public void init() {count = 0;sum = 0.0;sumOfSquares = 0.0;}}private PrimitiveDoubleObjectInspector inputOI;private ObjectInspector outputOI;@Overridepublic ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {super.init(m, parameters);// 定义输入和输出的 ObjectInspectorif (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {inputOI = (PrimitiveDoubleObjectInspector) parameters[0];}outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;return outputOI;}@Overridepublic AggregationBuffer getNewAggregationBuffer() throws HiveException {VarianceBuffer buffer = new VarianceBuffer();buffer.init();return buffer;}@Overridepublic void reset(AggregationBuffer agg) throws HiveException {((VarianceBuffer) agg).init();}@Overridepublic void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {if (parameters[0] != null) {VarianceBuffer buffer = (VarianceBuffer) agg;double value = inputOI.get(parameters[0]);buffer.count++;buffer.sum += value;buffer.sumOfSquares += value * value;}}@Overridepublic Object terminatePartial(AggregationBuffer agg) throws HiveException {// 返回部分聚合结果VarianceBuffer buffer = (VarianceBuffer) agg;Object[] result = new Object[3];result[0] = new DoubleWritable(buffer.count);result[1] = new DoubleWritable(buffer.sum);result[2] = new DoubleWritable(buffer.sumOfSquares);return result;}@Overridepublic void merge(AggregationBuffer agg, Object partial) throws HiveException {if (partial != null) {VarianceBuffer buffer = (VarianceBuffer) agg;Object[] partialResult = (Object[]) partial;buffer.count += ((DoubleWritable) partialResult[0]).get();buffer.sum += ((DoubleWritable) partialResult[1]).get();buffer.sumOfSquares += ((DoubleWritable) partialResult[2]).get();}}@Overridepublic Object terminate(AggregationBuffer agg) throws HiveException {VarianceBuffer buffer = (VarianceBuffer) agg;if (buffer.count == 0) {return null;}double mean = buffer.sum / buffer.count;double variance = (buffer.sumOfSquares - buffer.sum * mean) / buffer.count;return new DoubleWritable(variance);}}
}

使用场景:

SELECT VARIANCE(salary) FROM employees;

计算 employees 表中 salary 列的工资方差。

3. UDTF(User Defined Table-Generating Function) - 用户自定义表生成函数

定义

UDTF 是一种用户自定义的表生成函数,它接受一行输入,但可以返回多行甚至多列数据。它的作用类似于 SQL 中的 EXPLODE,将一行数据拆分成多行。

优势
  • 能够从单行数据生成多行或多列数据,适合进行数据拆分和结构化。
  • 非常灵活,能够处理复杂的多行、多列数据生成场景。
  • 适合需要扩展一行数据到多行数据的场景。
劣势
  • 实现复杂度较高:与 UDF 和 UDAF 相比,UDTF 需要处理更多的输出逻辑。
  • 性能较差:由于 UDTF 会输出多行数据,可能会引入较大的开销,特别是当输出数据量大时。
适用场景
  • 适合将一行数据拆分成多行数据的场景,如列表拆分、JSON 解析等。
  • 适合处理复杂的表生成操作,如跨多列的数据展开或分组。
示例

假设有一列存储了逗号分隔的字符串,需要将其拆分成多行,可以编写一个 UDTF。

public class ExplodeUDTF extends GenericUDTF {@Overridepublic void process(Object[] args) throws HiveException {String input = args[0].toString();for (String word : input.split(",")) {forward(new Object[]{word});}}
}

使用场景:

SELECT EXPLODE(split_col) FROM table_with_comma_separated_data;

将 table_with_comma_separated_data 表中 split_col 列中的逗号分隔字符串拆分成多行。


UDF、UDAF、UDTF 的比较

特性UDFUDAFUDTF
处理的输入一行数据多行数据一行数据
输出单个结果单个聚合结果多行或多列数据
优点实现简单,适合单行数据处理适合复杂的聚合操作,如求和、平均值等适合数据拆分、扩展多行数据
缺点不能处理多行或表级别的操作实现复杂,需要维护状态实现复杂,性能可能较差
适用场景单列转换,如格式化、数据清洗多行聚合操作,如汇总、统计一行拆分多行,如 JSON 解析,列表拆分
使用示例SELECT UPPER(col)SELECT SUM(col)SELECT EXPLODE(col)

总结

  • UDF:适用于列级的简单数据转换和计算,如格式化、字符串处理等。
  • UDAF:适合需要对多行数据进行聚合的场景,如求和、求平均等。
  • UDTF:适合需要将一行数据拆分成多行的情况,如数组或字符串拆分。

每种函数类型都有其独特的优缺点,选择哪一种取决于具体的数据处理需求和应用场景。


http://www.mrgr.cn/news/30241.html

相关文章:

  • 为大模型提供服务需要多少 GPU 显存?
  • Linux中使用Docker容器构建Tomcat容器完整教程
  • 【深度学习】(3)--损失函数
  • Go 并发模式:扩展与聚合的高效并行
  • 二、各种热型
  • openmv与stm32通信
  • 前端大屏自适应方案
  • 基于GIKT深度知识追踪模型的习题推荐系统源代码+数据库+使用说明,后端采用flask,前端采用vue
  • 阿里云kafka消息写入topic失败
  • jenkins 部署到tomcat
  • 2024最新的软件测试面试八股文(答案+文档)
  • Xmind软件自定义安装,如何安装在指定位置(不修改注册表),修改默认安装到c盘软件的安装位置
  • rpm 与 yum
  • JAVA客户端发送图片给服务端案例
  • JavaWeb笔记整理——Redis
  • 信息收集常用指令
  • 『功能项目』QFrameWorkBug修改器界面【65】
  • TimeSpan(一个简单的计时器)
  • 佰朔资本:pb和pe是什么?股票pb和pe怎么看?
  • apt-get install 安装的tomcat配置