[关闭]
@BrandonLin 2016-07-27T21:51:47.000000Z 字数 19947 阅读 4568

Spark核心概念理解

Spark

本文主要内容来自于《Hadoop权威指南》英文版中的Spark章节,可以说是个人的翻译版本,涵盖了主要的Spark概念。

安装Spark

首先从spark官网下载稳定的二进制分发版本,注意与你安装的Hadoop版本相匹配:

image_1aoh97son249e004ipht6cj016.png-50.2kB

  1. wget http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz

解压:

  1. tar xzf spark-x.y.z-bin-distro.tgz

为了方便运行,将bin目录添加到PATH中:

  1. export SPARK_HOME=/home/spark/
  2. export PATH=$PATH:$SPARK_HOME/bin

完毕。

简单例子

Spark提供了交互式的Spark-shell,这是入门的好起点。spark-shell是基于Scala REPL的交互式工具。启动shell:

  1. spark-shell

image_1aoh9v123shfb1u5gemhg1plg1j.png-67.5kB

从输出中我们可以看到,shell创建了一个Scala变量,存放的是SparkContext的实例。我们使用sc加载一个文本文件:

  1. val lines = sc.textFile("input/ncdc/sample.txt")

image_1aoha8dnf1htrf0h9i1bjd6id2d.png-6.8kB

lines变量引用的是一个RDD对象(Resilient Distributed Dataset)。RDD是Spark最核心的抽象,它是一个(通过分区,partitioned)分布在集群多台机器上的只读对象集合。

在一个典型的Spark应用程序中,一个或者多个RDD被载入作为输入,经过一系列的转化(transformation)之后变成目标RDD集合,然后一个动作(action)作用于这些RDD上,例如计算结果或者保存到持久化介质中。

RDD中的resilient是指:当一个RDD分区(partition)丢失之后,Spark会自动从其原始的RDD重新计算。加载RDD或者在RDD上调用transformation时,并没有触发真正的处理过程,Spark只是创建执行的计划。只有当action作用域RDD时,才会触发真正的数据处理,例如执行foreach().

接着前面载入的数据,拿到lines之后,我们想要把每一行的字段进行切分:

  1. val records = lines.map( _.split("\t"))

image_1aohaajrtd7n1706skb4410122q.png-5.7kB

map方法将一个函数作用在RDD中的每一个元素上,这个例子中,split把每一行(RDD[String])转变成一个Scala的字符串数组(RDD[Array[String]])。

移除脏数据:

  1. val filtered = records.filter( rec => (rec(1) !="9999" && rec(2).match("[01459]")))

image_1aohahj9so8gu2l39q1b061m5437.png-6.9kB

filer针对RDD中的每一个元素执行一个predicate判断,传入的是一个返回Boolean类型的函数,Scala中数组的访问访问是通过()操作。这里主要是过滤到脏数据。

为了找到每一年的最高温度,我们需要执行一个分组操作,Spark提供了reduceByKey的操作,但是只能应用在key-value类型的RDD(使用Scala的Tuple2来表示)上面,因此需要先做一次转换:

  1. val tuples = filtered.map(rec => (rec(0).toInt,rec(1).toInt))

image_1aohajlmn1u94m0qetn1imvdhu3k.png-7kB

通过map操作来完成,将字符串素组转化为Int二元组,scala中调用方法时如果没有参数可以省略不写括号。转换之后我们就可以进行聚合操作:

  1. val maxTemps = tuples.reduceByKey((a,b)=> Math.max(a,b))

image_1aohamd9ru2v1d0q18281na85541.png-12.9kB

reduceByKey接受一个函数,这个函数将一对值合并为一个值,然后不断应用在key对应的所有值上。假设1950这个key对应的记录有:

  1. (1950,20) // 1
  2. (1950,19) // 2
  3. (1950,22) // 3

reduceByKey操作会把max函数应用到1和2身上,即执行max(20,19),得到20,然后再把20跟第三条记录对比,得到最终的22. 我们把结果输出:

  1. maxTemps.foreach(println(_))

foreach是个action操作,针对RDD中的每个元素应用println(_)这个函数,这时候才会触发整个RDD链条执行计算,输出结果到控制台:

  1. (1950,22)
  2. (1949,111)

我们也可以把计算结果保存到磁盘中:

  1. maxTemps.saveAsTextFile("output")

查看输出文件:

  1. cat output/part-*

Spark Applications,JObs,Stages , and Tasks

Spark中也有一些核心的概念。类似于MapReduce,Spark也有作业(job)的概念,但是更为通用一些,作业由任意的stage 有向无环图(DAG)组成,stage有点类似于MapReduce中的map或reduce阶段(phase)。

