User-Defined Functions (UDF)
To prepare for Hive UDF programming, add the dependencies to the pom.xml configuration file.
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.5.0</hadoop.version>
        <hive.version>0.13.1</hive.version>
    </properties>
    <dependencies>
        <!-- Hive Client -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
    </dependencies>
Create the database and the source table for the raw log data, then load it and run a sanity check:
create database if not exists db_1205 ;
create table IF NOT EXISTS db_1205.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile ;
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table db_1205.bf_log_src ;
select  count(1) from db_1205.bf_log_src ; 
select * from db_1205.bf_log_src limit 2 ;

Each record in the log file contains spaces (for example inside the request field), so splitting fields on ' ' misaligns the columns when the data is loaded.
We therefore preprocess the raw data; here we choose regular expressions (RegexSerDe) to handle the problem. At the same time, we follow common practices on production platforms.
The main ideas are as follows (a sketch combining several of them follows the list):
- Original table: raw data, file contents unchanged
- Separate subtables for different months
- Data file storage format: orcfile/parquet
- Data file compression: snappy
- Compression of intermediate (map output) data: snappy
- External tables
- Partitioned tables
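A minimal sketch of how several of these practices combine; the table name bf_log_ext, the partition column month, and the location path are hypothetical, not from the original setup:

-- hedged sketch: an external, partitioned table stored as ORC with Snappy compression
create external table if not exists db_1205.bf_log_ext (
remote_addr string,
request string
)
partitioned by (month string)
stored as orc
location '/user/hive/warehouse/db_1205/bf_log_ext'
tblproperties ("orc.compress"="SNAPPY") ;

-- compress intermediate (map output) data with Snappy
set hive.exec.compress.intermediate=true ;
set mapreduce.map.output.compress=true ;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec ;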
 
Recreate the source table using RegexSerDe (the earlier space-delimited version must be dropped first):
drop table if exists db_1205.bf_log_src ;
create table db_1205.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
stored as textfile ;

A site for verifying that the regular expression is correct: http://tool.chinaz.com/regex
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table db_1205.bf_log_src ;
select * from db_1205.bf_log_src limit 2 ;
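To confirm that the table definition now uses RegexSerDe, you can inspect it (a standard Hive command):

desc formatted db_1205.bf_log_src ;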

Method 1, defining the table first and inserting into it afterwards:
drop table if exists db_1205.df_log_comm ;
create table if not exists db_1205.df_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY");
Method 2, creating the table directly from a query (CTAS):
create table if not exists db_1205.df_log_comm2
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
AS select remote_addr, time_local, request, http_referer from db_1205.bf_log_src ;

With Method 1, the table is populated by a separate insert:
insert into table db_1205.df_log_comm
select remote_addr, time_local, request, http_referer from db_1205.bf_log_src ;

Java code for a UDF that removes the double quotes surrounding each field:
package com.hadoop.senoir.bigdata.hive;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class RemoveQuteUDF extends UDF {
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        // validate
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        // remove quotes
        return new Text(str.toString().replace("\"", ""));
    }
}
Register the Hive UDF:
add jar /opt/datas/RemoveQuteUDF.jar ;
create temporary function RemoveQute as 'com.hadoop.senoir.bigdata.hive.RemoveQuteUDF';
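A temporary function disappears when the session ends. Hive 0.13+ also supports permanent functions; a hedged sketch, assuming the jar has been uploaded to HDFS (the jar path below is illustrative only):

-- permanent registration; the HDFS jar URI is hypothetical
create function RemoveQute as 'com.hadoop.senoir.bigdata.hive.RemoveQuteUDF'
using jar 'hdfs:///user/hive/jars/RemoveQuteUDF.jar' ;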
Using the RemoveQute function:
select RemoveQute(remote_addr) addrr, RemoveQute(time_local) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;

Java code for a UDF that keeps only the first two octets of an IP address:
package com.hadoop.senoir.bigdata.hive;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class GetIpAddressUDF extends UDF {
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        // validate
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        // split the IP address on dots
        String[] line = str.toString().split("\\.");
        if (line.length < 2) {
            return null;
        }
        // keep only the first two octets
        return new Text(line[0] + "." + line[1]);
    }
}
Register the Hive UDF:
add jar /opt/datas/GetIpAddressUDF.jar ; 
create temporary function GetIpAddress as 'com.hadoop.senoir.bigdata.hive.GetIpAddressUDF';
Using the GetIpAddress function:
select GetIpAddress(RemoveQute(remote_addr)) addrr, RemoveQute(time_local) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;

For example, convert 31/Aug/2015:15:06:14 +0800 into the 20150831150614 format.
Java code for the date-conversion UDF:
package com.hadoop.senoir.bigdata.hive;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class DateTransUDF extends UDF {
    // input format, e.g. 31/Aug/2015:00:04:53 +0800
    private final SimpleDateFormat inputFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    // output format, e.g. 20150831000453
    private final SimpleDateFormat outputFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    public Text evaluate(Text str) {
        Text output = new Text();
        if (null == str) {
            return null;
        }
        // validate
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        try {
            // parse the input timestamp
            Date parseDate = inputFormat.parse(str.toString());
            // reformat it
            String outputDate = outputFormat.format(parseDate);
            output.set(outputDate);
        } catch (Exception e) {
            e.printStackTrace();
            return output;
        }
        return output;
    }
}
Register the Hive UDF:
add jar /opt/datas/DateTransUDF.jar ;
create temporary function DateTrans as 'com.hadoop.senoir.bigdata.hive.DateTransUDF';
Using the DateTrans function:
select GetIpAddress(RemoveQute(remote_addr)) addrr, DateTrans(RemoveQute(time_local)) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm limit 5;
Create the final cleaned table with all three UDFs applied:
create table if not exists db_1205.df_log_comm_opt
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
AS select GetIpAddress(RemoveQute(remote_addr)) addrr, DateTrans(RemoveQute(time_local)) date, RemoveQute(request) request, RemoveQute(http_referer) refer from db_1205.df_log_comm ;
Running the CTAS above produced an error.
Cause: the null check in the UDFs was incomplete and did not handle a null input. Testing a single UDF raised no error, but nesting several UDFs did, because an outer UDF can receive the NULL returned by an inner one. The original check, which calls toString() on a possibly null input:
if (null == str.toString()) {
    return null;
}
The corrected check:
if (null == str) {
    return null;
}
// validate
if (StringUtils.isBlank(str.toString())) {
    return null;
}
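A hedged way to observe the difference (Hive 0.13+ allows a FROM-less SELECT; the blank literal is illustrative only): RemoveQute returns NULL for a blank string, so GetIpAddress receives NULL. With the old check this call throws a NullPointerException; with the corrected one it simply returns NULL.

-- blank input -> RemoveQute returns NULL -> the outer UDF must handle NULL
select GetIpAddress(RemoveQute(' ')) ;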
select * from db_1205.df_log_comm_opt limit 5 ;

Extract the hour from the yyyyMMddHHmmss timestamp (substring(date, 9, 2) takes the two characters starting at 1-based position 9, i.e. HH):
select addrr, substring(date, 9, 2) hour, request, refer from db_1205.df_log_comm_opt limit 5;

Count requests per hour, busiest hours first:
select t.hour, count(*) cnt
from (select substring(date, 9, 2) hour from db_1205.df_log_comm_opt) t
group by t.hour order by cnt desc;

Reference: https://cwiki.apache.org/confluence/display/Hive/HivePlugins