@zhangyy 2019-01-03T14:37:26.000000Z · 4,479 words · 192 reads

Log Processing with Hive and Data Cleaning with Python

Contents:

  • Part 1: Cleaning the logs with Hive and computing PV/UV visit counts
  • Part 2: Cleaning Hive data with Python

Part 1: Log Processing

  1. Count the site's page views for each time slot:

1.1 Create the table structure in Hive:

The raw log lines cannot be split into columns on load, so the table is defined with a RegexSerDe:

```sql
create table db_bflog.bf_log_src (
  remote_addr string,
  remote_user string,
  time_local string,
  request string,
  status string,
  body_bytes_sent string,
  request_body string,
  http_referer string,
  http_user_agent string,
  http_x_forwarded_for string,
  host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
STORED AS TEXTFILE;
```

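The RegexSerDe pattern above can be sanity-checked outside Hive with Python's `re` module. The log line below is a hypothetical example in the expected quoted-field layout, not a row from the real data:

```python
import re

# Same pattern as in "input.regex", with the SQL-level \" escapes resolved
pattern = re.compile(
    r'("[^ ]*") ("-|[^ ]*") ("[^\]]*") ("[^"]*") ("[0-9]*") '
    r'("[0-9]*") (-|[^ ]*) ("[^ ]*") ("[^"]*") (-|[^ ]*) ("[^ ]*")'
)

# Hypothetical log line matching the 11-field layout of bf_log_src
line = ('"192.168.1.1" "-" "31/Aug/2015:00:04:37 +0800" "GET / HTTP/1.1" '
        '"200" "1234" - "http://example.com/" "Mozilla/5.0" - "www.example.com"')

m = pattern.match(line)
print(m.group(3))  # the time_local field, quotes still attached
```

Each of the 11 capture groups maps, in order, to one column of the table; the quotes are kept and stripped later by a UDF.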

1.2 Load the data into the Hive table:

```sql
load data local inpath '/home/hadoop/moodle.ibeifeng.access.log' into table db_bflog.bf_log_src;
```


1.3 Custom UDFs

1.3.1: A UDF that strips the surrounding quotes

```java
package org.apache.hadoop.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

/**
 * New UDF classes need to inherit from this UDF class.
 *
 * @author zhangyy
 */
public class RemoveQuotesUDF extends UDF {
    /*
     * 1. Implement one or more methods named "evaluate" which will be called by Hive.
     * 2. "evaluate" should never be a void method. However it can return "null" if needed.
     */
    public Text evaluate(Text str) {
        if (null == str) {
            return null;
        }
        // validate
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        // strip the double quotes
        return new Text(str.toString().replaceAll("\"", ""));
    }

    public static void main(String[] args) {
        System.out.println(new RemoveQuotesUDF().evaluate(
                new Text("\"GET /course/view.php?id=27 HTTP/1.1\"")));
    }
}
```
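Outside Hive, the same transformation is a one-line string replace. This standalone Python sketch mirrors the `evaluate()` semantics (null-safe, blank-safe) for quick local checks:

```python
def remove_quotes(s):
    """Mirror RemoveQuotesUDF.evaluate: None or blank input returns None."""
    if s is None or not s.strip():
        return None
    return s.replace('"', '')

print(remove_quotes('"GET /course/view.php?id=27 HTTP/1.1"'))
# → GET /course/view.php?id=27 HTTP/1.1
```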

1.3.2: A UDF that converts the time format

```java
package org.apache.hadoop.udf;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

/**
 * New UDF classes need to inherit from this UDF class.
 *
 * @author zhangyy
 */
public class DateTransformUDF extends UDF {
    private final SimpleDateFormat inputFormat =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    private final SimpleDateFormat outputFormat =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /*
     * 1. Implement one or more methods named "evaluate" which will be called by Hive.
     * 2. "evaluate" should never be a void method. However it can return "null" if needed.
     */

    /**
     * input:  31/Aug/2015:00:04:37 +0800
     * output: 2015-08-31 00:04:37
     */
    public Text evaluate(Text str) {
        Text output = new Text();
        if (null == str) {
            return null;
        }
        // validate
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        try {
            // 1) parse
            Date parseDate = inputFormat.parse(str.toString().trim());
            // 2) transform
            String outputDate = outputFormat.format(parseDate);
            // 3) set
            output.set(outputDate);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(new DateTransformUDF().evaluate(
                new Text("31/Aug/2015:00:04:37 +0800")));
    }
}
```
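The same conversion can be checked in Python. `%b` parses English month abbreviations (assuming a C/English locale, matching `Locale.ENGLISH` in the UDF), and the timezone offset is ignored just as in the Java code:

```python
from datetime import datetime

def date_transform(s):
    """Mirror DateTransformUDF: '31/Aug/2015:00:04:37 +0800' -> '2015-08-31 00:04:37'."""
    if s is None or not s.strip():
        return None
    # Parse only the date-time token; the '+0800' offset is dropped, as in the UDF
    parsed = datetime.strptime(s.strip().split()[0], "%d/%b/%Y:%H:%M:%S")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")

print(date_transform("31/Aug/2015:00:04:37 +0800"))  # → 2015-08-31 00:04:37
```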
Export RemoveQuotesUDF and DateTransformUDF as JAR files and place them under /home/hadoop/jars:


1.4 Register the UDFs in Hive

```sql
-- register RemoveQuotesUDF as a UDF
add jar /home/hadoop/jars/RemoveQuotesUDF.jar;
create temporary function My_RemoveQuotes as "org.apache.hadoop.udf.RemoveQuotesUDF";

-- register DateTransformUDF as a UDF
add jar /home/hadoop/jars/DateTransformUDF.jar;
create temporary function My_DateTransform as "org.apache.hadoop.udf.DateTransformUDF";
```



1.5 Create the target table:

```sql
create table db_bflog.bf_log_comm (
  remote_addr string,
  time_local string,
  request string,
  http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC tblproperties ("orc.compress"="SNAPPY");
```


Extract the relevant columns from the source table:

```sql
insert into table db_bflog.bf_log_comm
select remote_addr, time_local, request, http_referer from db_bflog.bf_log_src;
```


Run SQL to count the PV (page views) per hour:

```sql
select t.hour, count(*) cnt
from (
  select substring(my_datetransform(my_removequotes(time_local)), 12, 2) hour
  from bf_log_comm
) t
group by t.hour
order by cnt desc;
```
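The `substring(..., 12, 2)` call picks characters 12-13 of the reformatted timestamp (Hive substrings are 1-based), i.e. the hour. A rough local equivalent, over a few made-up already-cleaned timestamps:

```python
from collections import Counter

# Hypothetical time_local values after my_removequotes + my_datetransform
times = [
    "2015-08-31 00:04:37",
    "2015-08-31 00:05:23",
    "2015-08-31 13:10:01",
]

# Hive's substring(t, 12, 2) is 1-based; Python's t[11:13] is the same slice
pv_by_hour = Counter(t[11:13] for t in times)

for hour, cnt in pv_by_hour.most_common():
    print(hour, cnt)
```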


Part 2: Cleaning Hive Data with Python

Count how many people watch movies on each weekday, for a foreign movie site. Download the test data (MovieLens 100k):

```shell
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip
```

2.1 Create the Hive table

```sql
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
```


2.2 Load the data:

```sql
LOAD DATA LOCAL INPATH '/home/hadoop/ml-100k/u.data'
OVERWRITE INTO TABLE u_data;
```


2.3 Create the weekday_mapper.py script

```python
import sys
import datetime

for line in sys.stdin:
    line = line.strip()
    userid, movieid, rating, unixtime = line.split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print('\t'.join([userid, movieid, rating, str(weekday)]))
```
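The script can be exercised locally on a single row before wiring it into Hive. The row below is the first line of u.data; note that `fromtimestamp()` uses the machine's local timezone, exactly as the Hive job will:

```python
import datetime

def map_line(line):
    """One iteration of weekday_mapper.py: tab-split, convert unixtime to ISO weekday (1-7)."""
    userid, movieid, rating, unixtime = line.strip().split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    return '\t'.join([userid, movieid, rating, str(weekday)])

sample = "196\t242\t3\t881250949"  # first row of u.data
print(map_line(sample))
```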

2.4 Create a temporary Hive table to receive the extracted data:

```sql
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

-- add the python script to Hive
add FILE /home/hadoop/weekday_mapper.py;
```


2.5 Extract data from the old table

```sql
INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;
```
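TRANSFORM streams each input row to the script's stdin as one tab-separated, newline-terminated line, then splits each stdout line on tabs to produce the AS (...) columns. A minimal simulation of that protocol, with made-up rows in u_data's layout:

```python
import datetime
import io

def weekday_mapper(stdin, stdout):
    """What Hive drives: read tab-separated rows from stdin, write transformed rows to stdout."""
    for line in stdin:
        userid, movieid, rating, unixtime = line.strip().split('\t')
        weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
        stdout.write('\t'.join([userid, movieid, rating, str(weekday)]) + '\n')

# Made-up rows standing in for u_data's serialized output
src = io.StringIO("1\t50\t5\t881250949\n2\t50\t4\t881250949\n")
out = io.StringIO()
weekday_mapper(src, out)
print(out.getvalue(), end='')
```

Because the script only sees plain text on stdin, it can be developed and debugged entirely outside Hive, e.g. with `cat u.data | python weekday_mapper.py`.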


2.6 Query the desired data:

```sql
SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;
```