Spark运行时将Stage进一步被拆分为task,并在分布于集群上的RDD partitions并行运行。作业总是运行于Application的上下文中,这个上下文通过SparkContext来表示,用于组织相关的RDD和共享变量。一个Application可以并行或串行运行多个Job。Application提供了一种在同一个应用中共享数据集的机制,前面运行的作业可以将数据集缓存,后续的作业可以直接访问这些缓存的RDD。这与MapReduce中每个作业都需要从磁盘中读取输入数据是不同的。交互式的Spark会话如Spark-shell就是一个应用实例。

一个Scala应用程序

spark-shell提供了一种探索和学习Spark很好的方式,但是实际中经常需要将业务逻辑作为一个自包含的、完整的应用打包在一起,可以多次运行。下面是一个Scala应用的例子:

  1. import org.apache.spark.SparkContext._
  2. import org.apche.spark.{SparkContext,SparkConf}
  3. object MaxxTemperature{
  4. def main(agrs: Array[String]) {
  5. val conf = new SparkConf().setAppName("Max Temperature")
  6. val sc = new SparkContext(conf)
  7. sc.textFile(agrs[0]))
  8. .map(_.split("\t"))
  9. .filter(rec => (rec(1) != "9999" && rec(2).match("[01459]")))
  10. .map(rec => (rec(0).toInt , rec(1).toInt))
  11. .reduceByKey( (a,b) => Math.max(a,b))
  12. .saveAsTextFile(args(1))
  13. }
  14. }

当作为一个独立的应用时,我们需要自己创建SparkContext,因为没有shell提供这个对象给我们。SparkConf用于配置应用的各个属性,这里我们只设置了应用名称。

Spark中的转换(Transformation)大多数在RDD这个类中定义,但是这个例子中的reduceByKey()事实上是在PairRDDFunctions类中定义的,我们之所以不需要显式转换,是因为有下面的导入语句:

  1. import org.apache.spark.SparkContext._

这个语句导入了Spark中各种隐式转换的函数,这个导入在编写应用的时候很有用。

完成这个简单应用的编写之后,我们使用spark-submit来运行程序:

  1. spark-submit --class MaxTemperature --master local spark-examples.jar input/ncdc/sample.txt output/max-temp

spark-submit类似于Hadoop中的hadoop jar命令,class参数指定了运行Main入口,master指定运行模式,local模式下所有组件都运行在一个JVM中,spark-examples.jar包含编译过的应用程序代码,后面是输入和输出参数。

Java例子

Spark使用基于JVM的语言Scala,能够很好地与Java集成。Spark提供的Java API中,RDD使用类JavaRDD封装,JavaPairRDD用于特殊的key-value RDD。这两个类都事先了JavaRDDLike接口,RDD的大部分操作方法都在这个接口中定义。上述的逻辑使用Java来表达如下:

  1. public class MaxTemperatureSpark {
  2. public static void main(String[] args) throws Exception {
  3. if (ages.length != 2){
  4. System.err.println("Usage: MaxTemperaturSpark <input path> <output path>");
  5. System.exit(-1);
  6. }
  7. SparkConf conf = new SparkConf();
  8. JavaSparkContext sc = new JavaSparkContext("local","Max TemperaturSpark",conf);
  9. JavaRDD<String> lines = sc.textFile(args[0]);
  10. JavaRDD<String[]> records = lines.map( new Function<String , String[]>(){
  11. @Override
  12. public String[] call(String s){
  13. return s.split("\t");
  14. }
  15. });
  16. JavaRDD<String[]> filtered = records.filter(new Function<String[],Boolean>(){
  17. @Override
  18. public Boolean call(String[] rec){
  19. return rec[1] != "9999" && rec[2].matches("[01459]");
  20. }
  21. });
  22. JavaPairRDD<Integer,Integer> tuples = filtered.mapToPair(
  23. new PairFunction<String[],Integer,Integer>(){
  24. @Override
  25. public Tuple2<Integer,Integer> call(String[] rec) {
  26. return new Tuple2<Integer,Integer> ( Integer.parseInt(rec[0]),Integer.parseInd(rec[1]));
  27. }
  28. } );
  29. JavaPairRDD<Integer,Integer> maxTemps = tuples.reduceByKey(
  30. new Function2<Integer,Integer,Integer>(){
  31. @Override
  32. public Integer call (Integer i1 , Integer i2){
  33. return Math.max(i1,i2);
  34. }
  35. }
  36. );
  37. maxTemps.saveAsTextFile(agrs[1]);
  38. }
  39. }

