[关闭]
@tsing1226 2015-12-28T22:27:22.000000Z 字数 3126 阅读 2341

hive

浅谈Hive数据倾斜

数据倾斜

数据倾斜:由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。数据倾斜造成map处理数据量差异过大,任务进度长时间维持在99%(或100%),只有少量(1个或几个)reduce子任务未完成。

产生数据倾斜的原因

  • 数据在各个节点上的分布不均匀;

  • Join时,key中存在个别值得数据量比较大(如null值);

  • count(disctinct key),在数据量大的情况下,容易出现数据倾斜,因为count(distinct)是按照group by字段进行分组。

数据倾斜的解决方案

参数调节

在Map端进行部分数据合并

参数set hive.map.aggr ;--- 是否在Map端进行数据聚合,默认设置为true;Map 端部分聚合,相当于Combiner。

参数 set hive.groupby.mapaggr.checkinterval ; --在Map端进行聚合操作的条目数。

负载均衡

参数set hive.groupby.skewindata,默认值是false,需要设置成true ;当设置为true时,会变成两个
MapReduce ;

第一个MR JOb中,map的输出结果会随机分布到Reduce中,每个map做部分聚合操作,并输出结果,这样处理出来的结果相同的Group By Key有可能被分发到不同的Reduce中,从而达到辅助均衡目的。

第二个MR JOb,会根据预处理数据结果按照key分布到Reduce中,最终完成聚合操作。

SQL语句调节

如何Join

驱动表的选取,选用join key分布最均匀的表作为驱动表;
做好列裁剪和filter操作,以达到两个表做join的时候,数据量相对较小的效果。

大小表Join

设置参数:

set hive.auto.convert.join=true;

使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.

SMB(Sort-Merge-Bucket) Join(大表与大表)

设置参数:

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmap.join=true;
set hive.optimize.bucketmapjoin.sortmerge=true ;

大表Join大表--空值key:

把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

特殊情况特殊处理

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

group by维度过小

采用sum() group by的方式来替换count(distinct)完成计算。

count(distinct xx)

原因:容易倾斜,当xx字段存在大量的某个值时,NULL或者空的记录
解决思路:将特定的值,进行特定的处理。如出现较多null,采用过滤掉,where case; 特定方式转换特定的值,使得这些值不一样,同时这些值不影响分析。

特殊情况特殊处理

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

使用场景

空值产生的数据倾斜

场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1: user_id为空的不参与关联

select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;

解决方法2 :赋与空值分新的key值

select * from log a
left join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;

结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1中 log读取两次,jobs是2。解决方法2 job数是1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

不同数据类型关联产生数据倾斜

场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方法:把数字类型转换成字符串类型

select * from users a
left  join logs b
on a.usr_id = cast(b.user_id as string)

map join 解决倾斜问题

使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。 以下例子:

select * from log a
left join users b
on a.user_id = b.user_id;

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

解决方法:

select /*+mapjoin(x)*/* from log a
left outer join (
  select  /*+mapjoin(c)*/d.*
  from ( select distinct user_id from log ) c
  join users d
  on c.user_id = d.user_id
) b
  on a.user_id = b.user_id;

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

总结

处理数据倾斜目标是使map的输出数据更均匀的分布到reduce中去。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:

1、采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。

2、tmp1记录数会比较少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。

3、map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成的key,value对,假如记录来自member,生成的key,value对,进入reduce阶段。

4、最终把a文件,把Stage3 reduce阶段输出的文件合并起写到hdfs。

如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:

1、对于join,在判断小表不大于1G的情况下,使用map join

2、对于group by或distinct,设定 hive.groupby.skewindata=true

3、尽量使用上述的SQL语句调节进行优化

参考地址:http://www.cnblogs.com/ggjucheng/archive/2013/01/03/2842860.html
 

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注