@tsing1226
2015-12-15
User-Defined Functions (UDF)
To prepare for Hive UDF development, add the following dependencies to the pom.xml configuration file.
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.5.0</hadoop.version>
    <hive.version>0.13.1</hive.version>
</properties>

<!-- Hive Client -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>${hive.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
</dependency>
Create the database and the source log table:

create database if not exists db_1205 ;
create table IF NOT EXISTS db_1205.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile ;
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table db_1205.bf_log_src ;
select count(1) from db_1205.bf_log_src ;
select * from db_1205.bf_log_src limit 2 ;
Each record in the log file contains spaces inside field values, so loading the data with a space delimiter splits the fields incorrectly.
We preprocess the raw data, choosing a regular expression (via RegexSerDe) to handle the problem above. At the same time we follow practices that are common on enterprise production platforms.

The main ideas are as follows:
- Source table: raw data, file contents unchanged
- Create a separate sub-table for each month
- Data file storage format: orcfile/parquet
- Data file compression: snappy
- Whether to compress intermediate map output: snappy
- External tables
- Partitioned tables
Re-create the source table using RegexSerDe:

drop table if exists db_1205.bf_log_src ;
create table db_1205.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
stored as textfile ;
A useful site for verifying that the regular expression is correct: http://tool.chinaz.com/regex
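The pattern can also be checked locally with java.util.regex before creating the table. A minimal sketch; the class name and the placeholder line are my own, so paste in a real record from moodle.ibeifeng.access.log:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // Same pattern as in SERDEPROPERTIES above, with Java string escaping
        String regex = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\\]]*\") (\"[^\"]*\") "
                + "(\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") "
                + "(\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")";
        // Hypothetical placeholder; replace with one real line from the access log
        String line = "<one line from moodle.ibeifeng.access.log>";
        Matcher m = Pattern.compile(regex).matcher(line);
        if (m.matches()) {
            // print each captured field
            for (int i = 1; i <= m.groupCount(); i++) {
                System.out.println("group " + i + " = " + m.group(i));
            }
        } else {
            System.out.println("line does not match the pattern");
        }
    }
}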
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table db_1205.bf_log_src ;
select * from db_1205.bf_log_src limit 2 ;
Approach 1:
drop table if exists db_1205.df_log_comm ;
create table IF NOT EXISTS db_1205.df_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY");
Approach 2:
create table IF NOT EXISTS db_1205.df_log_comm2
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
AS select remote_addr, time_local, request, http_referer from db_1205.bf_log_src ;
Populate the table created in Approach 1:

insert into table db_1205.df_log_comm
select remote_addr, time_local, request, http_referer from db_1205.bf_log_src ;
Java code:
package com.hadoop.senoir.bigdata.hive;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Removes the double quotes surrounding a field value
public class RemoveQuteUDF extends UDF {
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        // validate: blank input yields null
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        // remove quotes
        return new Text(str.toString().replace("\"", ""));
    }
}
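Before packaging the class into a jar, it can be sanity-checked locally by calling evaluate directly. A minimal sketch; the driver class and the sample value are hypothetical:

package com.hadoop.senoir.bigdata.hive;

import org.apache.hadoop.io.Text;

// Hypothetical local driver for RemoveQuteUDF
public class RemoveQuteUDFTest {
    public static void main(String[] args) {
        RemoveQuteUDF udf = new RemoveQuteUDF();
        // Quotes are stripped: prints 115.28.12.103
        System.out.println(udf.evaluate(new Text("\"115.28.12.103\"")));
        // Null or blank input returns null
        System.out.println(udf.evaluate(null));
    }
}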
Register the UDF in Hive:
add jar /opt/datas/RemoveQuteUDF.jar ;
create temporary function RemoveQute as 'com.hadoop.senoir.bigdata.hive.RemoveQuteUDF';
Use the RemoveQute function:
select RemoveQute(remote_addr) addrr, RemoveQute(time_local) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;
Java code:
package com.hadoop.senoir.bigdata.hive;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Extracts the network prefix (first two octets) of an IP address,
// e.g. 115.28.12.103 -> 115.28
public class GetIpAddressUDF extends UDF {
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        // validate: blank input yields null
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        // split the address on dots
        String[] line = str.toString().split("\\.");
        if (line.length < 2) {
            return null;
        }
        // keep the first two octets
        return new Text(line[0] + "." + line[1]);
    }
}
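The same kind of local check works here; the driver class and the sample address are again hypothetical:

package com.hadoop.senoir.bigdata.hive;

import org.apache.hadoop.io.Text;

// Hypothetical local driver for GetIpAddressUDF
public class GetIpAddressUDFTest {
    public static void main(String[] args) {
        GetIpAddressUDF udf = new GetIpAddressUDF();
        // Prints 115.28
        System.out.println(udf.evaluate(new Text("115.28.12.103")));
    }
}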
Register the UDF in Hive:
add jar /opt/datas/GetIpAddressUDF.jar ;
create temporary function GetIpAddress as 'com.hadoop.senoir.bigdata.hive.GetIpAddressUDF';
Use the GetIpAddress function:
select GetIpAddress(RemoveQute(remote_addr)) addrr, RemoveQute(time_local) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;
For example, convert 31/Aug/2015:15:06:14 +0800 into the format 20150831150614.
Java code:
package com.hadoop.senoir.bigdata.hive;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Converts an access-log timestamp such as 31/Aug/2015:00:04:53 +0800
// into the compact form 20150831000453
public class DateTransUDF extends UDF {
    // input format: 31/Aug/2015:00:04:53 +0800 (the trailing zone is ignored)
    private final SimpleDateFormat inputFormat =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    // output format: 20150831000453
    private final SimpleDateFormat outputFormat =
            new SimpleDateFormat("yyyyMMddHHmmss");

    public Text evaluate(Text str) {
        Text output = new Text();
        if (null == str) {
            return null;
        }
        // validate: blank input yields null
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        try {
            // parse the input timestamp
            Date parseDate = inputFormat.parse(str.toString());
            // transform it to the output format
            String outputDate = outputFormat.format(parseDate);
            output.set(outputDate);
        } catch (Exception e) {
            // on parse failure, log and return the empty Text
            e.printStackTrace();
            return output;
        }
        return output;
    }
}
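A quick local check using the example conversion above; the driver class is hypothetical:

package com.hadoop.senoir.bigdata.hive;

import org.apache.hadoop.io.Text;

// Hypothetical local driver for DateTransUDF
public class DateTransUDFTest {
    public static void main(String[] args) {
        DateTransUDF udf = new DateTransUDF();
        // Prints 20150831150614
        System.out.println(udf.evaluate(new Text("31/Aug/2015:15:06:14 +0800")));
    }
}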
Register the UDF in Hive:
add jar /opt/datas/DateTransUDF.jar ;
create temporary function DateTrans as 'com.hadoop.senoir.bigdata.hive.DateTransUDF';
Use the DateTrans function:
select GetIpAddress(RemoveQute(remote_addr)) addrr, DateTrans(RemoveQute(time_local)) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;
create table if not exists db_1205.df_log_comm_opt
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
AS select GetIpAddress(RemoveQute(remote_addr)) addrr, DateTrans(RemoveQute(time_local)) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm ;
Error message: running the CTAS statement above fails with a NullPointerException.

Cause:

The guard written in the UDFs was incomplete: it did not handle the case where the input string is null. Each UDF passed its test in isolation, but the error appeared once several UDFs were nested together. The original check was:
if (null == str.toString()) {
    return null;
}

The corrected check:

if (null == str) {
    return null;
}
// validate: blank input yields null
if (StringUtils.isBlank(str.toString())) {
    return null;
}
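The first form calls toString() on a possibly null reference, which itself throws a NullPointerException before the comparison ever runs. A small sketch of the difference; the demo class is mine, not part of the original code:

import org.apache.hadoop.io.Text;

// Demonstrates the difference between the two checks above
public class NullCheckDemo {
    public static void main(String[] args) {
        Text str = null;
        // Correct: compares the reference itself, no method call on null
        System.out.println(null == str);               // true
        // Wrong: dereferences null before the comparison can happen
        // System.out.println(null == str.toString()); // NullPointerException
    }
}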
select * from db_1205.df_log_comm_opt limit 5 ;
select addrr, substring(date,9,2) hour, request, refer from db_1205.df_log_comm_opt limit 5;
select t.hour, count(*) cnt
from (select substring(date,9,2) hour from db_1205.df_log_comm_opt) t
group by t.hour order by cnt desc;
Reference: https://cwiki.apache.org/confluence/display/Hive/HivePlugins