可以看到代码非常冗长,Java在处理函数式的代码的确很不给力。实现逻辑很简单,就是不断对RDD做转换,转换的逻辑大多通过各阶的函数来表示,例如Function,Function2,PairFunction等。另外静态类型的特点也要求我们在定义每个RDD都要指定其泛型类型。没有了Scala中的隐式转换,因此从String数组RDD到PairRDD也需要我们自己动手。

编译这个类,然后使用spark-submit提交,格式与Scala版本完全一样,除了类名变了。

Python例子

Spark也提供了Python的API,一般叫PySpark。通过使用Python中的lambda表达式,Python写出来的代码和Scala很类似,比较紧凑。

  1. from pyspark import SparkContext
  2. import re ,sys
  3. sc = SparkContext("local","Max Temperature")
  4. sc.textFile(sys.argv[1])
  5. .map( lambda s: s.split("\t"))
  6. .filter(lambda rec : (rec[1]!="9999" and re.match("[01459]" ,rec[2])))
  7. .map(lambda rec: (int(rec[0]),int(rec[1])))
  8. .reduceByKey(max)
  9. .saveAsTextFile(sys.argv[2])

很好,代码非常紧凑,参数通过sys.argv传入,下标从1开始而不是0. 转换逻辑通过lambda来表达,正则匹配用了re模块,reduce的时候用了python内置的max函数。

Python代码运行时,Spark会fork出子线程来运行这些用户代码。在启动程序(launcher program)和executor中都会fork子进程。两个进程之间使用socket通信,所有父进程可以把RDD Partition的数据传递给Python代码。

运行Python版本时,我们制定的是Python文件而不是jar包:

  1. spark-submit --master local \
  2. src/python/MaxTemperature.py \
  3. input/ncdn/sample.txt output

另外,使用pyspark命令也可以启动Python版的交互式REPL。

RDD

RDD是Spark应用中最核心的部分,接下来看一些RDD相关的内容。

创建RDD

有3种可以创建RDD:
1. 从内存中的对象集合创建(parallelism collection)
2. 从外部存储文件创建(如HDFS)
3. 从其他已存在的RDD转换而来

第1种方式适合内存不敏感的少量数据进行操作。例如下面代码对1-10的整数进行处理:

  1. val params = sc.parallelize( 1 to 10)
  2. val results = params.map(doSomethingExpensive)

doSomethingExpensive函数在params上的每个元素并行计算,并行度由spark.default.parallelism参数指定,默认情况下,如果在local模式运行,该值为机器的CPU核数。如果运行在集群环境中,该值为集群中属于该应用的executor拥有的CPU总核数。
如果我们不想使用默认的并行度,可以在parallelize方法传入第二个参数指定,下面的代码指定并行度为10:

  1. val params = sc.parallelize(1 to 10 , 10)

第2种方法通过创建对外部数据的引用来创建RDD:

  1. val text:RDD[String] = sc.textFile("hdfs-path")

路径参数必须是Hadoop文件系统的的文件路径,例如本地文件路径,HDFS文件路径,或者HDFS暴露的web接口webhdfs。在内部,Spark使用MapReduce的TextInputFormat API读取文件,因此文件的分区和MapReduce是一样的,一个Block对应一个Partition。我们也可以明确指定想要的Partition数:

  1. val text:RDD[String] = sc.textFile("hadoop-fs-path" , 10 )

另外,我们可以把整个文件当做一条记录来处理,此时创建的RDD是一个PairRDD,key为文件路径,value为文件的内容:

  1. val file:RDD[(String,String)] = sc.wholeTextFiles("hadoop-fs-path")

除了读取文本文件外,Spark也可以读取其他格式的文件,例如SequenceFile:

  1. sc.sequenceFile[IntWritable,Text](input-path)

对于Writable数据类型,Spark会自动将其转成对应的Java类型,因此下面这个语句与上边的等同:

  1. sc.sequenceFile[Int,String](input-path)

对于任意类型的Hadoop InputFormat,有2中方式可以创建RDD:

  1. val job = new Job()
  2. AvroJob.setInputKeySchema(job,WeatherRecord.getClassSchema)
  3. val data = sc.newAPIHadoopFile(input-path ,
  4. classOf[AvroKeyInputFormat[WeatherRecord]],
  5. classOf[AvroKey[WeatherRecord]],
  6. classOf[NullWritable],
  7. job.getCofiguration)

第二行中我们把Schema加入做Job的配置中,方法四个参数分别为:

