@Catyee
2021-08-26T20:57:03.000000Z
字数 11665
阅读 361
分布式理论
MapReduce是一个大数据量下的分布式并行计算框架,适合用来处理海量数据的批量操作。基于MapReduce框架可以把作业分为Map和Reduce两个阶段,这两个阶段都可以并行。
Map任务负责将输入的分块数据通过map函数处理为key-value对,这些key-value对会先写入到缓冲区,然后对缓冲区的数据进行分区(按reduce数量进行分区)、排序(按key快速排序,分区内有序,并非全局有序)、聚合(key相同的数据进行聚合),默认的内存大小是100M,是一个环形的结构,如果超过环形结构内存的80%还会进行溢写,将数据写入到磁盘,最后还会对文件进行分区合并(归并排序)。上面说到的分区、排序、聚合、溢写实际上是map端的shuffle过程。
只要有一个map任务完成,reduce任务就会启动,reduce任务先将多个map任务中自己所对应分区的数据通过网络(http)传输到本地内存,并进行一个merge的操作,内存不够的时候也会发生溢写,这样可能产生多个溢写的文件,所以最后还做文件的合并(归并排序),这一部分实际上是reduce的shuffle过程,这个阶段结束后产生的最终文件将作为reduce任务的输入,然后开始reduce函数的处理过程,reduce函数处理完之后会将结果输出到hdfs上。
一个通用原则是给shuffle过程分配尽可能大的内存,当然前提是需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用。
map端优化:主要在于想办法尽量减少溢写文件的数量,只有一个是最好的,最好是能对map输出的数据量有一个预估,据此设置合理的环状内存大小以及溢出比例。
reduce端优化:reduce阶段如果能让所有数据都保存在内存中是最好的,实际上就是尽量将中间结果保存在内存中,这样减少reduce阶段的io次数,从而达到比较好的效果。
其实我们在做全量导数的时候也会遇到数据倾斜的问题,包括sqoop和datax也都无法避免,这是由于基于切分列拼接where条件这种切片方式导致的,where条件并不能感知切分列所在字段的实际数据分布情况,所以就会造成数据倾斜,这种情况其实没有特别好的处理方式,一种是选择更合适的切分列,另外一种就是直接换一种切片方式,不再采用切分列的切分方式,比如像我们一样使用分页的方式来划分切片。
这其实是一种特定场景下的数据倾斜,实际上是Map端的数据倾斜,是因为读取到的数据不均匀。实际上更为常见的reduce端的数据倾斜,一般只有两种形式的数据倾斜:
处理数据倾斜最简单暴力的方式就是增加并行度,比如把reduce的数量设置大一点,当然这只是一种粗暴的方式,一是增加reduce任务的数量会增加额外的管理成本,分的太多反而跑的更慢。
针对某个分区对应的key太多的情况,我们要看key的选择是否合理,是否可以更改,如果确实业务需要不能更改,那我们可以考虑自己实现partitioner函数,根据key的特征来进行分区,而不再使用默认的hash取模方式分区了。
针对单个key对应数据量太多的情况我们可以在key上加随机的前缀或者后缀这样的方式来将key变成多个key,当然这样处理之后可能一趟mapreduce就没法获取最终结果,还需要一个mapreduce的过程将加盐的key给还原回来获取最终结果。
对于hive来说由于大部分时候都会使用hive sql,所以有一些特别要注意的地方,hive sql中的group by、join、count、distinct等等这些必然会出发shuffle的操作就很有可能造成key分布不均,hive里面本身也有一些参数可以进行调节,比如join造成的数据倾斜,join执行阶段会把join key相同的数据分发到同一个reduce上处理,如果某个key数据太多就可能造成数据倾斜,如果是大小表做join,我们可以开启map join,让小表放入内存,大表流式读取,如果是两个大表join,并且是因为一些热点值造成的数据倾斜,从技术上来说就没有特别好的处理方式了,只能从处理方式着手,比如先将数据按照热点key分为两部分,然后分别处理,最后再做合并。hive本身就有这种方式,我们在hive中指定一个join是skew join,hive就会将key超过某个阈值的数据先上传到hdfs,然后单独开启map join来进行处理,最后进行合并。
还有一种情况比较特殊,那就是空值造成的数据倾斜,一种情况是看业务需求,如果字段为空的数据不需要参与计算,我们可以先过滤掉。另外一种方式是可以将空值处理为随机值。
对于group by造成的数据倾斜,我们可以开启map端的聚合,开启之后会在map端就进行一次聚合操作,可以减少map端传给reduce端的数据量。另外如果知道会发生数据倾斜,我们可以把hive.groupby.skewindata设置为true,这样hive在处理的时候会开启两个map reduce任务,在第一个mapreduce中,map的结果随机shuffle给reduce任务,每个reduce做部分聚合操作,第二个map reduce任务会按照group by的key来进行shuffle,相同key分配到同意给reduce中,完成最终的聚合操作。
主程序:
public class WCMain {
private static String iPath = "hdfs://localhost:9000/wordcount/input/test.txt";
private static String oPath = "hdfs://localhost:9000/wordcount/output/";
/**
* 1. 业务逻辑相关信息通过job对象定义与实现 2. 将绑定好的job提交给集群去运行
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
wcjob.setJarByClass(WCMain.class);
wcjob.setMapperClass(WCMapper.class);
wcjob.setReducerClass(WCReducer.class);
// 设置业务逻辑Mapper类的输出key和value的数据类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
// 设置业务逻辑Reducer类的输出key和value的数据类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
// 指定要处理的数据所在的位置
FileSystem fs = FileSystem.get(conf);
Path IPath = new Path(iPath);
if (fs.exists(IPath)) {
FileInputFormat.addInputPath(wcjob, IPath);
}
// 指定处理完成之后的结果所保存的位置
Path OPath = new Path(oPath);
fs.delete(OPath, true);
FileOutputFormat.setOutputPath(wcjob, OPath);
// 向yarn集群提交这个job
boolean res = wcjob.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
map类:
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// map方法的生命周期: 框架每传一行数据就被调用一次
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); // 行数据转换为string
String[] words = line.split(" "); // 行数据分隔单词
for (String word : words) { // 遍历数组,输出<单词,1>
context.write(new Text(word), new IntWritable(1));
}
}
}
reducer类
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// 生命周期:框架每传递进来一个kv 组,reduce方法被调用一次
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0; // 定义一个计数器
for (IntWritable value : values) { // 遍历所有v,并累加到count中
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
Hive是基于hadoop理论的一个数据仓库,hadoop理论最重要的就是分布式海量数据存储引起hdfs和分布式计算框架mapreduce,hive就是用hdfs作为底层存储,用mapreduce来做计算,所以hive算是hadoop理论最为直接的实现。为了使用更加简单,hive还提供了sql的功能,实际上就是将sql编译为mapreudce任务去执行。
// TODO 提供架构图
用户接口,也就是客户端,可以是CLI、jdbc/odbc和webgui。
元数据存储:hive将元数据存储在mysql或者derby中,元数据包括表的名字、表的列和分区等信息
hive:hive内部分为解释器、编译器、优化器和执行器,完成hive sql从词法、语法分析到编译、优化和物理执行计划的生成,最终会生成实际的map reduce任务进行执行。
Hive支持的存储数的格式主要有:text(行式存储) 、sequenceFile(行式存储)、orc(列式存储)、parquet(列式存储)
行存储的特点: 查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。传统关系库都是行存。
列存储的特点:
每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。select某些字段效率更高。
orc文件并不是单纯的列式存储,仍然是根据行组分割整个表,在每个行组内按列式存储。每个orc文件都有一到多个stripe(直译是条纹,实际就是行组)组成,每次读取文件都是以行组为单位,stripe又由index data、row data和stripe footer组成。
每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。
hive执行引擎使用MapReduce,一个join就会启动一个job,一条sql语句中如有多个join,则会启动多个job
Common Join
Map Join
MapJoin简单说就是在Map阶段将小表数据从 HDFS 上读取到内存中的哈希表中,读完后将内存中的哈希表序列化为哈希表文件,在下一阶段,当 MapReduce 任务启动时,会将这个哈希表文件上传到 Hadoop 分布式缓存中,该缓存会将这些文件发送到每个 Mapper 的本地磁盘上。因此,所有 Mapper 都可以将此持久化的哈希表文件加载回内存,并像之前一样进行 Join。顺序扫描大表完成Join。减少昂贵的shuffle操作及reduce操作
order by sort by
order by 会对数据进行全局排序,当数据量大的时候会很低效。sort by是数据在进入reduce前进行排序,所以并不能保证全局有序,一般和distribute by 一起执行,而且distribute by 要写在sort by前面。distribute by的分区规则是根据分区字段的hash值与reduce的个数进行取模,相同的会分到同一个reducer,所以可以通过distribute by来控制某些字段的特定行分到同一个reducer,来进行后续的聚合操作。
如果mapred.reduce.tasks=1和order by效果一样,如果大于1会分成几个文件输出每个文件会按照指定的字段排序,而不保证全局有序。
Cluster by(字段) 除了具有Distribute by的功能外,还会对该字段进行排序。
因此,如果分桶和sort字段是同一个时,此时,cluster by = distribute by + sort by
hive调优涉及到压缩和存储调优,参数调优,sql的调优,数据倾斜调优,小文件问题的调优等。
压缩和存储调优是指表文件格式的压缩存储,在不进行压缩情况下orc格式是最好的,占用空间最小,但是一般压缩都会导致执行mapreduce任务的时候不可切分,所以必须先解压,在这种情况下还不如一开始就不压缩,所以个人觉得选用orc就是一种比较好文件格式。
大表进行分区分桶
hdfs是分布式的海量数据存储引擎。具有高可靠、高可用、安全性的特点。
hdfs是一个主从的体系架构,hdfs集群中有两类节点:namenode和datanode。
namenode负责存储所有文件的元信息,包括文件目录树,文件到各个block的映射关系等等,这些信息直接存储在namenode的内存中,为了防止namenode故障重启后内存中的元信息丢失,namenode对于写操作都会先记录日志(editlog),为了防止日志占用太多的空间,并且防止日志太大,重启之后通过日志恢复需要太长时间,还会定期将日志还原成快照(fsimage)存储在磁盘上,当namenode重启之后通过快照和日志就可以恢复到故障前的状态。
namenode还负责对DataNode的状态监控,datanode会定期向NameNode发送心跳以及自己所存储的block列表信息。Namenode通过心跳就知道哪些datanode还活着,并且知道这个datanode中保存了哪些block。如果namenode发现某个datanode故障了,就会在其它datanode节点上增加故障datanode上存储的block备份,用来维持block的冗余数量不变(默认为3)。
如果hdfs集群只有一个namenode,一旦namenode节点故障,整个集群都会不可用,如果namenode所在机器严重故障导致无法恢复,那么hdfs上存储的数据将完全失效,因为我们无从知道哪些datanode存储了哪些block,因此namenode的容错和高可用非常重要。
为了增加namenode的容错,hdfs提供了两种机制,一种是在配置文件中给namenode配置多个工作目录,这样可以将元数据信息存储到多块磁盘上,或者多台机器上,甚至可以将元数据存储到远程挂载的网络文件系统中。
第二种是方案是运行一个辅助的namenode节点,在1.x版本中,这个节点叫做secondary namenode,在2.x版本中由standby namenode兼任(但是机制已经发生变化),这个节点的作用就是定期从namenode拉去fsimage和editlog,然后构造出新的fsimage,然后将新的fsimage返回给namenode,这种方式可以防止namenode中编辑日志过大,同时namenode本身不再负责管理editlog和fsimage,可以节省cpu,减轻压力。在1.x中这个secondary namenode不承担高可用的作用,所以如果namenode故障之后,secondary namenode并不会自动成为namenode,这个时候hdfs仍然是不可用的。
到了2.x的版本,hdfs引入了namenode的高可用机制,工作的那个namenode叫做active namenode,另外一个叫做standby namenode,active namenode对外提供服务,并管理datanode,如果active namenode故障了,standby namenode就会成为active namenode,对外提供服务。引入了高可用机制之后,hdfs去掉了原先的secondary namenode,而是由standby namenode来负责容错和高可用,但是机制已经和secondary namenode不一样了。
active namenode和standby namenode是通过一系列的journal node来同步数据的,Active NameNode会把editlog发送给journal node,只要大多数的journal node写入成功,active namenode就会认为当前的写入操作成功,否则 active NameNode就会认为这次提交EditLog失败,提交EditLog失败会导致Active NameNode退出进程。可以看到Active NameNode提交EditLog到 JournalNode集群的过程实际上是同步阻塞的,但是并不需要所有的JournalNode都调用成功,只要大多数JournalNode调用成功就可以了。
而standby namenode会定期从journal node中同步editlog数据,然后把同步到的editlog应用到自己内存中的文件系统镜像,当满足一次checkpoint时(满足条件:离上一次checkpoint操作是否已经有一个小时,或者HDFS已经进行了100万次操作),standby namenode进行一次合并操作,生成新的fsimage,然后会将新的fsimage上传到active namenode相应的目录中, active namenode接收到新的fsimage之后会删除旧的fsimage。由于standby namenode是定期从journal node中同步数据,所以可能会有滞后,如果standby namenode切换为active之后还需要把落后的editlog给补齐,但是一般落后的都很少,可以快速补齐。
namenode通过zookeeper来进行选主,实际上就是到zookeeper中去创建一个临时节点,zookeeper保证只有一个客户端能够创建成功,所以创建成功的这个namenode就会作为active namenode。另外一个namenode就会监控这个临时节点的删除事件,一旦被删除,它就会尝试创建临时节点,从而让自己成为active namenode
脑裂问题就是由于网络或者gc等原因造成了当前active namenode假死,zookeeper上临时节点被删除,standby namenode切换为active namenode,这个时候旧的active namenode恢复过来了,它依然认为自己是active的状态,所以也对外提供服务,就像有两个大脑一样,所以叫做脑裂。解决方法就是active namenode在成功创建临时节点之后,还会在zookeeper上创建一个持久节点,如果是正常关闭,active namenode会把临时节点和持久节点一起删掉,但如果是异常关闭,持久节点不会被删掉,当standby namenode切换为active之后发现持久节点还在,就会对原来的active namenode进行隔离,会先尝试把它转换为standby namenode,如果不行的话就会通过远程运行一个脚本直接杀死进程。
除了上述操作,还有epoch机制来防止脑裂。
DataNode负责数据块的实际存储和读/写工作。在hadoop1.0时,DataNode的数据块默认大小为64M,2.0版本后,块的默认大小为128M。当客户端上传一个大文件时,HDFS会自动将其切割成固定大小的Block,每个块以多份的形式存储在集群上,默认为3份。
datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。
如果一台DataNode经过10分30秒(默认)后没有给NameNode发送心跳信息,而被NameNode判断为死亡,NameNode会马上将其上的数据备份到集群中其他机器上。当这个DataNode节点排除故障后,重新回到集群中,该节点上还保存着原来那批数据,而默认的配置情况下,DataNode会每隔60分钟向NameNode发送一次Block信息,在这段时间内,集群中会有某些数据块多出一个备份。在NameNode收到该节点的Block信息后,它发现数据备份多了才会命令某些DataNode删除掉多余的备份数据。
1)客户端向namenode请求上传文件
2)namenode收到请求之后检查客户端要求上传的文件是否已存在,父目录是否存在,是否有权限,然后响应客户端
3)如果客户端收到的响应是允许上传,客户端就会向namenode发出请求询问第一块block该上传到哪里
4)namenode查询datanode信息(忙碌情况,远近情况等),然后返回三个namenode返回3个datanode地址给客户端
5)客户端请求与最近的一个datanode节点(一般是客户端所在机器上的namenode,如果客户端在其它集群,则随机选取,假设为datanode1)建立传输通道,并告知其还要传给datanode2和datanode3。datanode1会请求与datanode2建立连接,datanode2会请求与datanode3建立连接。
6)datanode3响应datanode2的连接请求,通道建立成功。同理,datanode2响应datanode1,datanode1响应客户端。
7)客户端收到通道建立成功的消息后,开始向datanode1发送block1的数据,以一个个package(64k)为单位通过通道向datanode1写数据,datanode1收到数据会将其存在本地缓存中,一边向datanode2传数据,一边将缓存中的数据保存到磁盘上。
8)客户端在传送数据时会有一个package的应答队列,datanode1每收到一个package后就向客户端发回消息(datanode1不用等待datandoe2发回应答信息才给客户端发送信息,客户端只保证datanode1收到了数据就行,后面的事它交给了datanode1)
9)当一个block传输完成之后,客户端再次请求namenode上传第二个block
默认情况下,在运行客户端的那个节点上存放第1个副本,如果客户端运行在集群之外,则随机选择一个节点存放第1块,但namenode会尽量选择那些情况好的datanode(存储不太满,当时不太忙,带宽比较高)。第2个副本存放在与第1个副本所在机架不同的另一个机架上的datanode中(随机选择另一机架上的另一情况较好的datanode),第3个副本存在与第2个副本相同机架的另一个datanode上
1)跟namenode通信,请求下载某个数据。
2)namenode查询元数据信息以及block位置信息。
3)将数据所在的datanode信息返回给客户端。
4)客户端根据数据所在的datanode,挑选一台距离自己最近的datanode,并向其发出下载文件的请求(若所需数据不在一台datanode上保存,则分别向多台datanode发出请求)。
5)datanode响应客户端请求,将数据返回给客户端。
6)从多个datanode获得的数据不断在客户端追加,形成完整的数据
客户端向namenode发出删除文件的请求,namenode上面没有实际存储数据,所以只是标记哪些block需要删除,namenode不会主动联系datanode删除数据,namenode和datanode之间的联系都是通过心跳,datanode定期主动向namenode发送心跳,namenode在回复心跳的时候才会让datanode删除对应的block,所以整个删除是有延迟的,当然实际上不是直接删除,而是移入到一个回收站的目录,可以进行恢复