@Catyee
2021-08-14T15:11:43.000000Z
字数 13598
阅读 399
面试
概念:分布式系统指硬件或软件分布在不同的网络计算机上,彼此之间仅仅通过消息传递来通信和协调。可以是按照业务垂直分布(微服务),也可以是同一个业务水平分布(多活)
好处:服务能力提升,互相影响减小,并发增高,容错性增加;扩展性能力强;降低成本(普通机器就可胜任);程序员只用关心一部分业务
坏处:故障总会发生,比如通信异常,节点故障等等
Cap理论是指一个分布式系统不可能同时满足一致性,可用性和分区容错性,最多只能同时满足其中三个基本需求。
一致性:指分布式环境中,数据在多个节点之间能否保持数据或状态一致的特性。
可用性:指系统提供的服务总是处于可用的状态
分区容错性:指分布式系统在遇到网络故障或者节点故障的时候仍然能够对外提供满足一致性的服务。除非整个分布式系统所有服务都发生了故障。
由于不可兼得,所以需要进行取舍。大多数情况下都会选择放弃强一致性,而选择可用性和分区容错性。但是Zookeeper不太一样,Zookeeper尽量做到强一致性(实际上还是最终一致性),而放弃了部分可用性(选举的时候不能对外提供服务)
base理论是对Cap中一致性和可用性权衡的结果。核心思想是即使无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性。
base理论中的三个概念——基本可用、软状态和最终一致性:
基本可用指分布式系统在出现故障的时候,允许损失部分可用性,比如响应时间上的损失和功能上的部分损失。
软状态:指允许系统中的数据存在中间状态,可以简单理解为允许数据在节点之间同步的时候存在延迟
最终一致性:指系统中的数据副本,在经过一段时间的同步之后,最终能够达到一个一致的状态。
Base理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性是相反的,它完全不同于ACID的强一致性模型,而是提出通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终能达到一致性的状态。
强一致性(线性一致性)
强一致性可以理解为当写入操作完成后,任何客户端去访问任何存储节点的值都是最新的值,将分布式的一致性过程对客户端透明,客户端操作一个强一致性的数据库时感觉自己操作的是一个单机数据库,强一致性就是CAP定理中所描述的C(Consistency)
弱一致性
弱一致性是与强一致性对立的一种一致性级别,这种一致性级别不承诺立即可以读到写入的值,也不具体承诺多久之后数据能够达到一致,但会尽可能地保证到某个时间级别(比如秒级别)后,数据能够达到一致状态。弱一致性还可以再进行细分∶
会话一致性∶该一致性级别只保证对于写入的值,在同一个客户端会话
中可以读到一致的值,但其他的会话不能保证。
用户一致性∶该一致性级别只保证对于写入的值,在同一个用户中可以
读到一致的值,但其他用户不能保证。
在某些系统下,对一致性的要求并不高,从而可以舍弃强一致策略带来的性能与可用性消耗。
最终一致性
最终一致性也可以理解成弱一致性的一种,使用这种一致性级别,依旧可能在写入后读到旧值,但做出的改进是要求数据在有限的时间窗口内最终达到一致的状态。也就说就算现在不一致,也早晚会达到一致,但狭义上的弱一致性并不对一致性做出任何保证,也许某些节点永远不会达到一致,其实最终一致性的核心就是保证同步的请求不会丢失,在请求到达时节点的状态变为最新状态,而不考虑请求传输时的不一致窗口,DNS就是典型的最终一致性系统。
为了解决分布式一致性问题,涌现了很多经典的一致性协议和算法,最著名的就是二阶段提交、三阶段提交和Paxos算法、Raft算法。
二阶段提交、三阶段提交应对的是分布式系统中,不同节点协同完成同一个任务的情况;而Paxos算法、Raft算法、ZAB算法都是为了应对分布式系统中,某个任务可以单节点执行,但是每个节点要最终保持结果一致的情况,两种算法应对的场景并不一样。
在一个分布式集群中,要执行一个分布式事务,最直接的方法是引入一个协调者,用于统一调度分布式事务在分布式节点上的执行逻辑,这些分布式节点叫做参与者。二阶段提交实际上就是协调者和参与者执行分布式事务的过程,其主要目的就是为了解决分布式事务的原子性问题,保证分布式事务的多个参与者要么都执行成功要么都执行失败。
从名字就可以看出二阶段提交就是将事务分成了两个阶段来处理:
投票阶段:
a、事务询问:当协调者收到一个事务请求之后,先记录日志,然后通知所有参与者要执行一个事务,并询问所有的参与者是否可以执行提交操作,等待参与者回应
b、参与者开始执行事务操作,将事务操作的undo和redo信息记入事务日志,但是并不提交
c、参与者先在自己日志中记录自己是否可以提交事务,然后向协调者回应。如果参与者在执行事务的时候每个步骤都成功了,就回复可以提交,如果执行事务的时候失败了,就立刻回滚,并回应不能提交。
事务提交/回滚阶段:
在第二个阶段中,协调者要根据参与者的反馈情况来决定这个事务最终是提交还是回滚:
a、如果所有参与者都进行了响应,并且响应结果都是可以提交事务,那协调者就会做出提交事务的决定,先记录自己的决定到自己的日志,然后发送提交事务的请求给每一个参与者。这一步只要协调者记录了日志,就可以响应客户端的请求了,不需要等到参与者都提交了之后再回应。参与者收到提交的请求之后就正式提交事务,释放资源,然后向协调者进行回复,协调者收到所有参与者的反馈之后完成事务。
b、如果某一个参与者向协调者反馈不能提交事务,或者超过了超时时间,还有一些参与者没有响应是否提交事务,协调者就会做出中断事务的决定,同样先往自己的日志中记录自己的决定,然后向每个参与者发送回滚事务的请求。参与者接收到事务回滚的请求之后,根据undo日志来回滚,然后释放资源并向协调者进行回复。协调者接收到所有参与者的回复消息之后,完成事务的中断。
逻辑上二阶段提交很清晰,但是由于分布式系统都靠网络来传输消息,而网络是不可靠的,网络的原因将导致各种各样状况的发生,会使得情况异常的复杂。
1、第一阶段协调者询问能否提交事务,部分参与者响应超时
协调者将做出中断事务的决定,先写日志,然后通知参与者中断事务。这种情况下,可能是协调者发送了事务请求,参与者没有接收到,也可能是参与者执行完事务并进行了回复,但是协调者没收到,如果是后者,实际上每个参与者可能都做好了准备,但是协调者没有接收到回应,只能做出中断事务的决定,这是一种保守的策略,会浪费一些资源。
2、假如某个参与者执行完了事务,并回应了协调者自己可以提交(如果不能提交,可以回复不能提交,然后直接回滚,不用等待协调者通知),然后这个参与者等待协调者的决定,等待过程超时:
这种情况比较复杂,可能协调者做出了提交的决定,只是自己没有收到,所以自己不能擅自回滚,但也不能擅自提交,因为协调者也可能做出中断事务的决定。这个时候可以一直阻塞,但也有一些可以优化的点,比如这个参与者是A,A可以去询问其它参与者,如果某个参与者说我没接收到协调者的决定,但是我自己的事务执行失败了,我向协调者发出了不能提交事务的回应,这个时候A也就可以直接回滚了。另外的情况,如果某一个参与者说我接受到了协调者的决定,协调者的决定是提交(回滚),那A也就可以提交(回滚)了。但如果被询问的节点也没有回复,或者回复说也没有收到协调者的命令,这个时候A别无他法只能继续阻塞。这个优化叫做"超时终止协议",它能解决一部分的超时等待问题,但不能解决全部,并且增加了编程的复杂性。
1、如果协调者做出了决定,但是决定还没发送或者只发送了部分就宕机了
这种情况下,协调者重启之后可以从自己的日志中知道自己之前做的决定,参与者可以通过互相询问的方式来知道这个决定,也可以重新询问协调者。
如果协调者日志中都没有自己做的决定,那直接中断事务就可以了。
2、参与者执行了事务,还没来得及回复协调者就发生了宕机:
这个时候参与者先从自己日志中读取日志,如果发现磁盘中有“yes”记录,那就可以发起超时终止协议。
二阶段提交主要有同步阻塞、单点瓶颈、数据不一致,过于保守导致资源浪费这些问题。
同步阻塞:二阶段提交协议存在的最明显也是最大的一个问题就是同步阻塞,在二阶段提交的执行过程中,所有参与该事务操作的逻辑都处于阻塞状态,也就是说,各个参与者在等待其他参与者响应的过程中,将无法进行其他任何操作, 这会极大的影响分布式系统的性能。
单点问题:如果协调者出现问题,整个流程将无法运转。
数据不一致:在正确记录日志的情况下,最终数据会一致,但是中间可能会有很长一段时间处于不一致的情况,比如某个参与者宕机。
三阶段提交是二阶段提交的改进版,就是将二阶段提交的准备阶段和提交阶段更进一步划分,划分为三个阶段,也就是询问阶段(canCommit)准备阶段(preCommit)和提交阶段(commit)。
二阶段提交中,当协调者发起询问之后,参与者会先执行事务,然后响应是否可以提交,这是同一个过程。但假如某一个参与者因为某些原因一开始就知道执行会失败,那这个时候其它参与者执行事务其实是没有意义的,如果执行了还会陷入阻塞,要等待协调者发出的回滚命令。所以三阶段提交分为三个阶段,第一个阶段协调者进行询问,参与者先不执行事务,而是先回应自己能不能执行事务,如果都回答可以,协调者再发出命令执行事务,然后协调者等待参与者回应能否提交事务,如果所有参与者都回答可以提交事务,协调者再做出提交事务的决定。
三阶段提交主要减缓了二阶段提交阻塞的问题,没有解决其它可能存在的问题,所以大部分分布式系统任然使用的是二阶段提交。
回到Paxos,Raft、ZAB协议所面对的场景中来,应对这种场景的一致性算法需要保证如下几点:
1、所有提案只有一个被选定
2、如果某个进程认为某个提案被选定了,那么这提案必须是真的被选定的那个。
Paxos算法中有三种角色:Proposer、Acceptor、和Learner。
Hadoop是一个开源的分布式的基础架构,以分布式文件系统HDFS和分布式计算框架MapReduce为核心,适合用于PT级别海量数据的存储与分析。发展到现在,Hadoop已经成为了完善的生态圈,包含了大量用于大数据储存, 管理, 传输, 分析计算的框架,比如Hive、Zookeeper、Hbase等等,但是基础仍然是HDFS与MapReduce。
HDFS(Hadoop Distributed File System)是Hadoop项目的核心子项目,是分布式计算中数据存储管理的基础,是基于流数据模式访问和处理超大文件的需求而开发的,可以运行于廉价的商用服务器上。它所具有的高容错、高可靠性、高可扩展性、高获得性、高吞吐率等特征为海量数据提供了不怕故障的存储,为超大数据集(Large Data Set)的应用处理带来了很多便利。
Hadoop用于海量数据处理,主要有如下几个优势:
降低成本:Hadoop可以运行在一般商业机器构成的大型集群上
弹性扩缩容:Hadoop通过增加集群节点,可以线性地扩展以处理更大的数据集。同时,在集群负载下降时,也可以减少节点,以高效使用计算资源。
健壮:Hadoop在设计之初,就将故障检测和自动恢复作为一个设计目标,它可以从容处理通用计算平台上出现的硬件失效的情况。
MapReduce具有固定的执行流程(map—Shuffle—reduce),可以直接将包含map/reduce函数的作业划分为map和reduce两个阶段。map阶段包含多个可以并行执行的map任务,reduce阶段包含多个可以并行执行的reduce任务。map任务负责将输入的分块数据进行map()处理,并将其输出结果写入缓冲区,然后对缓冲区中的数据进行分区、排序、聚合等操作,最后将数据输出到磁盘上的不同分区中。reduce任务首先将map任务输出的对应分区数据通过网络传输拷贝到本地内存中,内存空间不够时,会将内存数据排序后写入磁盘,然后经过归并、排序等阶段产生reduce()的输入数据。reduce()处理完输入数据后,将输出数据写入分布式文件系统中。
shuffle过程:
从运算效率的出发点,map输出结果优先存储在map节点的内存中。每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区块满时,需要将缓冲区中的数据以一个临时文件的方式存到磁盘,当整个map task结束后再对磁盘中这个map task所产生的所有临时文件做合并,生成最终的输出文件。最后,等待reduce task来拉取数据。当然,如果map task的结果不大,能够完全存储到内存缓冲区,且未达到内存缓冲区的阀值,那么就不会有写临时文件到磁盘的操作,也不会有后面的合并。
map执行后,得到key/value键值对。接下来的问题就是,这些键值对应该交给哪个reduce做?注意:reduce的个数是允许用户在提交job时,通过设置方法设置的!
MapReduce提供partitioner接口解决上述问题。默认操作是:对key hash后再以reduce task数量取模,返回值决定着该键值对应该由哪个reduce处理。
这种默认的取模方式只是为了平均reduce的处理能力,防止数据倾斜,保证负载均衡。
接下来,需要将key/value以及Partition结果都写入到缓冲区,缓冲区的作用:批量收集map结果,减少磁盘IO的影响。
当然,写入之前,这些数据都会被序列化成字节数组。而整个内存缓冲区就是一个字节数组。
这个内存缓冲区是有大小限制的,默认100MB。当map task的输出结果很多时,就可能撑爆内存。需将缓冲区的数据临时写入磁盘,然后重新利用这块缓冲区。
从内存往磁盘写数据被称为Spill(溢写),由单独线程完成,不影响往缓冲区写map结果的线程。溢写比例:spill.percent(默认0.8)。
当缓冲区的数据达到阀值,溢写线程启动,锁定这80MB的内存,执行溢写过程。剩下的20MB继续写入map task的输出结果。互不干涉!
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是mapreduce模型的默认行为,也是对序列化的字节做的排序。排序规则:字典排序!
map task的输出结果写入内存后,当溢写线程未启动时,对输出结果并没有做任何的合并。从官方图可以看出,合并是体现在溢写的临时磁盘文件上的,且这种合并是对不同的
reduce端的数值做的合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端,那么需要将这些键值对拼接到一块,减少与partition相关的索引记录。如果client设置过Combiner,其会将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。注意:这里的合并并不能保证map结果中所有的相同的key值的键值对的value都合并了,它合并的范围只是这80MB,它能保证的是在每个单独的溢写文件中所有键值对的key值均不相同!
溢写生成的临时文件的个数随着map输出结果的数据量变大而增多,当整个map task完成,内存中的数据也全部溢写到磁盘的一个溢写文件。
也就是说,不论任何情况下,溢写过程生成的溢写文件至少有一个!但是最终的文件只能有一个,需要将这些溢写文件归并到一起,称为merge。
merge是将所有的溢写文件归并到一个文件,结合上面所描述的combiner的作用范围,归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对,如果没有,merge得到的就是键值集合,如{“aaa”, [5, 8, 2, …]}
当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。其实呢,reduce task在执行之前的工作就是:不断地拉取当前job里每个map task的最终结果,并对不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
具体的过程:
1、MapReduce编程模型的表达能力较弱,仅使用map()和reduce()两个函数难以实现复杂的数据操作;
2、处理流程固定,不容易实现迭代计算;
3、基于磁盘进行数据传递,效率较低。
大数据具有数据量大(Volume)、数据类型多样(Variety)、产生与处理速度快(Velocity)、价值高(Value)的“4V”特性。一般大数据的处理都是采用基于分治、归并和函数式编程思想的MapReduce分布式计算思想,后来的大数据处理框架也都是采用“分治-聚合”策略来对数据进行分布并行处理。
3、MapReduce缺点:
1、MapReduce编程模型的表达能力较弱,仅使用map()和reduce()两个函数难以实现复杂的数据操作;
2、处理流程固定,不容易实现迭代计算;
3、基于磁盘进行数据传递,效率较低。
Spark的改进:
一方面对大数据处理框架的输入/输出、中间数据进行建模,将这些数据抽象为统一的数据结构,命名为弹性分布式数据集(Resilient Distributed Dataset,RDD),并在此数据结构上构建了一系列通用的数据操作,使得用户可以简单地实现复杂的数据处理流程;另一方面采用基于内存的数据聚合、数据缓存等机制来加速应用执行,尤其适用于迭代和交互式应用。
一个大数据应用可以表示为<输入数据,用户代码,配置参数>。
大数据处理框架大体可以分为四层结构:用户层、分布式数据并行处理层、资源管理与任务调度层、物理执行层。以Apache Spark框架为例,在用户层中,用户需要准备数据、开发用户代码、配置参数。之后,分布式数据并行处理层根据用户代码和配置参数,将用户代码转化成逻辑处理流程(数据单元及数据依赖关系),然后将逻辑处理流程转化为物理执行计划(执行阶段及执行任务)。资源管理与任务调度层根据用户提供的资源需求来分配资源容器,并将任务(task)调度到合适的资源容器上运行。物理执行层实际运行具体的数据处理任务。
用户层即方便用户开发大数据应用,用户需要将一个大数据应用表示为&It;输入数据、用户代码、参数配置>。
输入数据一般存放在hdfs、hbase中,也可以在关系数据库,输入数据交给框架后,框架会进行切分,一般一个分块就对应一个任务。如果是流式数据,比如数据来源于网络socket、kafka,数据可以以微批或者连续的形式输入到流式处理框架中(spark streaming、flink)
用户代码可以是用户手写的MapReduce代码,或者是基于其他大数据处理框架的具体应用处理流程的代码。在实际系统中,用户撰写用户代码后,大数据处理框架会生成一个Driver程序,将用户代码提交给集群运行。例如,在Hadoop MapReduce中,Driver程序负责设定输入/输出数据类型,并向Hadoop MapReduce框架提交作业;在Spark中,Driver程序不仅可以产生数据、广播数据给各个task,而且可以收集task的运行结果,最后在Driver程序的内存中计算出最终结果。
配置参数可以分为两大类:一类是与资源相关的配置参数。例如,buffer size定义框架缓冲区的大小,影响map/reduce任务的内存用量。在Hadoop中,map/reduce任务实际启动一个JVM来运行,因此用户还要设置JVM的大小,也就是heap size。在Spark中,map/reduce任务在资源容器(Executor JVM)中以线程的方式执行,用户需要估算应用的资源需求量,并设置应用需要的资源容器个数、CPU个数和内存大小。另一类是与数据流相关的配置参数。例如,Hadoop和Spark中都可以设置partition()函数、partition个数和数据分块大小。partition()函数定义如何划分map()的输出数据。partition个数定义产生多少个数据分块,也就是有多少个reduce任务会被运行。数据分块大小定义map任务的输入数据大小。
一句话总结:用户层就是用户定义具体一个大数据应用的地方,用户要准备好输入数据,编写好处理流程的代码,并配置好执行时的资源参数和任务参数
分布式数据并行处理层首先将用户提交的应用转化为较小的计算任务,然后通过调用底层的资源管理与任务调度层实现并行执行。
在Hadoop MapReduce中,这个转化过程是直接的。因为MapReduce具有固定的执行流程(map—Shuffle—reduce),可以直接将包含map/reduce函数的作业划分为map和reduce两个阶段。map阶段包含多个可以并行执行的map任务,reduce阶段包含多个可以并行执行的reduce任务。map任务负责将输入的分块数据进行map()处理,并将其输出结果写入缓冲区,然后对缓冲区中的数据进行分区、排序、聚合等操作,最后将数据输出到磁盘上的不同分区中。reduce任务首先将map任务输出的对应分区数据通过网络传输拷贝到本地内存中,内存空间不够时,会将内存数据排序后写入磁盘,然后经过归并、排序等阶段产生reduce()的输入数据。reduce()处理完输入数据后,将输出数据写入分布式文件系统中。(MapReduce的具体过程)
与Hadoop MapReduce不同,Spark上应用的转化过程包含两层:逻辑处理流程、执行阶段与执行任务划分。如图1.7所示,Spark 首先根据用户代码中的数据操作语义和操作顺序,将代码转化为逻辑处理流程。逻辑处理流程包含多个数据单元和数据依赖,每个数据单元包含多个数据分块。然后,框架对逻辑处理流程进行划分,生成物理执行计划。该计划包含多个执行阶段(stage),每个执行阶段包含若干执行任务(task)。
为了将用户代码转化为逻辑处理流程,Spark对输入/输出、中间数据进行了更具体的抽象处理,将这些数据用一个统一的数据结构表示。在Spark中,输入/输出、中间数据被表示成RDD(弹性分布式数据集)。在RDD上可以执行多种数据操作,如简单的map(),以及复杂的cogroup()、join()等。一个RDD可以包含多个数据分区(partition)。parent RDD和child RDD之间通过数据依赖关系关联,支持一对一和多对一等数据依赖关系。数据依赖关系的类型由数据操作的类型决定。逻辑处理流程是一个有向无环图(DAG图),其中的节点是数据单元RDD。
为了将逻辑处理流程转化为物理执行计划,Spark首先根据RDD之间的数据依赖关系,将整个流程划分为多个小的执行阶段(stage)。之后,在每个执行阶段形成计算任务(task),计算任务的个数一般与RDD中分区的个数一致。与MapReduce不同的是,一个Spark job可以包含很多个执行阶段,而且每个执行阶段可以包含多种计算任务,因此并不能严格地区分每个执行阶段中的任务是map任务还是reduce任务。另外,在Spark中,用户可以通过调用cache() 接口使框架缓存可被重用的中间数据。例如,当前job的输出可能会被下一个job用到,那么用户可以使用cache()对这些数据进行缓存。
一句话总结:分布式并行处理层就是将用户定义好的大数据应用转换为逻辑处理流程,逻辑处理流程定义了数据单元和数据依赖关系,在spark中有一个统一的数据结构,叫做RDD,即弹性分布式数据集,RDD之间以有向无环图来建立数据依赖关系;之后将逻辑处理流程转换为物理执行计划,物理执行计划定义了执行的阶段和任务的划分,spark中根据RDD的依赖关系,可以划分为好几个执行阶段,每个阶段可以有多个执行任务,一般来说就是RDD中分区的数量,执行的中间数据可以进行缓存,从而提升性能
从系统架构上讲,大数据处理框架一般是主-从(Master-Worker)结构。主节点负责接收用户提交的应用,处理请求,管理应用运行的整个生命周期。从节点负责执行具体的数据处理任务,并在运行过程中向主节点汇报任务的执行状态。Spark支持不同的部署模式,如Standalone部署模式、YARN部署模式和Mesos部署模式等。其中Standalone部署模式与Hadoop MapReduce部署模式基本类似,唯一区别是Hadoop MapReduce部署模式为每个task启动一个JVM进程运行,而且是在task将要运行时启动JVM,而Spark是预先启动资源容器(Executor JVM),然后当需要执行task时,再在Executor JVM里启动task线程运行。
大数据处理框架的物理执行层负责启动task,执行每个task的数据处理步骤。在Hadoop MapReduce中,一个应用需要经历map、Shuffle、reduce 3个数据处理阶段。而在Spark中,一个应用可以有更多的执行阶段(stage),如迭代型应用可能有几十个执行阶段,每个执行阶段也包含多个task。另外,这些执行阶段可以形成复杂的DAG图结构。在物理执行时首先执行上游stage中的task,完成后执行下游stage中的task。
在Hadoop MapReduce中,每个task对应一个进程,也就是说每个task以JVM(Java虚拟机)的方式来运行,所以在Hadoop MapReduce中task的内存用量指的是JVM的堆内存用量。在Spark中,每个task对应JVM中的一个线程,而一个JVM可能同时运行了多个task,因此JVM的内存空间由task共享。在应用未运行前,我们难以预知task的内存消耗和执行时间,也难以预知JVM中的堆内存用量。
从应用特点来分析,我们可以将task执行过程中主要消耗内存的数据分为以下3类。
(1)框架执行时的中间数据。例如,map()输出到缓冲区的数据和reduce task在Shuffle阶段暂存到内存中的数据。
(2)框架缓存数据。例如,在Spark中,用户调用cache()接口缓存到内存中的数据。
(3)用户代码产生的中间计算结果。例如,用户代码调用map()、reduce()、combine(),在处理输入数据时会在内存中产生中间计算结果。
针对Spark经常出现的垃圾回收时间长、频繁等问题,Spark社区采用堆外内存管理机制和基于堆外内存的Shuffle机制,提出了钨丝计划。
spark逻辑处理流程包括四部分:
(1)数据源(Data blocks):数据源表示的是原始数据,数据可以存放在本地文件系统和分布式文件系统中,如HDFS、分布式Key-Value数据库(HBase)等。在IntelliJ IDEA中运行单机测试时,数据源可以是内存数据结构,如list(1,2,3,4,5);对于流式处理来说,数据源还可以是网络流等。这里我们只讨论批式处理,所以限定数据源是静态数据。
(2)数据模型:确定了数据源后,我们需要对数据进行操作处理。首要问题是如何对输入/输出、中间数据进行抽象表示,使得程序能够识别处理。Hadoop MapReduce框架将输入/输出、中间数据抽象为<K,V>record,这样map()/reduce()按照<K,V>record的形式读入并处理数据,最后输出为<K,V>record形式。这种数据表示方式的优点是简单易操作,缺点是过于细粒度.Spark认知到了这个缺点,将输入/输出、中间数据抽象表示为统一的数据模型(数据结构),命名为RDD(弹性分布式数据集)
RDD中可以包含各种类型的数据,可以是普通的Int、Double,也可以是<K,V>record等。RDD与普通数据结构(如ArrayList)的主要区别有两点:
■ RDD只是一个逻辑概念,在内存中并不会真正地为某个RDD分配存储空间(除非该RDD需要被缓存)。RDD中的数据只会在计算中产生,而且在计算完成后就会消失,而ArrayList等数据结构常驻内存。
■ RDD可以包含多个数据分区,不同数据分区可以由不同的任务(task)在不同节点进行处理。
(3)数据操作:定义了数据模型后,我们可以对RDD 进行各种数据操作,Spark将这些数据操作分为两种:transformation()操作和action()操作。两者的区别是action()操作一般是对数据结果进行后处理,产生输出结果,而且会触发Spark提交job真正执行数据处理任务。transformation()操作和action()操作的使用方式分别为rdd.transformation()和rdd.action(),如rdd2=rdd1.map(func)表示对rdd1进行map()操作得到新的rdd2;还有二元操作,如rdd3=rdd1.join(rdd2)表示对rdd1和rdd2进行join()操作得到rdd3。这里读者可能会问一个问题:为什么操作叫作transformation()?transformation这个词隐含了一个意思是单向操作,也就是rdd1使用transformation()后,会生成新的rdd2,而不能对rdd1本身进行修改。在Spark中,因为数据操作一般是单向操作,通过流水线执行,还需要进行错误容忍等,所以RDD被设计成一个不可变类型,可以类比成一个不能修改其中元素的ArrayList。后续我们会更深入讨论这个问题。一直使用transformation()操作可以不断生成新的RDD,而action()操作用来计算最后的数据结果,如rdd1.count()操作可以统计rdd1中包含的元素个数,rdd1.collect()操作可以将rdd1中的所有元素汇集到Driver节点,并进行进一步处理。
(4)计算结果处理:由于RDD实际上是分布在不同机器上的,所以大数据应用的结果计算分为两种方式:一种方式是直接将计算结果存放到分布式文件系统中,如rdd.save(“hdfs://file_location”),这种方式一般不需要在Driver端进行集中计算;另一种方式是需要在Driver端进行集中计算,如统计RDD中元素数目,需要先使用多个task统计每个RDD中分区(partition)的元素数目,然后将它们汇集到Driver端进行加和计算。例如,在图3.1中,每个分区进行action()操作得到部分计算结果result,然后将这些result发送到Driver端后对其执行f ()函数,得到最终结果。
物理执行计划:
根据action()将逻辑处理流程划分为多个job,根据数据依赖关系(窄依赖、宽依赖)将job划分为多个阶段,根据最后一个RDD的分区数量又将每个阶段划分为多个task。
思考题:
物理执行计划:
spark如何将复杂的数据处理流程拆分为小的执行任务?
spark为何要进行执行阶段的划分?
多维度学习:
spark的各种概念(是什么,为什么会有,解决什么问题)
spark的各种流程和思路(框架)
spark的各种问题