第3中创建RDD的方式是从已有的RDD转换而来,下一部分的Transformation具体介绍这种方式。

转化:Transformation和Action

Spark定义了2种作用于RDD的的操作:Transformation和Action。Transformation从一个已有的RDD,计算生成另一个RDD。Action作用在一个RDD上,计算后对结果进行操作,返回给用户或者保存到外部存储中。

调用Action会立即生效,但是Transformation是懒运行的(lazy),所有转换操作只有当Action触发之后才会执行。

  1. val text = sc.textFile(input-path)
  2. val lower: RDD[String] = text.map(_.toLowerCase
  3. lower.foreach(println(_))

上述代码中,map是一个操作,foreach是一个Action,只有当调用foreach时,Spark才会运行一个作业,从文件中读取数据创建RDD,针对每一个元素调用toLowerCase方法,输出控制台。

区分一个操作是Transformation还是Action,一般可以通过返回类型来判断。如果返回的是RDD,则为Transformation,否则为Action。关于RDD操作的描述,大部分可以在org.apache.spark.rdd包下的RDD类找到,特定类型的键值对RDD,可以在PairRDDFunctions中找到。

Spark的库中带了很丰富的操作,包括mapping,grouping,aggregating,repartitioning,sampling,joining等转换操作,Action操作包括从RDD中提取固定个数的元素,抽样,保存到外部等。更详细的描述可以在Spark文档中找到。

作用在键值对RDD上的聚合操作主要有3个:

这三个操作都是作用在每一个key上,对key上的值列表进行聚合操作,得到一个值。对应的reduce,fold,aggregate操作类似,但是是作用在整个RDD上,最终生成单一值。下面以一个例子来说明这三个Transformation:

  1. val pairs:RDD[(String,Int)]=
  2. sc.parallelize(Array(("a",3),("a",1),("b",7),("a",5)))
  3. val sums: RDD[(String,Int)] = pairs.reduceByKey_+_
  4. assert(sums.collect().toSet === Set(("a",9),("b",7)))

针对每一个key,reduceByKey操作将加法函数_+_循环作用在所有值。例如,对于a这个key,其值有:3,1,5, 执行加法(3+1)+5=9,每一次计算将上一次计算结果与下一个值进行运算。因为这些操作通常都是在集群中并行执行,所以聚合函数必须是commutative和associative的,也就是计算结果跟的顺序无关,我们的例子中(3+1)+5与(5+3)+1结果是一样的。

foldByKey的转换操作如下:

  1. val sums :RDD[(String,Int)] = pairs.foldByKey(0)(_+_)
  2. assert( sums.collect().toSet === Set(("a",9),"b",7)))

不同于reduceByKey,foldByKey需要传递一个初始的"零值",不同类型的零值可能不同,此时聚合变成 (((0+3)+1)+5)=9,初始值第一个参与运算,其他值顺序无关,b相应的聚合为0+7=7。

reduceByKey和foldByKey都无法修改聚合结果的类型,即整数相加之后得到的依然数整数,不能修改类型。要想修改聚合结果的类型,需要使用aggregateByKey:

  1. val sums :RDD[(String , HashSet[Int])]=
  2. pairs.aggregateByKey(new HashSet[Int])( _+=_ ,_++=_)
  3. assert( sums.collect().toSet ===
  4. Set( ("a",Set(1,3,5)),
  5. ("b",Set(7))
  6. )
  7. )

我们提供了一个初始的值,即空的Set[Int],另外我们提供了两个函数(_+=_,_++=_).第一个函数控制值是如何合并到Set中的,_+=_a=a+b的简写,对于Set而言,就是把值加入到原有的Set中,返回一个新的Set,原有Set保持不变。
第2个函数控制的是两个Set是如何合并在一起的,这个在reduce中合并来自己多个Partition的聚合结果时使用。这里的函数_++=_表示将第二个Set中的元素都加到第一个Set中。

转换之后的RDD可以在内存中持久化,以便后续的操作可以更快速地访问。

持久化:Persistence

前面我们提到将RDD转化成键值对的RDD,这个开销相对比较大,我们将转化的结果缓存下来:

  1. tuples.cache()

调用cache方法并不会立刻将结果缓存,而是设置一个标志位告诉Spark:当运行这个作业的时候,将结果缓存下来。为了让这个缓存真正存在内存中,我们触发一个Action:

  1. tuples.reduceByKey((a,b)=>Marh.max(a,b)).foreach(println(_))

