Hive系列文章
- Hive表的基本操作
- Hive中的集合数据类型
- Hive动态分区详解
- hive中orc格式表的数据导入
- Java通过jdbc连接hive
- 通过HiveServer2访问Hive
- SpringBoot连接Hive实现自助取数
- hive关联hbase表
- Hive udf 使用方法
- Hive基于UDF进行文本分词
- Hive窗口函数row number的用法
- 数据仓库之拉链表
hive作为一个sql查询引擎,自带了一些基本的函数,比如count
(计数),sum
(求和),有时候这些基本函数满足不了我们的需求,这时候就要写hive hdf(user defined funation)
,又叫用户自定义函数,应用与select
语句中。
哪些情况满足不了我们的需求呢,比如:
- 需要将字段与数据库中查询一下,做个比对;
- 需要对数据进行复杂处理;
- 等等
hive udf 用法
下面是一个判断hive表字段是否包含\'100\'的简单udf
:
package com.js.dataclean.hive.udf.hm2
import org.apache.hadoop.hive.ql.exec.UDF;
public class IsContains100 extends UDF{
public String evaluate(String s){
if(s == null || s.length() == 0){
return "0";
}
return s.contains("100")?"1":"0";
}
}
使用maven将其打包,进入hive cli
,输入命令:
add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
创建完临时函数,即可使用这个函数了:
select isContains100('abc100def') from table limit 1;
1
hive udf 创建与使用步骤
- 继承
org.apache.hadoop.hive.ql.exec.UDF
类,实现evaluate方法; - 打包上传到集群,通过
create temporary function
创建临时函数,不加temporary
就创建了一个永久函数; - 通过select 语句使用;
下面是一个例子,通过读取mysql数据库中的规则,为hive中的workflow返回对应的,类型:
type workflow
a 1
a 2
b 11
b 22
b 33
我们希望,将hive的某一个字段取值为,1,2的变为a
,取值为11,22,33的全部变为b
,就是归类的意思。 这个udf可以这么实现:
package com.js.dataclean.hive.udf.hm2.workflow;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ Author: keguang
* @ Date: 2018/12/13 16:24
* @ version: v1.0.0
* @ description:
*/
public class GetWorkflow extends UDF{
private static final String host = "0.0.0.0";
private static final String port = "3306";
private static final String database = "root";
private static final String userName = "root";
private static final String password = "123456";
private static String url = "";
private static final String driver = "com.mysql.jdbc.Driver";
private static Connection conn = null;
private static Map<String, List<String>> workflowType = null;
static {
url = "jdbc:mysql://" + host + ":" + port + "/" + database;
try {
Class.forName(driver);
conn = DriverManager.getConnection(url, userName, password);
workflowType = getWorkflowType(conn);
} catch (Exception e) {
e.printStackTrace();
}
}
private static Map<String, List<String>> getWorkflowType(Connection conn){
Map<String, List<String>> workflowType = new HashMap<>();
String sql = "select * from flash_player_workflow";
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(sql);
ResultSet rs = ps.executeQuery();
while (rs.next()){
String workflow = rs.getString("workflow");
String type = rs.getString("flag");
List<String> workflows = workflowType.get(type);
if(workflows == null){
workflows = new ArrayList<>();
}
workflows.add(workflow);
workflowType.put(type, workflows);
}
} catch (SQLException e) {
e.printStackTrace();
}finally {
// 关闭链接
if(conn != null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return workflowType;
}
public String evaluate(String s){
assert workflowType != null;
for(String type:workflowType.keySet()){
List<String> workflows = workflowType.get(type);
if(workflows.contains(s)){
return type;
}
}
return s;
}
}
查看hive function的用法:
查month 相关的函数
show functions like '*month*';
查看 add_months 函数的用法
desc function add_months;
查看 add_months 函数的详细说明并举例
desc function extended add_months;
hive 中的 UDAF
可以看出,udf就是一个输入一个输出,输入一个性别,返回\'男\'或者\'女\',如果我们想实现select date,count(1) from table,统计每天的流量呢?这就是一个分组统计,显然是多个输入,一个输出,这时候udf已经不能满足我们的需要,就需要写udaf,user defined aggregare function
(用户自定义聚合函数)。
这里写一个字符串连接函数,相当于concat的功能,将多行输入,合并为一个字符串:
package com.js.dataclean.hive.udaf.hm2;
import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* 实现字符串连接聚合的UDAF
* @version v1.0.0
* @Author:keguang
* @Date:2018/10/22 14:36
*/
public class MutiStringConcat extends UDAF{
public static class SumState{
private String sumStr;
}
public static class SumEvaluator implements UDAFEvaluator{
SumState sumState;
public SumEvaluator(){
super();
sumState = new SumState();
init();
}
@Override
public void init() {
sumState.sumStr = "";
}
/**
* 来了一行数据
* @param s
* @return
*/
public boolean iterate(String s){
if(!StringUtil.isNull(s)){
sumState.sumStr += s;
}
return true;
}
/**
* 状态传递
* @return
*/
public SumState terminatePartial() {
return sumState;
}
/**
* 子任务合并
* @param state
* @return
*/
public boolean merge(SumState state){
if(state != null){
sumState.sumStr += state.sumStr;
}
return true;
}
/**
* 返回最终结果
* @return
*/
public String terminate(){
return sumState.sumStr;
}
}
}
用法,与udf一样,还是需要打包并且到hive cli中注册使用。
关于UDAF开发注意点:
- 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,这两个包都是必须的
- 函数类需要继承UDAF类,内部类Evaluator实现UDAFEvaluator接口
-
Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
- init函数类似于构造函数,用于UDAF的初始化
- iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
- terminatePartial无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner
- merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
- terminate返回最终的聚集函数结果
文章评论