@tsing1226 2016-01-02

hive

Hive Log Analysis Case Study -- Log Traffic Analysis, Case 2

Requirement analysis:

From the tracking logs, compute daily traffic metrics: UV (distinct guid), PV, number of logged-in users (login_users), number of visitors (visit_users), average session stay time, second-jump rate (share of sessions with pv >= 2), and distinct IP count.

Per-requirement analysis

Create the Hive table session_info

--use database
use db_track;
--drop table session_info
drop table if exists session_info;
--create table session_info
create table if not exists session_info(
    session_id string ,
    guid string ,
    trackU string ,
    landing_url string ,
    landing_url_ref string ,
    user_id string ,
    pv string ,
    stay_time string ,
    min_trackTime string ,
    ip string ,
    provinceId string 
)
partitioned by (date string)
row format delimited fields terminated by '\t' ;

Create the Hive table tmp_session_info

--use databases
use db_track ;
--drop table 
drop table if exists tmp_session_info ;
--create table
create table tmp_session_info as 
    select 
    sessionid ,
    max(guid) guid ,
    max(enduserid) user_id ,
    count(url) pv ,
    unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
    unix_timestamp(min(tracktime)) min_trackTime ,
    max(ip) ip ,
    max(provinceid) provinceId
from db_track.track_log
    where date='20150828'
group by sessionid ;
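
A quick sanity check (optional; run from the Hive install directory, as with the Sqoop commands later on):

bin/hive -e "use db_track; select * from tmp_session_info limit 5;"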

Create the Hive table tmp_track_url

use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url as
    select
    sessionid ,
    tracktime ,
    trackeru,
    url ,
    referer 
from db_track.track_log
where date='20150828' ;

Load data into session_info

 --insert  data into table
 insert overwrite table session_info partition(date='20150828')
 select 
      a.sessionid sessionid ,
      max(a.guid) guid ,
      max(b.trackeru) trackU ,
      max(b.url) landing_url ,
      max(b.referer) landing_url_ref ,
      max(a.user_id) user_id ,
      max(a.pv) pv ,
      max(a.stay_time/1000) stay_time ,
      max(a.min_trackTime) min_trackTime ,
      max(a.ip) ip ,
      max(a.provinceId) provinceId 
 from
     db_track.tmp_session_info a
 join 
     db_track.tmp_track_url b
 on  
     a.sessionid=b.sessionid 
 group by a.sessionid ;
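
To confirm the partition was written:

bin/hive -e "use db_track; show partitions session_info;"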

Create the intermediate result table

use  db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily 
row format delimited fields terminated by '\t' 
as 
select 
    date ,
    count(distinct guid) uv ,
    sum(pv) pv ,
    count(distinct user_id) login_users ,
    count(distinct session_id) - count(distinct user_id) visit_users ,
    avg(stay_time) avg_stay_time ,
    count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
    count(distinct ip) ip_num 
from session_info
where date='20150828'
group by date ;
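
The summary table should now hold a single row for the day:

bin/hive -e "use db_track; select * from tmp_visit_daily;"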

Create the visit_daily table in MySQL

use db_track ;
drop table if exists visit_daily ;
create table visit_daily(
    date varchar(255) not null,
    uv varchar(255) ,
    pv varchar(255) ,
    login_users varchar(255) ,
    visit_users varchar(255) ,
    avg_stay_time varchar(255) ,
    second_rate varchar(255) ,
    ip_num varchar(255) ,
    primary key(date)
);

Export to MySQL with Sqoop

bin/sqoop export \
--connect jdbc:mysql://hadoop-senior02.grc.com:3306/db_track \
--username root \
--password 123456 \
--table visit_daily \
--num-mappers 1 \
--input-fields-terminated-by "\t" \
--export-dir /user/hive/warehouse/db_track.db/tmp_visit_daily
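
Assuming the mysql command-line client is available with the credentials used above, the export can be verified with:

mysql -uroot -p123456 -e "select * from db_track.visit_daily;"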

In a real project we also have to consider issues such as the data storage format, the compression codec, and data skew.
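
For example, output compression can be switched on per session before running a query; this is only a sketch, using the standard Hive/Hadoop 2.x property names and the Snappy codec:

bin/hive -e "
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
select count(*) from db_track.track_log where date='20150828';
"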

Optimizing the Hive tables

Create the Hive table session_info

--use database
use db_track;
--drop table session_info
drop table if exists session_info;
--create table session_info
create table if not exists session_info(
    session_id string ,
    guid string ,
    trackU string ,
    landing_url string ,
    landing_url_ref string ,
    user_id string ,
    pv string ,
    stay_time string ,
    min_trackTime string ,
    ip string ,
    provinceId string 
)
partitioned by (date string)
row format delimited fields terminated by '\t' 
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY") ;

Create the Hive table tmp_session_info

--use databases
use db_track ;
--drop table 
drop table if exists tmp_session_info ;
--create table
create table tmp_session_info 
row format delimited fields terminated by '\t' 
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as 
    select 
     sessionid ,
     max(guid) guid ,
     max(enduserid) user_id ,
     count(url) pv ,
     unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
     unix_timestamp(min(tracktime)) min_trackTime ,
     max(ip) ip ,
     max(provinceid) provinceId
 from db_track.track_log
 where date='20150828'
 group by sessionid ;

Create the Hive table tmp_track_url

use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url 
row format delimited fields terminated by '\t' 
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
    sessionid ,
    tracktime ,
    trackeru ,
    url ,
    referer 
from db_track.track_log
where date='20150828' ;

Load data into session_info

--insert  data into table
insert overwrite table session_info partition(date='20150828')
select 
    a.sessionid sessionid ,
    max(a.guid) guid ,
    max(b.trackeru) trackU ,
    max(b.url) landing_url ,
    max(b.referer) landing_url_ref ,
    max(a.user_id) user_id ,
    max(a.pv) pv ,
    max(a.stay_time/1000) stay_time ,
    max(a.min_trackTime) min_trackTime ,
    max(a.ip) ip ,
    max(a.provinceId) provinceId 
from
    db_track.tmp_session_info a
join 
    db_track.tmp_track_url b
on  
     a.sessionid=b.sessionid 
group by a.sessionid ;

Create the intermediate result table

use  db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily 
row format delimited fields terminated by '\t' 
as 
select 
    date ,
    count(distinct guid) uv ,
    sum(pv) pv ,
    count(distinct user_id) login_users ,
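    -- if session_id is NULL, substitute a random key so those rows spread across
    -- reducers instead of all hashing to one (count-distinct data-skew workaround)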
    count(distinct (case when session_id is not null then session_id else concat('hive',rand()) end )) - count(distinct user_id) visit_users ,
    avg(stay_time) avg_stay_time ,
    count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
    count(distinct ip) ip_num 
from session_info
where date='20150828'
group by date ;

Create the visit_daily table in MySQL

use db_track ;
drop table if exists visit_daily ;
create table visit_daily(
    date varchar(255) not null,
    uv varchar(255) ,
    pv varchar(255) ,
    login_users varchar(255) ,
    visit_users varchar(255) ,
    avg_stay_time varchar(255) ,
    second_rate varchar(255) ,
    ip_num varchar(255) ,
    primary key(date)
);

Export to MySQL with Sqoop

bin/sqoop export \
--connect jdbc:mysql://hadoop-senior02.grc.com:3306/db_track \
--username root \
--password 123456 \
--table visit_daily \
--num-mappers 1 \
--input-fields-terminated-by "\t" \
--export-dir /user/hive/warehouse/db_track.db/tmp_visit_daily

Every day, load the previous day's log file data into the table db_track.track_log, analyze the data according to the requirements, and export the results into an RDBMS (relational database).

Note: the log files are all collected on a single machine.

  • Each day's log files are placed in one directory on that machine (see the example layout below)

    • the directory is named by date [yyyyMMdd], e.g. [20151212]
  • A log file is produced every hour

    • file names are the date plus the hour [yyyyMMddHH], e.g. [2015122000]
  • Analyze the data according to the requirements

  • Load the results into the RDBMS (relational database)
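
A hypothetical layout for 2015-12-12 that follows these rules:

$ ls /opt/datas/track_logs/20151212
2015121200  2015121201  2015121202  ...  2015121223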

Handle it with scripts

daily_hour_visit.sh

#!/bin/bash
## set HIVE_HOME
HIVE_HOME=/opt/cdh3.5.6/hive-0.13.1-cdh5.3.6
## set SQOOP_HOME
SQOOP_HOME=/opt/cdh3.5.6/sqoop-1.4.5-cdh5.3.6
## set script path
SCRIPT_PATH=/opt/datas/track_logs
## track logs directory
LOG_DIR=/opt/datas/track_logs
## yesterday's date, e.g. 20151212
yesterday=`date -d -1days '+%Y%m%d'`

for line in `ls $LOG_DIR/$yesterday`
do
    ## hourly file names look like 2015121200: first 8 chars are the date, last 2 the hour
    date=${line:0:8}
    hour=${line:8:2}
    echo "----load data into table db_track.track_log--$line--"
    $HIVE_HOME/bin/hive -e "load data local inpath '$LOG_DIR/$yesterday/$line' into table db_track.track_log partition(date='$date',hour='$hour')"
    echo "------Hive load data into table result--------"
    ## data process by hive
    $HIVE_HOME/bin/hive --hiveconf date=$date -f $SCRIPT_PATH/hive_visit.sql
done

## export data to mysql use sqoop
echo "-------export data to mysql use sqoop------------"
$SQOOP_HOME/bin/sqoop --options-file $SCRIPT_PATH/export_daily_visit.txt
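
Before scheduling, the script can be tested by hand (paths as assumed above):

chmod +x /opt/datas/track_logs/daily_hour_visit.sh
/opt/datas/track_logs/daily_hour_visit.sh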

hive_visit.sql

-- create table tmp_session_info
use db_track ;
drop table if exists tmp_session_info ;
create table tmp_session_info 
row format delimited fields terminated by '\t' 
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as 
select 
    sessionid ,
    max(guid) guid ,
    max(enduserid) user_id ,
    count(url) pv ,
    unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
    unix_timestamp(min(tracktime)) min_trackTime ,
    max(ip) ip ,
    max(provinceid) provinceId
from db_track.track_log
where date="${hiveconf:date}"
group by sessionid ;

-- create table tmp_track_url
use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url 
row format delimited fields terminated by '\t' 
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
    sessionid ,
    tracktime ,
    trackeru ,
    url ,
    referer 
from db_track.track_log
where date="${hiveconf:date}" ;

--create table session_info
--insert  data into table
insert overwrite table session_info partition(date="${hiveconf:date}")
select 
    a.sessionid sessionid ,
    max(a.guid) guid ,
    max(b.trackeru) trackU ,
    max(b.url) landing_url ,
    max(b.referer) landing_url_ref ,
    max(a.user_id) user_id ,
    max(a.pv) pv ,
    max(a.stay_time/1000) stay_time ,
    max(a.min_trackTime) min_trackTime ,
    max(a.ip) ip ,
    max(a.provinceId) provinceId 
from
    db_track.tmp_session_info a
join 
    db_track.tmp_track_url b
on  
    a.sessionid=b.sessionid 
group by a.sessionid ;

--create table tmp_visit_daily

use  db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily 
row format delimited fields terminated by '\t' 
as 
select 
    date ,
    count(distinct guid) uv ,
    sum(pv) pv ,
    count(distinct user_id) login_users ,
    count(distinct (case when session_id is not null then session_id else concat('hive',rand()) end )) - count(distinct user_id) visit_users ,
    avg(stay_time) avg_stay_time ,
    count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
    count(distinct ip) ip_num 
from session_info
where date="${hiveconf:date}"
group by date ;
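
The file can also be run standalone for a single day, mirroring the call inside daily_hour_visit.sh:

bin/hive --hiveconf date=20150828 -f /opt/datas/track_logs/hive_visit.sql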

export_daily_visit.txt

export
--connect
jdbc:mysql://hadoop-senior02.grc.com:3306/db_track
--username
root
--password
123456
--table
visit_daily
--num-mappers
1
--input-fields-terminated-by
"\t" 
--export-dir 
/user/hive/warehouse/db_track.db/tmp_visit_daily
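
It is invoked exactly as in the last line of daily_hour_visit.sh:

bin/sqoop --options-file /opt/datas/track_logs/export_daily_visit.txt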

Problem: once the script exists, running it fulfills all of the requirements above, but having someone run it by hand every day is unrealistic, so we schedule the script with Linux crontab.
The plan is to run at 02:10 each day: load the data into the Hive result tables, perform the per-requirement analysis, and export the results to the relational database.

crontab -e

## daily_hour_visit.sh
10 02 * * * /opt/datas/track_logs/daily_hour_visit.sh
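
After saving the entry, it can be confirmed with:

crontab -l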