返回结果中的BlockManagerInfo显示RDD的编号(RDD number)为4,有2个Paritition。同一个应用中后续作业如果需要用到这个RDD,将直接从缓存加载。我们运行一个找最小值的的转换:

  1. tuples.reduceByKey((a,b) => Math.min(a,b)).foreach(println(_))

从输出信息可以看到RDD是直接从缓存加载的(Found Block locally),当数据量比较大事,能够节约的时间非常可观。相比于MapReduce,不同的作业之间如果想连接起来,只能通过写入文件,后续作业再从磁盘中读取。Spark可以将数据集缓冲在集群的内存中,后续作业可以快速读取数据。

Spark RDD的这种特性使得其在交互性的应用中非常实用,对于需要多次迭代的算法也很适合在Spark上运行,迭代过程中的产生的数据可以缓存在内存中,供下一轮迭代使用。迭代算法可以通过MapReduce实现,但是每一轮迭代的结果需要写入磁盘,下一轮再从磁盘读取,效率低下。

需要注意的是,缓存的RDD只能被同一个应用的作业读取,如果需要跨应用使用这些RDD,需要使用相应的saveAs*以及hadoopFile,hadoopRDD保存数据到外部存储,然后再写入。当一个应用运行结束后,其所有缓存的RDD将无法访问,除非保存到外部存储中。

持久化级别

调用cache方法,可以将RDD的各个partition保存到其executor的内存中,如果内存容纳不下,作业不会失败,而是在必要的时候重新计算。对于拥有很多Transformation的大型作业,重新计算是非常昂贵的。因此Spark提供了不同级别的持久化机制,在调用persist方法时传入StorageLevel参数,控制持久化类型和级别。

默认情况下,持久化级别为MEMORY_ONLY,这个级别的缓存数据,在内存中以对象的形式存在。可以通过将对象序列化成字节数组,达到更紧凑的数据格式,节约内存空间,这个级别是MEMORY_ONLY_SER。相比MEMORY_ONLY,这种序列化的方式更耗CPU,但是当未序列化的数据存不下内存而序列化之后可以存放于内存时,这种牺牲是值得的,用CPU换内存,这是常见的一种权衡。MEMORY_ONLY_SER的持久化级别同时降低了GC压力,因为RDD是以一个字节数组存在于内存中的,而不是很多对象。

如何知道内存能否容纳下RDD,可以通过查看BlockManager的日志文件来获取这一信息。另外,每一个驱动程序的SparkContext都在4040端口上运行了一个HTTP Server,这个服务器提供了很多实用的信息,例如关于SparkContext运行环境,其中运行的作业,还有缓存的RDD Paritition信息。

image_1aohc1fgnbo8srqjea1i4g1qdj4e.png-173.5kB

默认情况下,RDD Partition实用Java的序列化方式来序列化数据,但通常情况下Kyro是一个更好的选择,无论实在速度上还是空间效率上。另外可以通过压缩来进一步节约内存空间,但是这是以CPU计算为代价的。要使用压缩格式,将spark.rdd.compress属性设为true,并根据需要设置压缩算法spark.io.compression.codec.

如果重新计算RDD的代价非常昂贵,那么值得考虑另外的2中持久化级别:MEMORY_AND_DISK和MEMORY_AND_DISK_SER。前者会在内存不足时将数据写入到磁盘,后者在序列化后的数据仍然无法存放在内存时,将数据写到磁盘。

另外还有一些更高级的持久化特性,例如将RDD Partition的多个副本存在集群的不同节点上,或者使用对外(off-heap)内存,更详细内容参考Spark文档。

序列化

Spark中的序列化通常考虑2个方面:

数据序列化

默认情况下,Spark使用Java的序列化机制序列化数据,并通过网络发送给其他Executor,持久化(或者缓存)RDD Partition的时候也会涉及到数据序列化。实现了Serializable或Externalizable接口的对象使用Java标准的方式序列化之后,很容易被其他JVM应用理解。但是在性能和空间效率上不是很理想。

对于大部分的Spark程序来说,Kyro 序列化是更好的选择方式。Kyro是一种各高效的通用Java序列化库。要使用Kyro序列化,需要在SparkConf上配置如下序列化器:

  1. conf.set("spark.serializer","org.apache.spark.serializer.KyroSerializer")

Kyro不要求实现任何特定的接口(像java.io.serializable那样),侵入性很小,所以POJO可以直接被序列化,除了启用Kyro序列化以外,无需任何其他操作。但是,如果在使用一个类之前先在Kyro注册,可以使得性能更加高效。如果没有注册,Kyro在序列化时会写入一个对象所属类的引用,每个被序列化的对象都会写入一个类的全称。如果事先注册,则只写入一个整数ID。Spark已经默认注册了Scala的类和许多其他框架的类,例如Avro Generic,Thrift的类。

