@Vany
2016-05-05T19:46:47.000000Z
字数 6199
阅读 1962
CentOS
Hadoop
Spark
Jupyter
启动Jupyter(内核是pyspark)的方法(这个应该是local本机模式):
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
加上一些其他的配置:
最后的我一般用的命令是:
#!/usr/bin/bash
PYTHONHASHSEED=0 PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=7777" pyspark --master=yarn-client --num-executors=7 --executor-cores=8
在Jupyter中,建立一个Python3的notebook,等待Kernel启动完成,即可开始使用。
另外,在notebook中,他是自动建立SparkContext变量sc的,可以直接使用。例如利用投针法计算Pi的近似值:
# Spark Pi
from random import random
def sample(_):
x, y = random(), random()
return 1 if x**2+y**2<=1 else 0
n_samples = 10**8
print(sc.parallelize(range(n_samples)).map(sample).sum()*4.0/n_samples)
Jupyter-notebook可以用作我们平常交互式测试时使用,但是如果要运行一些任务,时间很长,想看看执行的进度,或者还有很多其他附属的文件需要一起执行,内存占用之类的还需要个性配置,那么用spark-submit命令提交任务最好。
利用python运行main.py,提交任务,设置其依赖的文件a.py,b.py,c.py(因为每个executor是单独运行的,因此其依赖文件也要指定好,然后传输到各个executor上去)以及运行环境(YARN-Client, 7个Executor,每个8个CPU核心),写法如下:
PYSPARK_PYTHON=python3 spark-submit --master=yarn-client --num-executors=7 --executor-cores=8 --files a.py,b.py,c.py main.py
在这种情况下,我们的main.py中就要手动建立SparkContext也即sc变量了:
from pyspark import SparkContext, SparkConf, StorageLevel
conf = SparkConf()
.setMaster("yarn-client")
.setAppName("Spark-App")
.set("spark.storage.memoryFraction", "0.9")
sc = SparkContext(conf=conf, pyFiles=['a.py', 'b.py', 'c.py'])
# process...
sc.stop()
还可以传参数设置内存:
PYSPARK_PYTHON=python3 spark-submit --master=yarn-client --num-executors=7 --executor-cores=6 --driver-memory=2g --executor-memory=10g --files bikeUtil.py,weatherHoliday.py,stationFlow.py,predictor.py main.py
在SparkConf, SparkContext这里,我们也可以通过传递参数pyFiles来设置依赖的文件,通过SparkConf的set函数来设置一些参数。
据说SparkContext.addFile(...)可以提交依赖的文件(还未测试)
详情请见官方文档: https://spark.apache.org/docs/latest/configuration.html
在利用Python编写并行化程序时该如何思考呢?先讲一些程序结构。
实际上Spark的架构是分为Driver和Worker的,Driver负责执行总程序(额……意会意会),负责调用Spark的对象来并行化执行,从而Spark架构帮助我们将要执行的任务传输到各个worker上执行,最后再汇总回来。
因此,其实我们用Python编写程序,不涉及到sc相关的变量时,其实我们就是在跑Python程序,调用sc产生的rdd的map,reduceByKey,collect时会调用相关的Spark架构去并行化执行。
sc.parallelize是并行化的起点(当然还有其他函数,例如textFile):
rdd = sc.parallelize(...)
创造一个rdd,rdd可以transformation,但并不会真正执行,只有在action执行时才会,即lazy-evaluation,具体这里就不展开了。
关于他具体是如何并行化的呢?根据原理有两种方法,一种是基于Hash的Partitioner,一种是Ranger,HashPartitioner相当于一个Hash函数,将key转化到对应的Partition分区的编号。RangePartitioner的原理还有待探究。
我们可以用sc.textFile(..)
来读取相关文本文件,默认是从HDFS读取的,不需要写前面的hdfs://...
;
另外,我们也可以通过本机读取,但文件名需要这样写:file:///...
。但是本机读取需要注意的一点是,由于worker是分布式执行的,因此本机读取的文件需要放在各个分布式的机器上,推荐可以在Driver端读取完,然后broadcast到各个worker。
一般在__main__里面声明的变量,会自动打包并传送到worker中,在worker的代码中会访问的到。但是如果是在函数中建立的,需要用global实现。
但是如果可以的话最好用broadcast将变量广播出去,这样就不用每次task把信息重新传输一遍了。
共享变量broadcast [1]
Q: 如何在传给spark transformations操作的函数中访问共享变量?
A: 根据官网Programming Guide文档说明(参见这里),当作为参数传给spark操作(如map或reduce)的函数在远程机器的节点上执行时,函数中使用到的每个变量的副本(separate copies)也会被拷贝到这些节点上以便函数访问,如果这些变量在节点上被修改,那这些修改不会被反传回spark driver program,即在实现业务代码时,应由实现者保证这些变量的只读特性。因为在不同任务间维护通用的、支持读/写的共享变量会降低spark效率。
举个例子,下面的代码说明了如何在传给spark操作的函数中借助全局变量实现共享访问:Q: 除通过global variable共享变量外,spark还支持什么方式共享变量?
A: Spark还支持broadcast变量和accumulators这两种共享变量的方式。其中,broadcast允许开发者在spark集群的每个节点上保持变量的一份只读cache,本质上,broadcast变量也是global变量,只不过它是由开发者显式分发到集群节点上的,而非spark根据每个task调用的函数对变量的访问情况自动拷贝。至于accumulators,顾名思义,它只支持add操作,具体语法可参考spark programming guide关于accumulators部分的说明。Q: broadcast变量与普通global变量有何关系?各自的适用场合?
A: 实际上,broadcast变量是一种global变量,它们均可以实现在分布式节点中执行函数时共享变量。其中,普通global变量是随着spark对task的调度根据实际情况由spark调度器负责拷贝至集群节点的,这意味着若有需访问某global变量的多个task执行时,每个task的执行均有变量拷贝过程;而broadcast变量则是由开发者主动拷贝至集群节点且会一直cache直至用户主动调用unpersist或整个spark作业结束。
PS: 实际上,即使调用unpersist也不会立即释放资源,它只是告诉spark调度器资源可以释放,至于何时真正释放由spark调度器决定,参见SPARK-4030。
结论:若共享变量只会被某个task使用1次,则使用普通global变量共享方式即可;若共享变量会被先后执行的多个tasks访问,则broadcast方式会节省拷贝开销。
再次提醒:若使用了broadcast方式共享变量,则开发者应在确定该变量不再需要共享时主动调用unpersist来释放集群资源。
Broadcast的机理详见:https://github.com/JerryLead/SparkInternals/blob/master/markdown/7-Broadcast.md
一台机器只能启动一个Spark?否则会Address冲突,如何解决多人同时使用的问题?
16/03/21 06:38:23 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
貌似可以重新注册一个ip,可以同时跑,关键是内存不够……
如果Executor注册上了,但是执行时,又断掉了(ERROR YarnScheduler: Lost executor xxx
),那很可能是由于内存不够的原因。
在执行的时候设置driver-memory, executor-memory, 试着调整Spark配置中spark.yarn.driver.memoryOverhead, executor.memoryOverhead的大小[2] [3] [4] [5],memoryOverhead一般用处不太大。
下面是摘自网上性能调优Blog的一段话 [6]:
在YARN模式下:
集群task并行度:SPARK_ EXECUTOR_INSTANCES* SPARK_EXECUTOR_CORES;
集群内存总量:(executor个数) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+(SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)。
重点强调:Spark对Executor和Driver额外添加堆内存大小,Executor端:由spark.yarn.executor.memoryOverhead设置,默认值executorMemory * 0.07与384的最大值。Driver端:由spark.yarn.driver.memoryOverhead设置,默认值driverMemory * 0.07与384的最大值。
通过调整上述参数,可以提高集群并行度,让系统同时执行的任务更多,那么对于相同的任务,并行度高了,可以减少轮询次数。举例说明:如果一个stage有100task,并行度为50,那么执行完这次任务,需要轮询两次才能完成,如果并行度为100,那么一次就可以了。
但是在资源相同的情况,并行度高了,相应的Executor内存就会减少,所以需要根据实际实况协调内存和core。此外,Spark能够非常有效的支持短时间任务(例如:200ms),因为会对所有的任务复用JVM,这样能减小任务启动的消耗,Standalone模式下,core可以允许1-2倍于物理core的数量进行超配。
rdd.cache()
然后看一下内存变化,就知道这次数据集所占的空间了。性能调优详见:http://www.raychase.net/3546
另外,有一个参数最好需要设置一下,默认是0.6,表示我们给executor设置的memory中只有60%的资源是分配给运行时的内存,其余的分配给RDD.cache()时的空间。一般如果cache不是很大的话,那么放宽这个比例可以赢得更大的内存,以运行更多的cores:
conf = SparkConf().set("spark.storage.memoryFraction", "0.9")