@tsing1226
2016-01-02T14:54:06.000000Z
Requirement analysis: from the daily track_log data, first build a per-session table (session_info), then aggregate daily metrics (UV, PV, logged-in users, visitor users, average stay time, second-visit rate, and distinct IP count) into visit_daily, and finally export that table to MySQL with Sqoop.
--use database
use db_track;
--drop table session_info
drop table if exists session_info;
--create table session_info
create table if not exists session_info(
session_id string ,
guid string ,
trackU string ,
landing_url string ,
landing_url_ref string ,
user_id string ,
pv string ,
stay_time string ,
min_trackTime string ,
ip string ,
provinceId string
)
partitioned by (date string)
row format delimited fields terminated by '\t' ;
tmp_session_info
--use databases
use db_track ;
--drop table
drop table if exists tmp_session_info ;
--create table
create table tmp_session_info as
select
sessionid ,
max(guid) guid ,
max(enduserid) user_id ,
count(url) pv ,
unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
unix_timestamp(min(tracktime)) min_trackTime ,
max(ip) ip ,
max(provinceid) provinceId
from db_track.track_log
where date='20150828'
group by sessionid ;
tmp_track_url
use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url as
select
sessionid ,
tracktime ,
trackeru,
url ,
referer
from db_track.track_log
where date='20150828' ;
session_info
--insert data into table
insert overwrite table session_info partition(date='20150828')
select
a.sessionid sessionid ,
max(a.guid) guid ,
max(b.trackeru) trackU ,
max(b.url) landing_url ,
max(b.referer) landing_url_ref ,
max(a.user_id) user_id ,
max(a.pv) pv ,
max(a.stay_time/1000) stay_time ,
max(a.min_trackTime) min_trackTime ,
max(a.ip) ip ,
max(a.provinceId) provinceId
from
db_track.tmp_session_info a
join
db_track.tmp_track_url b
on
a.sessionid=b.sessionid
group by a.sessionid ;
use db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily
row format delimited fields terminated by '\t'
as
select
date ,
count(distinct guid) uv ,
sum(pv) pv ,
count(distinct case when user_id is not null then user_id else NULL end ) login_users ,
count(distinct session_id)-count(distinct case when user_id is not null then user_id else NULL end ) visit_users ,
avg(stay_time) avg_stay_time ,
count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
count(distinct ip) ip_num
from session_info
where date='20150828'
group by date ;
use db_track ;
drop table if exists visit_daily ;
create table visit_daily(
date varchar(255) not null,
uv varchar(255) ,
pv varchar(255) ,
login_users varchar(255) ,
visit_users varchar(255) ,
avg_stay_time varchar(255) ,
second_rate varchar(255) ,
ip_num varchar(255) ,
primary key(date)
);
bin/sqoop export \
--connect jdbc:mysql://hadoop-senior02.grc.com:3306/db_track \
--username root \
--password 123456 \
--table visit_daily \
--num-mappers 1 \
--input-fields-terminated-by "\t" \
--export-dir /user/hive/warehouse/db_track.db/tmp_visit_daily
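Once the export finishes, a quick sanity check on the MySQL side (run in the mysql client against db_track) confirms the daily row arrived; the date value here is just the example day used above:

SELECT * FROM visit_daily WHERE date = '20150828';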
In a real project we also need to take storage formats, compression codecs, data skew and similar issues into account.
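Beyond choosing a columnar format in the DDL (as below), a few session-level settings cover the same concerns; these are standard Hive properties, but the values shown are only a sketch and should be tuned per cluster:

-- set before running the CTAS statements below
set parquet.compression=SNAPPY;            -- default codec for Parquet writes
set hive.exec.compress.intermediate=true;  -- compress data passed between MR stages
set hive.groupby.skewindata=true;          -- two-stage aggregation for skewed group-by keys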
--use database
use db_track;
--drop table session_info
drop table if exists session_info;
--create table session_info
create table if not exists session_info(
session_id string ,
guid string ,
trackU string ,
landing_url string ,
landing_url_ref string ,
user_id string ,
pv string ,
stay_time string ,
min_trackTime string ,
ip string ,
provinceId string
)
partitioned by (date string)
row format delimited fields terminated by '\t'
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY") ;
tmp_session_info
--use databases
use db_track ;
--drop table
drop table if exists tmp_session_info ;
--create table
create table tmp_session_info
row format delimited fields terminated by '\t'
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
sessionid ,
max(guid) guid ,
max(enduserid) user_id ,
count(url) pv ,
unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
unix_timestamp(min(tracktime)) min_trackTime ,
max(ip) ip ,
max(provinceid) provinceId
from db_track.track_log
where date='20150828'
group by sessionid ;
tmp_track_url
use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url
row format delimited fields terminated by '\t'
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
sessionid ,
tracktime ,
trackeru,
url ,
referer
from db_track.track_log
where date='20150828' ;
--insert data into table
insert overwrite table session_info partition(date='20150828')
select
a.sessionid sessionid ,
max(a.guid) guid ,
max(b.trackeru) trackU ,
max(b.url) landing_url ,
max(b.referer) landing_url_ref ,
max(a.user_id) user_id ,
max(a.pv) pv ,
max(a.stay_time/1000) stay_time ,
max(a.min_trackTime) min_trackTime ,
max(a.ip) ip ,
max(a.provinceId) provinceId
from
db_track.tmp_session_info a
join
db_track.tmp_track_url b
on
a.sessionid=b.sessionid
group by a.sessionid ;
use db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily
row format delimited fields terminated by '\t'
as
select
date ,
count(distinct guid) uv ,
sum(pv) pv ,
count(distinct user_id) login_users ,
count(distinct (case when session_id is not null then session_id else concat('hive',rand()) end ))-count(distinct user_id) visit_users ,
avg(stay_time) avg_stay_time ,
count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
count(distinct ip) ip_num
from session_info
where date='20150828'
group by date ;
use db_track ;
drop table if exists visit_daily ;
create table visit_daily(
date varchar(255) not null,
uv varchar(255) ,
pv varchar(255) ,
login_users varchar(255) ,
visit_users varchar(255) ,
avg_stay_time varchar(255) ,
second_rate varchar(255) ,
ip_num varchar(255) ,
primary key(date)
);
bin/sqoop export \
--connect jdbc:mysql://hadoop-senior02.grc.com:3306/db_track \
--username root \
--password 123456 \
--table visit_daily \
--num-mappers 1 \
--input-fields-terminated-by "\t" \
--export-dir /user/hive/warehouse/db_track.db/tmp_visit_daily
Each day we load the previous day's log files into the table db_track.track_log, run the analysis above on that data, and export the results into an RDBMS (relational database).
Notes on the log files:
- The log files are collected on a single machine, and each day's files are placed in one directory.
- The directory is named by date (year-month-day), e.g. 20151212.
- A new log file is produced every hour, named by date plus hour, e.g. 2015122000 (an example layout is shown below).
- After the analysis, the results are loaded into the RDBMS.
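For example, assuming the collection directory used by the script below (/opt/datas/track_logs), one day's files would be laid out roughly like this:

/opt/datas/track_logs/20150828/2015082800
/opt/datas/track_logs/20150828/2015082801
...
/opt/datas/track_logs/20150828/2015082823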
daily_hour_visit.sh
#!/bin/sh
## set HIVE_HOME
HIVE_HOME=/opt/cdh3.5.6/hive-0.13.1-cdh5.3.6
##set SQOOP_HOME
SQOOP_HOME=/opt/cdh3.5.6/sqoop-1.4.5-cdh5.3.6
##set script path
SCRIPT_PATH=/opt/datas/track_logs
## track logs directory
LOG_DIR=/opt/datas/track_logs
##get date
yesterday=`date -d "-1 day" '+%Y%m%d'`
for line in `ls $LOG_DIR/$yesterday`
do
#echo $line
#get date and hour
date=${line:0:4}${line:4:2}${line:6:2}
hour=${line:8:2}
echo "----load data into table db_track. track_log--$--"
$HIVE_HOME/bin/hive -e "load data local inpath '$LOG_DIR/$yesterday/$line' into table db_track.track_log partition(date=$date,hour=$hour)"
echo " ------Hive load data into table result-------- "
done
##data process by hive, run once after all of the day's hourly files are loaded
$HIVE_HOME/bin/hive --hiveconf date=$yesterday -f $SCRIPT_PATH/hive_visit.sql
##export data to mysql use sqoop
echo "-------export data to mysql use sqoop------------"
$SQOOP_HOME/bin/sqoop --options-file $SCRIPT_PATH/export_daily_visit.txt
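Before scheduling it, the script can be run once by hand to exercise the whole chain (log loading, Hive analysis, Sqoop export):

sh /opt/datas/track_logs/daily_hour_visit.sh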
hive_visit.sql
-- create table tmp_session_info
--use databases
use db_track ;
--drop table
drop table if exists tmp_session_info ;
--create table
create table tmp_session_info
row format delimited fields terminated by '\t'
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
sessionid ,
max(guid) guid ,
max(enduserid) user_id ,
count(url) pv ,
unix_timestamp(max(tracktime))-unix_timestamp(min(tracktime)) stay_time ,
unix_timestamp(min(tracktime)) min_trackTime ,
max(ip) ip ,
max(provinceid) provinceId
from db_track.track_log
where date="${hiveconf:date}"
group by sessionid ;
--create table tmp_track_url
use db_track ;
drop table if exists tmp_track_url ;
create table tmp_track_url
row format delimited fields terminated by '\t'
STORED AS PARQUET tblproperties ("parquet.compression"="SNAPPY")
as
select
sessionid ,
tracktime ,
trackeru,
url ,
referer
from db_track.track_log
where date="${hiveconf:date}" ;
--create table session_info
--insert data into table
insert overwrite table session_info partition(date="${hiveconf:date}")
select
a.sessionid sessionid ,
max(a.guid) guid ,
max(b.trackeru) trackU ,
max(b.url) landing_url ,
max(b.referer) landing_url_ref ,
max(a.user_id) user_id ,
max(a.pv) pv ,
max(a.stay_time/1000) stay_time ,
max(a.min_trackTime) min_trackTime ,
max(a.ip) ip ,
max(a.provinceId) provinceId
from
db_track.tmp_session_info a
join
db_track.tmp_track_url b
on
a.sessionid=b.sessionid
group by a.sessionid ;
--create table tmp_visit_daily
use db_track ;
drop table if exists tmp_visit_daily ;
create table tmp_visit_daily
row format delimited fields terminated by '\t'
as
select
date ,
count(distinct guid) uv ,
sum(pv) pv ,
count(distinct user_id) login_users ,
count(distinct (case when session_id is not null then session_id else concat('hive',rand()) end ))-count(distinct user_id) visit_users ,
avg(stay_time) avg_stay_time ,
count(case when pv >=2 then session_id else NULL end )/count(session_id) second_rate ,
count(distinct ip) ip_num
from session_info
where date="${hiveconf:date}"
group by date ;
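For a one-off test, this SQL file can also be run directly, passing the date through --hiveconf exactly as the shell script does (the path assumes SCRIPT_PATH from daily_hour_visit.sh):

$HIVE_HOME/bin/hive --hiveconf date=20150828 -f /opt/datas/track_logs/hive_visit.sql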
export_daily_visit.txt
export
--connect
jdbc:mysql://hadoop-senior02.grc.com:3306/db_track
--username
root
--password
123456
--table
visit_daily
--num-mappers
1
--input-fields-terminated-by
"\t"
--export-dir
/user/hive/warehouse/db_track.db/tmp_visit_daily
Problem: once the script is written, a single run completes the whole workflow, but having someone run it by hand every day is not realistic. Instead we schedule the script with Linux crontab.
The design is to run at 02:10 every day: load the data into the Hive tables, run the required analysis, and export the results to the relational database.
## crontab entry for daily_hour_visit.sh
10 02 * * * /opt/datas/track_logs/daily_hour_visit.sh
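The entry is installed and checked with the standard crontab commands:

crontab -e    # add the schedule line above
crontab -l    # confirm it is registered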