在Kyro中注册类很直接,创建一个KyroRegistrator的子类,实现registerClasses方法:

  1. class CustomKyroRegistrator extends KyroRegistrator {
  2. override def registerClassed(kyro : Kyro) {
  3. kyro.register(classOf[WeatherRecord])
  4. }
  5. }

然后在驱动程序中,将Registrator的类名全称赋给spark.kyro.registrator属性:

  1. conf.set("spark.kyro.registrator","CustomKyroRegistrator")

函数序列化

在Scala中,函数的序列化采用的是Java标准的序列化机制。Spark也使用这种标准的方式发送函数给远程的Executor节点。事实上,即使运行在local模式(即所有Spark组价都在一个JVM中运行),Spark也会序列化函数。如果在代码中使用了不可序列化的函数,Spark将会报错,例如从不可序列化类的一个方法中转换而来的方法是不可序列化的。

共享变量

Spark程序经常需要访问不在RDD中的数据,例如下面的代码在map方法中使用了一个查询表:

  1. val lookup = Map( 1 -> "a" , 2 ->"e" , 3->"i" , 4->"0",5->"u")
  2. val result = sc.parallelize(Array(2,1,3)).map(lookup(_))
  3. assert( result.collect.toSet === Set("a" , "e","i"))

这种方式没有问题,lookup变量被序列化之后作为一个闭包(closure)传递给map方法。但是使用广播变量(broadcast variables),可以更高效地达到同样的效果

广播变量:Broadcast variables

广播变量会被序列化,然后发送给每一个executor,executor将广播变量缓存,以便后续需要的时候使用,这不同于被序列化为闭包一部分的常规变量,常规变量会作为闭包的一部分通过网络发送给每个task。广播变量有点类似MapReduce中的分布式缓存,虽然Spark的实现中将数据存储在内存,只有在内存耗尽时才会写到磁盘(而MapReduce的分布式缓存位于磁盘中)。

使用SparkContext的broadcast方法来广播变量,该方法返回一个Broadcast[T]类型的包装器:

  1. val lookup:Broadcast[Map[Int,String]] =
  2. sc.broadcast(Map(1->"a",2->"e",3->"i",4->"o", 5 ->"u"))
  3. val result = sc.parallelize(Array(2,1,3)).map(lookup.value(_))

广播变量的值通过value来访问。注意的是,广播变量是单向传播的,从驱动到任务,没有办法更新广播变量或者将更新传播回驱动程序。这种情况下,可以通过Accumulator来实现。

Accumulators

累积器是一个共享变量,任务只能做增加操作,就像MapReduce中的counter。当一个作业运行完毕后,累积器的最终值可以从驱动程序中获取。下面这个例子使用累积器来统计整形的RDD中有多少个元素,同时使用reduce Action来计算总和:

  1. val count:Accumulator[Int] = sc.accumulator(0)
  2. val result = sc.parallelize(Array(1,2,3))
  3. .map( i => {count += 1; i})
  4. .reduce((x,y) => x + y)
  5. assert(count.value === 3)
  6. assert(result === 6)

累计变量count通过SparkContext的accumulator方法创建。其中的map操作是个identify function,原值返回,但是产生了count加1的副作用。当作业运行完毕之后,累积器的值通过value访问得到。

上面的例子中,我们使用Int类型的累积器,但是任何数值类型的数据类型都可以用于累积器。Spark另外提供了accumulable方法,用于累积器的结果类型和被累积的类型不一致的情况,accumulableCollection用于累积可变集合的值。更多内容参考Spark文档。

Spark作业运行过程

Spark作业的运行从宏观上看,只要由driver和executors组成。driver运行application(SparkContext)、调度作业(schedule tasks)。executor负责具体任务的执行。通常情况下,driver运行在client机器上(client一般不受整个集群管理),但是在YARN的cluster模式上,driver运行在Application Master上。

下图是Spark运行作业的整体流程:

2016-07-25_021351.png-481.8kB

当有Action在RDD上执行时,作业被自动提交,提交将调用SparkContext的run_job被调用,进而把作业提交给Scheduler。Scheduler运行在driver上,由两部分组成:

构造DAG

在介绍作业如何被分解为DAG之前,我们需要了解一下stage可以执行的任务类型,stage可以运行2中类型的任务:shuffle map task和result task。

