@BrandonLin
2016-07-27T21:51:47.000000Z
字数 19947
阅读 4568
Spark
本文主要内容来自于《Hadoop权威指南》英文版中的Spark章节,可以说是个人的翻译版本,涵盖了主要的Spark概念。
首先从spark官网下载稳定的二进制分发版本,注意与你安装的Hadoop版本相匹配:
wget http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
解压:
tar xzf spark-x.y.z-bin-distro.tgz
为了方便运行,将bin目录添加到PATH中:
export SPARK_HOME=/home/spark/
export PATH=$PATH:$SPARK_HOME/bin
完毕。
Spark提供了交互式的Spark-shell,这是入门的好起点。spark-shell是基于Scala REPL的交互式工具。启动shell:
spark-shell
从输出中我们可以看到,shell创建了一个Scala变量,存放的是SparkContext的实例。我们使用sc加载一个文本文件:
val lines = sc.textFile("input/ncdc/sample.txt")
lines变量引用的是一个RDD对象(Resilient Distributed Dataset)。RDD是Spark最核心的抽象,它是一个(通过分区,partitioned)分布在集群多台机器上的只读对象集合。
在一个典型的Spark应用程序中,一个或者多个RDD被载入作为输入,经过一系列的转化(transformation)之后变成目标RDD集合,然后一个动作(action)作用于这些RDD上,例如计算结果或者保存到持久化介质中。
RDD中的resilient是指:当一个RDD分区(partition)丢失之后,Spark会自动从其原始的RDD重新计算。加载RDD或者在RDD上调用transformation时,并没有触发真正的处理过程,Spark只是创建执行的计划。只有当action作用域RDD时,才会触发真正的数据处理,例如执行foreach().
接着前面载入的数据,拿到lines之后,我们想要把每一行的字段进行切分:
val records = lines.map( _.split("\t"))
map方法将一个函数作用在RDD中的每一个元素上,这个例子中,split把每一行(RDD[String]
)转变成一个Scala的字符串数组(RDD[Array[String]]
)。
移除脏数据:
val filtered = records.filter( rec => (rec(1) !="9999" && rec(2).match("[01459]")))
filer针对RDD中的每一个元素执行一个predicate判断,传入的是一个返回Boolean类型的函数,Scala中数组的访问访问是通过()操作。这里主要是过滤到脏数据。
为了找到每一年的最高温度,我们需要执行一个分组操作,Spark提供了reduceByKey的操作,但是只能应用在key-value类型的RDD(使用Scala的Tuple2来表示)上面,因此需要先做一次转换:
val tuples = filtered.map(rec => (rec(0).toInt,rec(1).toInt))
通过map操作来完成,将字符串素组转化为Int二元组,scala中调用方法时如果没有参数可以省略不写括号。转换之后我们就可以进行聚合操作:
val maxTemps = tuples.reduceByKey((a,b)=> Math.max(a,b))
reduceByKey接受一个函数,这个函数将一对值合并为一个值,然后不断应用在key对应的所有值上。假设1950这个key对应的记录有:
(1950,20) // 1
(1950,19) // 2
(1950,22) // 3
reduceByKey操作会把max函数应用到1和2身上,即执行max(20,19),得到20,然后再把20跟第三条记录对比,得到最终的22. 我们把结果输出:
maxTemps.foreach(println(_))
foreach是个action操作,针对RDD中的每个元素应用println(_)这个函数,这时候才会触发整个RDD链条执行计算,输出结果到控制台:
(1950,22)
(1949,111)
我们也可以把计算结果保存到磁盘中:
maxTemps.saveAsTextFile("output")
查看输出文件:
cat output/part-*
Spark中也有一些核心的概念。类似于MapReduce,Spark也有作业(job)的概念,但是更为通用一些,作业由任意的stage 有向无环图(DAG)组成,stage有点类似于MapReduce中的map或reduce阶段(phase)。
Spark运行时将Stage进一步被拆分为task,并在分布于集群上的RDD partitions并行运行。作业总是运行于Application的上下文中,这个上下文通过SparkContext来表示,用于组织相关的RDD和共享变量。一个Application可以并行或串行运行多个Job。Application提供了一种在同一个应用中共享数据集的机制,前面运行的作业可以将数据集缓存,后续的作业可以直接访问这些缓存的RDD。这与MapReduce中每个作业都需要从磁盘中读取输入数据是不同的。交互式的Spark会话如Spark-shell就是一个应用实例。
spark-shell提供了一种探索和学习Spark很好的方式,但是实际中经常需要将业务逻辑作为一个自包含的、完整的应用打包在一起,可以多次运行。下面是一个Scala应用的例子:
import org.apache.spark.SparkContext._
import org.apche.spark.{SparkContext,SparkConf}
object MaxxTemperature{
def main(agrs: Array[String]) {
val conf = new SparkConf().setAppName("Max Temperature")
val sc = new SparkContext(conf)
sc.textFile(agrs[0]))
.map(_.split("\t"))
.filter(rec => (rec(1) != "9999" && rec(2).match("[01459]")))
.map(rec => (rec(0).toInt , rec(1).toInt))
.reduceByKey( (a,b) => Math.max(a,b))
.saveAsTextFile(args(1))
}
}
当作为一个独立的应用时,我们需要自己创建SparkContext,因为没有shell提供这个对象给我们。SparkConf用于配置应用的各个属性,这里我们只设置了应用名称。
Spark中的转换(Transformation)大多数在RDD这个类中定义,但是这个例子中的reduceByKey()事实上是在PairRDDFunctions类中定义的,我们之所以不需要显式转换,是因为有下面的导入语句:
import org.apache.spark.SparkContext._
这个语句导入了Spark中各种隐式转换的函数,这个导入在编写应用的时候很有用。
完成这个简单应用的编写之后,我们使用spark-submit来运行程序:
spark-submit --class MaxTemperature --master local spark-examples.jar input/ncdc/sample.txt output/max-temp
spark-submit类似于Hadoop中的hadoop jar
命令,class参数指定了运行Main入口,master指定运行模式,local模式下所有组件都运行在一个JVM中,spark-examples.jar包含编译过的应用程序代码,后面是输入和输出参数。
Spark使用基于JVM的语言Scala,能够很好地与Java集成。Spark提供的Java API中,RDD使用类JavaRDD封装,JavaPairRDD用于特殊的key-value RDD。这两个类都事先了JavaRDDLike接口,RDD的大部分操作方法都在这个接口中定义。上述的逻辑使用Java来表达如下:
public class MaxTemperatureSpark {
public static void main(String[] args) throws Exception {
if (ages.length != 2){
System.err.println("Usage: MaxTemperaturSpark <input path> <output path>");
System.exit(-1);
}
SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext("local","Max TemperaturSpark",conf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<String[]> records = lines.map( new Function<String , String[]>(){
@Override
public String[] call(String s){
return s.split("\t");
}
});
JavaRDD<String[]> filtered = records.filter(new Function<String[],Boolean>(){
@Override
public Boolean call(String[] rec){
return rec[1] != "9999" && rec[2].matches("[01459]");
}
});
JavaPairRDD<Integer,Integer> tuples = filtered.mapToPair(
new PairFunction<String[],Integer,Integer>(){
@Override
public Tuple2<Integer,Integer> call(String[] rec) {
return new Tuple2<Integer,Integer> ( Integer.parseInt(rec[0]),Integer.parseInd(rec[1]));
}
} );
JavaPairRDD<Integer,Integer> maxTemps = tuples.reduceByKey(
new Function2<Integer,Integer,Integer>(){
@Override
public Integer call (Integer i1 , Integer i2){
return Math.max(i1,i2);
}
}
);
maxTemps.saveAsTextFile(agrs[1]);
}
}
可以看到代码非常冗长,Java在处理函数式的代码的确很不给力。实现逻辑很简单,就是不断对RDD做转换,转换的逻辑大多通过各阶的函数来表示,例如Function,Function2,PairFunction等。另外静态类型的特点也要求我们在定义每个RDD都要指定其泛型类型。没有了Scala中的隐式转换,因此从String数组RDD到PairRDD也需要我们自己动手。
编译这个类,然后使用spark-submit提交,格式与Scala版本完全一样,除了类名变了。
Spark也提供了Python的API,一般叫PySpark。通过使用Python中的lambda表达式,Python写出来的代码和Scala很类似,比较紧凑。
from pyspark import SparkContext
import re ,sys
sc = SparkContext("local","Max Temperature")
sc.textFile(sys.argv[1])
.map( lambda s: s.split("\t"))
.filter(lambda rec : (rec[1]!="9999" and re.match("[01459]" ,rec[2])))
.map(lambda rec: (int(rec[0]),int(rec[1])))
.reduceByKey(max)
.saveAsTextFile(sys.argv[2])
很好,代码非常紧凑,参数通过sys.argv传入,下标从1开始而不是0. 转换逻辑通过lambda来表达,正则匹配用了re模块,reduce的时候用了python内置的max函数。
Python代码运行时,Spark会fork出子线程来运行这些用户代码。在启动程序(launcher program)和executor中都会fork子进程。两个进程之间使用socket通信,所有父进程可以把RDD Partition的数据传递给Python代码。
运行Python版本时,我们制定的是Python文件而不是jar包:
spark-submit --master local \
src/python/MaxTemperature.py \
input/ncdn/sample.txt output
另外,使用pyspark命令也可以启动Python版的交互式REPL。
RDD是Spark应用中最核心的部分,接下来看一些RDD相关的内容。
有3种可以创建RDD:
1. 从内存中的对象集合创建(parallelism collection)
2. 从外部存储文件创建(如HDFS)
3. 从其他已存在的RDD转换而来
第1种方式适合内存不敏感的少量数据进行操作。例如下面代码对1-10的整数进行处理:
val params = sc.parallelize( 1 to 10)
val results = params.map(doSomethingExpensive)
doSomethingExpensive函数在params上的每个元素并行计算,并行度由spark.default.parallelism参数指定,默认情况下,如果在local模式运行,该值为机器的CPU核数。如果运行在集群环境中,该值为集群中属于该应用的executor拥有的CPU总核数。
如果我们不想使用默认的并行度,可以在parallelize方法传入第二个参数指定,下面的代码指定并行度为10:
val params = sc.parallelize(1 to 10 , 10)
第2种方法通过创建对外部数据的引用来创建RDD:
val text:RDD[String] = sc.textFile("hdfs-path")
路径参数必须是Hadoop文件系统的的文件路径,例如本地文件路径,HDFS文件路径,或者HDFS暴露的web接口webhdfs。在内部,Spark使用MapReduce的TextInputFormat API读取文件,因此文件的分区和MapReduce是一样的,一个Block对应一个Partition。我们也可以明确指定想要的Partition数:
val text:RDD[String] = sc.textFile("hadoop-fs-path" , 10 )
另外,我们可以把整个文件当做一条记录来处理,此时创建的RDD是一个PairRDD,key为文件路径,value为文件的内容:
val file:RDD[(String,String)] = sc.wholeTextFiles("hadoop-fs-path")
除了读取文本文件外,Spark也可以读取其他格式的文件,例如SequenceFile:
sc.sequenceFile[IntWritable,Text](input-path)
对于Writable数据类型,Spark会自动将其转成对应的Java类型,因此下面这个语句与上边的等同:
sc.sequenceFile[Int,String](input-path)
对于任意类型的Hadoop InputFormat,有2中方式可以创建RDD:
val job = new Job()
AvroJob.setInputKeySchema(job,WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(input-path ,
classOf[AvroKeyInputFormat[WeatherRecord]],
classOf[AvroKey[WeatherRecord]],
classOf[NullWritable],
job.getCofiguration)
第二行中我们把Schema加入做Job的配置中,方法四个参数分别为:
第3中创建RDD的方式是从已有的RDD转换而来,下一部分的Transformation具体介绍这种方式。
Spark定义了2种作用于RDD的的操作:Transformation和Action。Transformation从一个已有的RDD,计算生成另一个RDD。Action作用在一个RDD上,计算后对结果进行操作,返回给用户或者保存到外部存储中。
调用Action会立即生效,但是Transformation是懒运行的(lazy),所有转换操作只有当Action触发之后才会执行。
val text = sc.textFile(input-path)
val lower: RDD[String] = text.map(_.toLowerCase)
lower.foreach(println(_))
上述代码中,map是一个操作,foreach是一个Action,只有当调用foreach时,Spark才会运行一个作业,从文件中读取数据创建RDD,针对每一个元素调用toLowerCase方法,输出控制台。
区分一个操作是Transformation还是Action,一般可以通过返回类型来判断。如果返回的是RDD,则为Transformation,否则为Action。关于RDD操作的描述,大部分可以在org.apache.spark.rdd包下的RDD类找到,特定类型的键值对RDD,可以在PairRDDFunctions中找到。
Spark的库中带了很丰富的操作,包括mapping,grouping,aggregating,repartitioning,sampling,joining等转换操作,Action操作包括从RDD中提取固定个数的元素,抽样,保存到外部等。更详细的描述可以在Spark文档中找到。
作用在键值对RDD上的聚合操作主要有3个:
这三个操作都是作用在每一个key上,对key上的值列表进行聚合操作,得到一个值。对应的reduce,fold,aggregate操作类似,但是是作用在整个RDD上,最终生成单一值。下面以一个例子来说明这三个Transformation:
val pairs:RDD[(String,Int)]=
sc.parallelize(Array(("a",3),("a",1),("b",7),("a",5)))
val sums: RDD[(String,Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a",9),("b",7)))
针对每一个key,reduceByKey操作将加法函数_+_
循环作用在所有值。例如,对于a这个key,其值有:3,1,5, 执行加法(3+1)+5=9,每一次计算将上一次计算结果与下一个值进行运算。因为这些操作通常都是在集群中并行执行,所以聚合函数必须是commutative和associative的,也就是计算结果跟的顺序无关,我们的例子中(3+1)+5与(5+3)+1结果是一样的。
foldByKey的转换操作如下:
val sums :RDD[(String,Int)] = pairs.foldByKey(0)(_+_)
assert( sums.collect().toSet === Set(("a",9),"b",7)))
不同于reduceByKey,foldByKey需要传递一个初始的"零值",不同类型的零值可能不同,此时聚合变成 (((0+3)+1)+5)=9,初始值第一个参与运算,其他值顺序无关,b相应的聚合为0+7=7。
reduceByKey和foldByKey都无法修改聚合结果的类型,即整数相加之后得到的依然数整数,不能修改类型。要想修改聚合结果的类型,需要使用aggregateByKey:
val sums :RDD[(String , HashSet[Int])]=
pairs.aggregateByKey(new HashSet[Int])( _+=_ ,_++=_)
assert( sums.collect().toSet ===
Set( ("a",Set(1,3,5)),
("b",Set(7))
)
)
我们提供了一个初始的值,即空的Set[Int],另外我们提供了两个函数(_+=_,_++=_)
.第一个函数控制值是如何合并到Set中的,_+=_
是a=a+b
的简写,对于Set而言,就是把值加入到原有的Set中,返回一个新的Set,原有Set保持不变。
第2个函数控制的是两个Set是如何合并在一起的,这个在reduce中合并来自己多个Partition的聚合结果时使用。这里的函数_++=_
表示将第二个Set中的元素都加到第一个Set中。
转换之后的RDD可以在内存中持久化,以便后续的操作可以更快速地访问。
前面我们提到将RDD转化成键值对的RDD,这个开销相对比较大,我们将转化的结果缓存下来:
tuples.cache()
调用cache方法并不会立刻将结果缓存,而是设置一个标志位告诉Spark:当运行这个作业的时候,将结果缓存下来。为了让这个缓存真正存在内存中,我们触发一个Action:
tuples.reduceByKey((a,b)=>Marh.max(a,b)).foreach(println(_))
返回结果中的BlockManagerInfo显示RDD的编号(RDD number)为4,有2个Paritition。同一个应用中后续作业如果需要用到这个RDD,将直接从缓存加载。我们运行一个找最小值的的转换:
tuples.reduceByKey((a,b) => Math.min(a,b)).foreach(println(_))
从输出信息可以看到RDD是直接从缓存加载的(Found Block locally),当数据量比较大事,能够节约的时间非常可观。相比于MapReduce,不同的作业之间如果想连接起来,只能通过写入文件,后续作业再从磁盘中读取。Spark可以将数据集缓冲在集群的内存中,后续作业可以快速读取数据。
Spark RDD的这种特性使得其在交互性的应用中非常实用,对于需要多次迭代的算法也很适合在Spark上运行,迭代过程中的产生的数据可以缓存在内存中,供下一轮迭代使用。迭代算法可以通过MapReduce实现,但是每一轮迭代的结果需要写入磁盘,下一轮再从磁盘读取,效率低下。
需要注意的是,缓存的RDD只能被同一个应用的作业读取,如果需要跨应用使用这些RDD,需要使用相应的saveAs*以及hadoopFile,hadoopRDD保存数据到外部存储,然后再写入。当一个应用运行结束后,其所有缓存的RDD将无法访问,除非保存到外部存储中。
调用cache方法,可以将RDD的各个partition保存到其executor的内存中,如果内存容纳不下,作业不会失败,而是在必要的时候重新计算。对于拥有很多Transformation的大型作业,重新计算是非常昂贵的。因此Spark提供了不同级别的持久化机制,在调用persist方法时传入StorageLevel参数,控制持久化类型和级别。
默认情况下,持久化级别为MEMORY_ONLY,这个级别的缓存数据,在内存中以对象的形式存在。可以通过将对象序列化成字节数组,达到更紧凑的数据格式,节约内存空间,这个级别是MEMORY_ONLY_SER。相比MEMORY_ONLY,这种序列化的方式更耗CPU,但是当未序列化的数据存不下内存而序列化之后可以存放于内存时,这种牺牲是值得的,用CPU换内存,这是常见的一种权衡。MEMORY_ONLY_SER的持久化级别同时降低了GC压力,因为RDD是以一个字节数组存在于内存中的,而不是很多对象。
如何知道内存能否容纳下RDD,可以通过查看BlockManager的日志文件来获取这一信息。另外,每一个驱动程序的SparkContext都在4040端口上运行了一个HTTP Server,这个服务器提供了很多实用的信息,例如关于SparkContext运行环境,其中运行的作业,还有缓存的RDD Paritition信息。
默认情况下,RDD Partition实用Java的序列化方式来序列化数据,但通常情况下Kyro是一个更好的选择,无论实在速度上还是空间效率上。另外可以通过压缩来进一步节约内存空间,但是这是以CPU计算为代价的。要使用压缩格式,将spark.rdd.compress属性设为true,并根据需要设置压缩算法spark.io.compression.codec.
如果重新计算RDD的代价非常昂贵,那么值得考虑另外的2中持久化级别:MEMORY_AND_DISK和MEMORY_AND_DISK_SER。前者会在内存不足时将数据写入到磁盘,后者在序列化后的数据仍然无法存放在内存时,将数据写到磁盘。
另外还有一些更高级的持久化特性,例如将RDD Partition的多个副本存在集群的不同节点上,或者使用对外(off-heap)内存,更详细内容参考Spark文档。
Spark中的序列化通常考虑2个方面:
默认情况下,Spark使用Java的序列化机制序列化数据,并通过网络发送给其他Executor,持久化(或者缓存)RDD Partition的时候也会涉及到数据序列化。实现了Serializable或Externalizable接口的对象使用Java标准的方式序列化之后,很容易被其他JVM应用理解。但是在性能和空间效率上不是很理想。
对于大部分的Spark程序来说,Kyro 序列化是更好的选择方式。Kyro是一种各高效的通用Java序列化库。要使用Kyro序列化,需要在SparkConf上配置如下序列化器:
conf.set("spark.serializer","org.apache.spark.serializer.KyroSerializer")
Kyro不要求实现任何特定的接口(像java.io.serializable那样),侵入性很小,所以POJO可以直接被序列化,除了启用Kyro序列化以外,无需任何其他操作。但是,如果在使用一个类之前先在Kyro注册,可以使得性能更加高效。如果没有注册,Kyro在序列化时会写入一个对象所属类的引用,每个被序列化的对象都会写入一个类的全称。如果事先注册,则只写入一个整数ID。Spark已经默认注册了Scala的类和许多其他框架的类,例如Avro Generic,Thrift的类。
在Kyro中注册类很直接,创建一个KyroRegistrator的子类,实现registerClasses方法:
class CustomKyroRegistrator extends KyroRegistrator {
override def registerClassed(kyro : Kyro) {
kyro.register(classOf[WeatherRecord])
}
}
然后在驱动程序中,将Registrator的类名全称赋给spark.kyro.registrator属性:
conf.set("spark.kyro.registrator","CustomKyroRegistrator")
在Scala中,函数的序列化采用的是Java标准的序列化机制。Spark也使用这种标准的方式发送函数给远程的Executor节点。事实上,即使运行在local模式(即所有Spark组价都在一个JVM中运行),Spark也会序列化函数。如果在代码中使用了不可序列化的函数,Spark将会报错,例如从不可序列化类的一个方法中转换而来的方法是不可序列化的。
Spark程序经常需要访问不在RDD中的数据,例如下面的代码在map方法中使用了一个查询表:
val lookup = Map( 1 -> "a" , 2 ->"e" , 3->"i" , 4->"0",5->"u")
val result = sc.parallelize(Array(2,1,3)).map(lookup(_))
assert( result.collect.toSet === Set("a" , "e","i"))
这种方式没有问题,lookup变量被序列化之后作为一个闭包(closure)传递给map方法。但是使用广播变量(broadcast variables),可以更高效地达到同样的效果
广播变量会被序列化,然后发送给每一个executor,executor将广播变量缓存,以便后续需要的时候使用,这不同于被序列化为闭包一部分的常规变量,常规变量会作为闭包的一部分通过网络发送给每个task。广播变量有点类似MapReduce中的分布式缓存,虽然Spark的实现中将数据存储在内存,只有在内存耗尽时才会写到磁盘(而MapReduce的分布式缓存位于磁盘中)。
使用SparkContext的broadcast方法来广播变量,该方法返回一个Broadcast[T]类型的包装器:
val lookup:Broadcast[Map[Int,String]] =
sc.broadcast(Map(1->"a",2->"e",3->"i",4->"o", 5 ->"u"))
val result = sc.parallelize(Array(2,1,3)).map(lookup.value(_))
广播变量的值通过value来访问。注意的是,广播变量是单向传播的,从驱动到任务,没有办法更新广播变量或者将更新传播回驱动程序。这种情况下,可以通过Accumulator来实现。
累积器是一个共享变量,任务只能做增加操作,就像MapReduce中的counter。当一个作业运行完毕后,累积器的最终值可以从驱动程序中获取。下面这个例子使用累积器来统计整形的RDD中有多少个元素,同时使用reduce Action来计算总和:
val count:Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1,2,3))
.map( i => {count += 1; i})
.reduce((x,y) => x + y)
assert(count.value === 3)
assert(result === 6)
累计变量count通过SparkContext的accumulator方法创建。其中的map操作是个identify function,原值返回,但是产生了count加1的副作用。当作业运行完毕之后,累积器的值通过value访问得到。
上面的例子中,我们使用Int类型的累积器,但是任何数值类型的数据类型都可以用于累积器。Spark另外提供了accumulable方法,用于累积器的结果类型和被累积的类型不一致的情况,accumulableCollection用于累积可变集合的值。更多内容参考Spark文档。
Spark作业的运行从宏观上看,只要由driver和executors组成。driver运行application(SparkContext)、调度作业(schedule tasks)。executor负责具体任务的执行。通常情况下,driver运行在client机器上(client一般不受整个集群管理),但是在YARN的cluster模式上,driver运行在Application Master上。
下图是Spark运行作业的整体流程:
当有Action在RDD上执行时,作业被自动提交,提交将调用SparkContext的run_job被调用,进而把作业提交给Scheduler。Scheduler运行在driver上,由两部分组成:
在介绍作业如何被分解为DAG之前,我们需要了解一下stage可以执行的任务类型,stage可以运行2中类型的任务:shuffle map task和result task。
最简单的作业可以只有Result Task,也就是只有一个由Result Task组成的stage。对于复杂的应用,可能需要组合多个shuffle stage。例如,下面的例子中我们想要计算词语的频率分布直方图:
val hist:Map[Int,Long] = sc.textFile(inputPath)
.map(word => (word.toLowerCase(),1))
.reduceByKey((a,b)=>a+b)
.map(_.swap)
.countByKey()
前两个Transformation统计词频,即计算每个词语出现的次数。第三个Transformation对调key和value,得到的是(count,word)。最后的Action countByKey()得到频率直方图,即出现N词的词语有M个。
Spark的DAG Scheduler将这个作业分解为2个Stage,因为reduceByKey()这个Operation需要通过shuffle stage来完成。最后的DAG如下图所示:
在一个stage内,RDD通过也被组织成DAG。上图中展示了RDD的类型以及产生该RDD的操作。RDD[String]有sc.textFile()创建。图中省略了一些Spark内部产生的RDD,例如textFile()创建的RDD事实上是MappedRDD[String],其父类为HadoopRDD[LongWritable,Text]。
注意到reduceByKey同时出现在两个Stage中,这是因为它是使用shuffle实现的,在map端(stage1)reduce函数作为combiner运行,在reduce端(stage2)作为reducer运行。这一点类似于MapReduce,有些情况下降Reducer的实现直接作为map端的Combinor,对map任务的输出结果先做一次预聚合,可以避免在网络上传输大量数据。
Spark的shuffle实现把输出写入到本地的分区文件(partitioned file),即使是内存级别的RDD,这些文件被下一个stage的RDD取走。
如果RDD已经被上一个job(同一个Application)持久化,则DAG Scheduler不会再创建stage重新计算这个RDD(或者由这个RDD衍生的其他RDD)。
DAG Scheduler还负责将stage分级为task,然后将task提交给Task Scheduler。在这个例子中,输入文件的每一个partition,运行一个task(shuffle map),reduceByKey()的并行度(parallelism)可以通过其第二个参数设置,如果没有设置,则从其上一级RDD推断,这个例子中就是输入数据的partition数量。
针对每一个Task,DAG Scheduler都给出了一个位置偏好(placement preference),Task Scheduler可以根据这些偏好,更有效地理由本地数据的优势(data locality)。例如,如果task处理的是来自HDFS的RDD,则更倾向于运行在拥有对应数据的节点上(node local)。而如果一个task处理的数据来自于缓存在内存中的RDD partition,则倾向于运行在内存中拥有这些数据的executor(process local)。
一旦DAG Scheduler构建完stage的DAG后,将每个stage的任务提交给task scheduler。子stage只有在其上一级完成之后才提交。
当task scheduler收到一组任务后,它根据应用持有的executor列表(在YARN中,Spark会实现申请固定数量的容器,然后自己决定如何使用这些Container,这一点不同于批处理的MapReduce,MapReduce按需申请Container),结合任务的位置偏好,决定每个任务运行在哪个executor,即task-executor映射。然后把task分配给空闲的executor,直到任务集都运行完毕。默认情况下,每个task分配一核CPU,CPU数量可以通过spark.task.cpus
设置。
对于一个给定的executor,作业调度器优先分配有process-local偏好的task,然后依次是node-local,rack-local。最后才是没有位置偏好(nonlocal的任务或者推测执行(speculative)的任务。
已分配的任务通过scheduler backend启动,这个backend发送启动任务的消息给对应的executor backend,告知executor开始运行任务。Spark使用Akka发送远程消息,而不是使用Hadoop的RPC机制,Akka用于构建高并发和分布式的JVM应用,提供了工具箱和运行时。
Executor在任务结束或者失败的时候发送状态消息给driver,如果任务执行失败,task scheduler将重新提交任务到另一个executor。如果启用了推测执行(默认没有启用),对于运行慢的任务,会启用speculative任务。
Executor接到运行任务的消息后,首先确认任务需要的jar包和文件都是最新的。如果之前的任务运行过,executor会在本地缓存这些jar包和文件,只有当发生变更时才会重新下载。接着反序列化任务代码,任务代码以序列化字节的形式通过任务启动消息发送到executor。最后任务在executor同一个JVM中被执行,因此无需再任务启动的时候重新启动JVM。
任务执行的结果序列化之后发送给executor backend,然后作为状态更新消息(status update message)发回给driver。如果是shuffle map task,返回中包含的信息用于下一个stage提取数据,类似于MapReduce中map任务完成之后,通过心跳发送消息给Application Master,然后Reducer通过心跳得知map已经运行完毕,进而去copy数据。如果是Result task,将对应partition的运行结果发送给driver,driver组合出最终结果。
上一部分我们了解了Spark是如何依靠executor来运行任务的。接下来进一步了解executor究竟是如何启动的。Spark中,executor的声明周期通过cluster manager来管理,Spark提供了多种不同的manager:
Mesos和YARN的资源管理方式优于Standalone,它们考虑了集群中其他应用对资源的需求(例如MapReduce作业),而Standalone采用静态的资源分配方式,没有办法动态地调整以满足集群的其他资源需求。YARN是唯一一个与Hadoop的Kerberos安全机制集成的cluster manager。
Spark可以通过两种模式运行在YARN上:
在YARN client模式上,在driver构造出SparkContext实例(下图step1)的时候,就开始于YARN进行交互。Context向YARN的资源管理器(RM)提交一个应用(step2),RM在集群的NodeManager中启动一个Container,并在Container中运行Spark ExecutorLauncher(step3)。ExecutorLauncher的任务是向RM申请资源(step4),并在申请到资源后将Executor Backend作为容器在相应的NodeManager中启动(step5)
当executor启动后,回头连接到SparkContext并注册自己,注册的这个步骤可以给SparkContext提供整个应用的executor信息,一遍task scheduler在决定将任务运行在哪个节点(task placement decision)的时候,可以考虑任务的位置偏好。
executor的数量在启动spark-shell,spark-submit或者pyspark时指定,如果没有指定,默认启动2个executor。每个executor使用的CPU核数(默认1)和内存大小(默认1024M)也可以在这个时候设置,下面这个例子启动spark-shell,运行4个executor:
spark-shell --master yarn-client --num-executors 4 --executor-cores 1 -- executor-memory 2g
不同于Standalone或者Mosos,YARN的NM地址并没有在这里配置,而是从Hadoop的配置中提取,配置的目录通过HADOOP_CONF_DIR环境变量设置。
在cluster模式下,用户的驱动程序(driver,下图的Spark Program)运行在YARN的Application Master进程中,使用该模式时,指定master的url类似:
spark-submit --master yarn-cluster ...
其他参数,例如executor数量,jar包或者python文件,与client mode一样。如下图所示,spark-submit客户端将启动一个YARN Application(step1通过请求NM),但是它不运行任何用户代码。Application Master在开始为executor申请资源(step4)之前将启动驱动程序(step3b),其他的过程与client mode一样。
注意此时我们如果再访问刚才的4040端口,发现页面自动跳转到YARN的应用程序管理界面了,url类似:
master:18088/proxy/application_1469461440579_0001
YARN的两种模式下,启动executor之前对于数据放在哪里(data locality)并不知道,所以启动的executor可能与所要处理的数据不在一个节点上(从而task的位置偏好无法得到满足)。对于交互式的session来说,这个是比较容易接受的,因为在启动一个交互会话时,很可能并不知道要处理哪些数据。但是在生产环境中就不是这样了,所以Spark提供一种方式,当应用运行在cluster模式时,可以通过位置提示(placement hint)来提高data locality,这是通过构造SparkContext时传入位置偏好达到的。
构造SparkContext之前,我们可以使用InputFormatInfo这个工具类来获取位置偏好,例如对于文本文件,使用TextInputFormat,如下可以获取位置偏好:
val preferredLocations = InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(new Configuration(),classof[TextInputFormat],inputPath)))
val sc = new SparkContext(conf , preferredLocations)
这些位置偏好信息在Application Master为executor申请资源的时候可以使用,目前该特性的API还不是很稳定。
最后我们提交一个Spark自带的例子到YARN集群运行:
export HADOOP_CONF_DIR=/home/hadoop-2.6.0/etc/hadoop/
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --executor-memory 2G --num-executors 6 /home/spark-1.6.0/lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000
运行结果:
运行成功后在YARN管理界面可以看到这个Spark作业:
大部分内容来自《Hadoop权威指南》第4版