[关闭]
@xtccc 2016-01-03T11:44:51.000000Z 字数 3775 阅读 3988

Serialization

给我写信
GitHub

此处输入图片的描述

Spark



Spark的序列化


Spark数据以records的形式进行流动,record以两种形式存在:序列化的字节数组与反序列化的Java对象。一般来说,Spark存放在内存中的records是deserialized Java objects,在磁盘中存储及在网络中传输的records是serialized bytes

目前,将in-memory shuffle data以序列化的形式来存储的工作正在进行中,参见SPAKR-2926

在写Spark的应用时,尝尝会碰到序列化的问题。例如,在Driver端的程序中创建了一个对象,而在各个Executor中会用到这个对象 —— 由于Driver端代码与Executor端的代码运行在不同的JVM中,甚至在不同的节点上,因此必然要有相应的序列化机制来支撑数据实例在不同的JVM或者节点之间的传输。




什么时候需要调用序列化?


结论

先把结论给出:


以类UnserializableBean为例,下面分别给出说明。
QQ20160102-1@2x.png-73.7kB


RDD自身是serializable

测试代码:
QQ20160102-4@2x.png-88.7kB

运行结果
QQ20160102-2@2x.png-26.3kB

所以,可以证实本结论。


Clousure必须是Serializable

QQ20160102-5@2x.png-115.3kB

这里的bytes是在driver中创建的变量,且它被用在了closure中,是captured variable。

一个closure中的全部captured variables都是serializable,这个closure才是serializable。

运行这些代码会报错,因为ImmutableBytesWritable自身是unserializable。

QQ20160102-0@2x.png-610.7kB

因此,可以证实本结论。

值得注意的是:目前对闭包的序列化只支持Java Serializer


从execturor传递到driver的数据是serializable

QQ20160102-7@2x.png-103.7kB

运行结果
QQ20160102-8@2x.png-31.7kB

这里,虽然UnserializableBean是不可序列化的类,但是当将该类型的数据从executor传递到driver时,并没有出现异常。因此,可以证实本结论。




什么样的数据类型能够被Spark序列化


首先给出结论:不论是使用Java的序列化框架,还是使用Kryo的序列化框架,自定义的class都必须 implements java.io.Serializable 才能被序列化。

Learning Spark 中对序列化有如下描述:

Whether using Kryo or Java's serializer, you may encounter a NotSerializableException if your code refers to a class that does not extend Java's Serializable interface.

例如,下面是一个自定义的类 CustomBean
QQ20160102-1@2x.png-70.5kB


它可以被如下使用:
QQ20160102-2@2x.png-93.7kB


如果我们使用了第三方的类,但是这个类没有实现 java.io.Serializable 接口,而且我们也无法修改这个第三方类的实现,怎么办?

Learning Spark 由如下描述:

If you cannot modify the class in question you will need to use some advanced workarounds, such as creating a subclass of the type in question that implements Java's Serializable interface or customizing the serialization behavior using Kryo.



如果需求是从driver向executor传递一个class instance,则可以通过broadcast来实现,即使该class没有implements Serializable,或者包含无法被序列化的成员:
QQ20160102-5@2x.png-74.9kB
QQ20160102-4@2x.png-119.9kB




Kryo


根据 Tuning Spark,Spark默认使用Java的序列化框架。『只要一个class实现了java.io.Serializable接口,那么Spark就能使用Java的ObjectOutputStream来序列化该类』。



实际上,Spark还支持另一种序列化框架 —— Kryo 。 Kryo是一个高效的序列化框架(可以比Java的序列化快10倍以上)。但是Kryo并不能支持所有的Serilizable class,因此需要在使用Kryo前对目标类进行注册(register)。


使用Kryo作为Spark的序列化框架的方法如下:
QQ20160102-7@2x.png-38.4kB

其中:

SparkConf#set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")要求将Kryo作为Spark的序列化框架

SparkConf#registerKryoClasses是为Kryo注册需要被序列化的classes



有一些其他的参数可以设置:








闭包相关的序列化


什么是闭包


闭包的序列化

看一个例子:

  1. class MyCoolApp {
  2. val repos: RDD[Repository] = ···
  3. val team: RDD[String, List[String]] = ···
  4. def projects() = {
  5. val filtered = repos.filter { repo =>
  6. team.exists(user => repo.contri.contains(user))
  7. }
  8. filyered.collect
  9. }
  10. }

运行上面的例子会报错:java.io.NotSerializableException

原因是:第6行~第8行代码构成的closure无法被序列化。

那么,一个闭包怎样才能被序列化呢?

    A closure is serializable if all captured variables are serialzable.



这个闭包中的captured variables是哪些呢?

    乍一看,变量 team 是唯一的captured variable,实际上,this (team所在的MyCoolApp实例)才是captured variable。但是,类MyCoolApp不能被序列化,因为它没有实现接口 Serializable


令closure可以序列化

怎样解决上面的问题呢?

可以用local variable来代替class field:

  1. class MyCoolApp {
  2. val repos: RDD[Repository] = ···
  3. val team: RDD[String, List[String]] = ···
  4. val localTeam = team
  5. def projects() = {
  6. val filtered = repos.filter { repo =>
  7. localTeam.exists(user => repo.contri.contains(user))
  8. }
  9. filyered.collect
  10. }
  11. }


值得注意的是:目前对闭包的序列化只支持Java Serializer

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注