最简单的作业可以只有Result Task,也就是只有一个由Result Task组成的stage。对于复杂的应用,可能需要组合多个shuffle stage。例如,下面的例子中我们想要计算词语的频率分布直方图:

  1. val hist:Map[Int,Long] = sc.textFile(inputPath)
  2. .map(word => (word.toLowerCase(),1))
  3. .reduceByKey((a,b)=>a+b)
  4. .map(_.swap)
  5. .countByKey()

前两个Transformation统计词频,即计算每个词语出现的次数。第三个Transformation对调key和value,得到的是(count,word)。最后的Action countByKey()得到频率直方图,即出现N词的词语有M个。

Spark的DAG Scheduler将这个作业分解为2个Stage,因为reduceByKey()这个Operation需要通过shuffle stage来完成。最后的DAG如下图所示:

2016-07-25_025637.png-976.6kB

在一个stage内,RDD通过也被组织成DAG。上图中展示了RDD的类型以及产生该RDD的操作。RDD[String]有sc.textFile()创建。图中省略了一些Spark内部产生的RDD,例如textFile()创建的RDD事实上是MappedRDD[String],其父类为HadoopRDD[LongWritable,Text]。

注意到reduceByKey同时出现在两个Stage中,这是因为它是使用shuffle实现的,在map端(stage1)reduce函数作为combiner运行,在reduce端(stage2)作为reducer运行。这一点类似于MapReduce,有些情况下降Reducer的实现直接作为map端的Combinor,对map任务的输出结果先做一次预聚合,可以避免在网络上传输大量数据。

Spark的shuffle实现把输出写入到本地的分区文件(partitioned file),即使是内存级别的RDD,这些文件被下一个stage的RDD取走。

如果RDD已经被上一个job(同一个Application)持久化,则DAG Scheduler不会再创建stage重新计算这个RDD(或者由这个RDD衍生的其他RDD)。

DAG Scheduler还负责将stage分级为task,然后将task提交给Task Scheduler。在这个例子中,输入文件的每一个partition,运行一个task(shuffle map),reduceByKey()的并行度(parallelism)可以通过其第二个参数设置,如果没有设置,则从其上一级RDD推断,这个例子中就是输入数据的partition数量。

针对每一个Task,DAG Scheduler都给出了一个位置偏好(placement preference),Task Scheduler可以根据这些偏好,更有效地理由本地数据的优势(data locality)。例如,如果task处理的是来自HDFS的RDD,则更倾向于运行在拥有对应数据的节点上(node local)。而如果一个task处理的数据来自于缓存在内存中的RDD partition,则倾向于运行在内存中拥有这些数据的executor(process local)。

一旦DAG Scheduler构建完stage的DAG后,将每个stage的任务提交给task scheduler。子stage只有在其上一级完成之后才提交。

任务调度

当task scheduler收到一组任务后,它根据应用持有的executor列表(在YARN中,Spark会实现申请固定数量的容器,然后自己决定如何使用这些Container,这一点不同于批处理的MapReduce,MapReduce按需申请Container),结合任务的位置偏好,决定每个任务运行在哪个executor,即task-executor映射。然后把task分配给空闲的executor,直到任务集都运行完毕。默认情况下,每个task分配一核CPU,CPU数量可以通过spark.task.cpus设置。

对于一个给定的executor,作业调度器优先分配有process-local偏好的task,然后依次是node-local,rack-local。最后才是没有位置偏好(nonlocal的任务或者推测执行(speculative)的任务。

已分配的任务通过scheduler backend启动,这个backend发送启动任务的消息给对应的executor backend,告知executor开始运行任务。Spark使用Akka发送远程消息,而不是使用Hadoop的RPC机制,Akka用于构建高并发和分布式的JVM应用,提供了工具箱和运行时。

Executor在任务结束或者失败的时候发送状态消息给driver,如果任务执行失败,task scheduler将重新提交任务到另一个executor。如果启用了推测执行(默认没有启用),对于运行慢的任务,会启用speculative任务。

任务执行

Executor接到运行任务的消息后,首先确认任务需要的jar包和文件都是最新的。如果之前的任务运行过,executor会在本地缓存这些jar包和文件,只有当发生变更时才会重新下载。接着反序列化任务代码,任务代码以序列化字节的形式通过任务启动消息发送到executor。最后任务在executor同一个JVM中被执行,因此无需再任务启动的时候重新启动JVM。

任务执行的结果序列化之后发送给executor backend,然后作为状态更新消息(status update message)发回给driver。如果是shuffle map task,返回中包含的信息用于下一个stage提取数据,类似于MapReduce中map任务完成之后,通过心跳发送消息给Application Master,然后Reducer通过心跳得知map已经运行完毕,进而去copy数据。如果是Result task,将对应partition的运行结果发送给driver,driver组合出最终结果。

Executor和Cluster Manager

上一部分我们了解了Spark是如何依靠executor来运行任务的。接下来进一步了解executor究竟是如何启动的。Spark中,executor的声明周期通过cluster manager来管理,Spark提供了多种不同的manager:

image_1aohc7qualq0d25qt71uh2al74r.png-41kB

Mesos和YARN的资源管理方式优于Standalone,它们考虑了集群中其他应用对资源的需求(例如MapReduce作业),而Standalone采用静态的资源分配方式,没有办法动态地调整以满足集群的其他资源需求。YARN是唯一一个与Hadoop的Kerberos安全机制集成的cluster manager。

Spark on YARN

Spark可以通过两种模式运行在YARN上:

YARN client mode

在YARN client模式上,在driver构造出SparkContext实例(下图step1)的时候,就开始于YARN进行交互。Context向YARN的资源管理器(RM)提交一个应用(step2),RM在集群的NodeManager中启动一个Container,并在Container中运行Spark ExecutorLauncher(step3)。ExecutorLauncher的任务是向RM申请资源(step4),并在申请到资源后将Executor Backend作为容器在相应的NodeManager中启动(step5)

2016-07-25_042302.png-471.4kB

当executor启动后,回头连接到SparkContext并注册自己,注册的这个步骤可以给SparkContext提供整个应用的executor信息,一遍task scheduler在决定将任务运行在哪个节点(task placement decision)的时候,可以考虑任务的位置偏好。

executor的数量在启动spark-shell,spark-submit或者pyspark时指定,如果没有指定,默认启动2个executor。每个executor使用的CPU核数(默认1)和内存大小(默认1024M)也可以在这个时候设置,下面这个例子启动spark-shell,运行4个executor:

  1. spark-shell --master yarn-client --num-executors 4 --executor-cores 1 -- executor-memory 2g

image_1aohci69f1bsomt38ipcbkab9.png-56.4kB

不同于Standalone或者Mosos,YARN的NM地址并没有在这里配置,而是从Hadoop的配置中提取,配置的目录通过HADOOP_CONF_DIR环境变量设置。

YARN cluster mode

在cluster模式下,用户的驱动程序(driver,下图的Spark Program)运行在YARN的Application Master进程中,使用该模式时,指定master的url类似:

  1. spark-submit --master yarn-cluster ...

其他参数,例如executor数量,jar包或者python文件,与client mode一样。如下图所示,spark-submit客户端将启动一个YARN Application(step1通过请求NM),但是它不运行任何用户代码。Application Master在开始为executor申请资源(step4)之前将启动驱动程序(step3b),其他的过程与client mode一样。

注意此时我们如果再访问刚才的4040端口,发现页面自动跳转到YARN的应用程序管理界面了,url类似:

  1. master:18088/proxy/application_1469461440579_0001

QQ截图20160725044008.jpg-312kB

YARN的两种模式下,启动executor之前对于数据放在哪里(data locality)并不知道,所以启动的executor可能与所要处理的数据不在一个节点上(从而task的位置偏好无法得到满足)。对于交互式的session来说,这个是比较容易接受的,因为在启动一个交互会话时,很可能并不知道要处理哪些数据。但是在生产环境中就不是这样了,所以Spark提供一种方式,当应用运行在cluster模式时,可以通过位置提示(placement hint)来提高data locality,这是通过构造SparkContext时传入位置偏好达到的。

构造SparkContext之前,我们可以使用InputFormatInfo这个工具类来获取位置偏好,例如对于文本文件,使用TextInputFormat,如下可以获取位置偏好:

  1. val preferredLocations = InputFormatInfo.computePreferredLocations(
  2. Seq(new InputFormatInfo(new Configuration(),classof[TextInputFormat],inputPath)))
  3. val sc = new SparkContext(conf , preferredLocations)

这些位置偏好信息在Application Master为executor申请资源的时候可以使用,目前该特性的API还不是很稳定。

最后我们提交一个Spark自带的例子到YARN集群运行:

  1. export HADOOP_CONF_DIR=/home/hadoop-2.6.0/etc/hadoop/
  2. spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --executor-memory 2G --num-executors 6 /home/spark-1.6.0/lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000

运行结果:

image_1aohdq93usasamd1ltj1sot1phq1g.png-128.1kB

运行成功后在YARN管理界面可以看到这个Spark作业:

image_1aohd8fap1sua1vo5ut4118fq3m13.png-63.4kB

参考资料

大部分内容来自《Hadoop权威指南》第4版

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注