@liyuj
2017-04-13T22:16:12.000000Z
字数 7803
阅读 3348
Apache-Ignite-1.9.0-中文开发手册
Ignite提供了一个Spark RDD抽象的实现,他可以容易地在内存中跨越多个Spark作业共享状态,原生SparkRDD和IgniteRDD
的主要不同在于IgniteRDD提供了一个跨越不同Spark作业、workers或者应用的数据的共享内存视图,而原生的SparkRDD无法被其他的Spark作业或者应用看到。
IgniteRDD实现的方式是作为一个分布式的Ignite缓存的视图,他可以部署在Spark执行进程内部,或者Spark workder上或者他自己的集群上。这意味着根据选择的不同的部署模型,共享状态可能只存在于一个Spark应用的生命周期内(嵌入式模式),或者可能存在于Spark应用外部(独立模式),这时状态可以在多个Spark应用之间共享。
IgniteContext是Spark和Ignite集成的主要入口点。要创建一个Ignite上下文的实例,必须提供一个SparkContext的实例以及创建IgniteConfiguration
的闭包(配置工厂)。Ignite上下文会确保Ignite服务端或者客户端节点存在于所有参与的作业实例中。或者,一个XML配置文件的路径也可以传入IgniteContext
构造器,它会用于配置启动的节点。
当创建一个IgniteContext
实例时,一个可选的booleanclient
参数(默认为true
)可以传入上下文构造器,这个通常用于一个共享部署安装,当client
设为false
时,上下文会操作于嵌入式模式然后在上下文创建期间在所有的worker上启动服务端节点。可以参照15.3.安装与配置
章节了解有关部署配置的信息。
一旦创建了IgniteContext
,IgniteRDD
的实例可以通过fromCache
方法获得,当RDD创建之后请求的缓存在Ignite集群中是否存在不是必要的,如果指定名字的缓存不存在,会用提供的配置或者模板配置创建它。
比如,下面的代码会用默认的Ignite配置创建一个Ignite上下文:
val igniteContext = new IgniteContext(sparkContext,
() => new IgniteConfiguration())
下面的代码会从example-cache.xml
的配置创建一个Ignite上下文:
val igniteContext = new IgniteContext(sparkContext,
"examples/config/example-cache.xml")
IgniteRDD
是一个SparkRDD抽象的实现,他表示Ignite的缓存的活动视图。IgniteRDD
不是一成不变的,Ignite缓存的所有改变(不论是他被另一个RDD或者缓存的外部改变触发)对于RDD用户都会立即可见。
IgniteRDD
利用Ignite缓存的分区性质然后向Spark执行器提供分区信息。IgniteRDD
中分区的数量会等于底层Ignite缓存的分区数量,IgniteRDD
还通过getPrefferredLocations
方法向Spark提供了关系信息使RDD计算可以使用本地的数据。
从Ignite中读取数据
因为IgniteRDD
是Ignite缓存的一个活动视图,因此不需要从Ignite向Spark应用显式地加载数据,在IgniteRDD
实例创建之后所有的RDD方法都会立即可用。
比如,假定一个名为partitioned
的Ignite缓存包含字符值,下面的代码会查找包含单词Ignite
的所有值:
val cache = igniteContext.fromCache("partitioned")
val result = cache.filter(_._2.contains("Ignite")).collect()
向Ignite保存数据
因为Ignite缓存操作于键-值对,因此向Ignite缓存保存数据的最明确的方法是使用Spark数组RDD以及savePairs
方法,如果可能的话,这个方法会利用RDD分区的优势然后以并行的方式将数据存入缓存。
也可能使用saveValues
方法将只有值的RDD存入Ignite缓存,这时,IgniteRDD
会为每个要存入缓存的值生成一个唯一的本地关系键。
比如,下面的代码会使用10个并行存储操作保存从1到10000的整型值对到一个名为partitioned
的缓存中:
val cacheRdd = igniteContext.fromCache("partitioned")
cacheRdd.savePairs(sparkContext.parallelize(1 to 10000, 10).map(i => (i, i)))
在Ignite缓存中执行SQL查询
当Ignite缓存配置为启用索引子系统,就可以使用objectSql
和sql
方法在缓存中执行SQL查询。可以参照4.6.缓存查询
章节来了解有关Ignite SQL查询的更多信息。
比如,假定名为partitioned
的缓存配置了索引整型对,下面的代码会获得 (10, 100)范围内的所有整型值:
val cacheRdd = igniteContext.fromCache("partitioned")
val result = cacheRdd.sql(
"select _val from Integer where val > ? and val < ?", 10, 100)
GitHub上有一些示例,演示了IgniteRDD
如何使用:
共享部署意味着Ignite节点的运行独立于Spark应用然后即使Spark作业结束之后也仍然保存状态。类似于Spark,将Ignite部署入集群有两种方式:
独立部署
在独立部署模式,Ignite节点应该与Spark worker节点部署在一起。Ignite安装的介绍可以参照1.2.入门
章节,在所有的worker节点上安装Ignite之后,通过ignite.sh
脚本在每个配置好的Spark workder上启动一个节点。
默认将Ignite库文件加入Spark类路径
Spark应用部署模型可以在应用启动期间动态地发布jar,但是这个模式有一些缺点:
getResource
方法,因此无法访问位于jar文件内部的资源;ClassNotFoundException
;有一个方法来对每一个启动的应用修改默认的Spark类路径(这个可以在每个Spark集群的机器上实现,包括master节点,worker节点以及driver节点)。
$SPARK_HOME/conf/spark-env.sh
文件,如果该文件不存在,用$SPARK_HOME/conf/spark-env.sh.template
这个模板创建它;spark-env.sh
文件的末尾(如果没有全局定义IGNITE_HOME
的话,需要将设置IGNITE_HOME
的行的注释去掉)。
# Optionally set IGNITE_HOME here.
# IGNITE_HOME=/path/to/ignite
IGNITE_LIBS="${IGNITE_HOME}/libs/*"
for file in ${IGNITE_HOME}/libs/*
do
if [ -d ${file} ] && [ "${file}" != "${IGNITE_HOME}"/libs/optional ]; then
IGNITE_LIBS=${IGNITE_LIBS}:${file}/*
fi
done
export SPARK_CLASSPATH=$IGNITE_LIBS
也可以验证Spark的类路径是否被运行bin/spark-shell
所改变,然后输入一个简单的import语句:
scala> import org.apache.ignite.configuration._
import org.apache.ignite.configuration._
MESOS部署
Ignite可以部署在Mesos集群上,可以在2.11.Mesos部署
章节参照Mesos部署说明。
嵌入式部署意味着Ignite节点是在Spark作业进程内部启动的,然后当作业结束时就停止了,这时不需要额外的部署步骤。Ignite代码会通过Spark的部署机制分布到workder机器然后作为IgniteContext
初始化的一部分在所有的workder上启动节点。
Ignite的Spark构件已经上传到Maven中心库,根据使用的Scala版本,引入下面的对应的依赖:
Scala 2.11
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spark</artifactId>
<version>${ignite.version}</version>
</dependency>
Scala 2.10
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spark_2.10</artifactId>
<version>${ignite.version}</version>
</dependency>
如果在Scala应用中使用SBT作为构建工具,那么可以使用下面的一行命令,将Ignite的Spark构件加入build.sbt
:
Scala 2.11
libraryDependencies += "org.apache.ignite" % "ignite-spark" % "ignite.version"
Scala 2.10
libraryDependencies += "org.apache.ignite" % "ignite-spark_2.10" % "ignite.version"
这里会简要地介绍Spark和Ignite集群的启动过程,可以参照Spark文档来了解详细信息。
为了测试,需要一个Spark master以及至少一个Spark workrer,通常Spark master和Spark workders是不同的机器,但是为了测试可以在启动master的同一台机器上启动worker。
SPARK_HOME
);IGNITE_HOME
);$SPARK_HOME
然后执行如下的命令:
sbin/start-master.sh
这个脚本会输出启动过程的日志文件的路径,可以在日志文件中查看master的URL,他的格式是:spark://master_host:master_port
。也可以在日志文件中查看WebUI的URL(通常是http://master_host:8080
)。
4. 转到每个worker节点的$SPARK_HOME
然后执行如下的命令:
bin/spark-class org.apache.spark.deploy.worker.Worker spark://master_host:master_port
这里的spark://master_host:master_port
就是从上述的master的日志文件中抓取的master的URL。在所有的worker节点都启动之后可以查看master的WebUI界面,他会显示所有的处于ALIVE
状态的已经注册的worker。
5. 转到每个worker节点的$IGNITE_HOME
目录然后通过运行如下的命令启动一个Ignite节点:
bin/ignite.sh
这时可以看到通过默认的配置Ignite节点会彼此发现对方。如果网络不允许多播通信,那么需要修改默认的配置文件然后配置TCP发现。
现在,在集群启动运行之后,可以运行spark-shell
来验证这个集成:
1.启动spark-shell
--repositories
参数,但是他可能会被忽略):
./bin/spark-shell
--packages org.apache.ignite:ignite-spark:1.8.0
--master spark://master_host:master_port
--repositories http://repo.maven.apache.org/maven2/org/apache/ignite
--jars
参数提供指向Ignite的jar文件的路径:
./bin/spark-shell --jars path/to/ignite-core.jar,path/to/ignite-spark.jar,path/to/cache-api.jar,path/to/ignite-log4j.jar,path/to/log4j.jar --master spark://master_host:master_port
这时可以看到Spark shell已经启动了。
注意,如果打算使用Spring的配置进行加载的话,那么需要同时添加ignite-spring
的依赖。
./bin/spark-shell
--packages org.apache.ignite:ignite-spark:1.8.0,org.apache.ignite:ignite-spring:1.8.0
--master spark://master_host:master_port
2.通过默认的配置创建一个Ignite上下文的实例
import org.apache.ignite.spark._
import org.apache.ignite.configuration._
val ic = new IgniteContext(sc, () => new IgniteConfiguration())
然后可以看到一些像下面这样的:
ic: org.apache.ignite.spark.IgniteContext = org.apache.ignite.spark.IgniteContext@62be2836
创建一个IgniteContext实例的另一个方式是使用一个配置文件,注意如果指向配置文件的路径是相对形式的,那么IGNITE_HOME
环境变量应该是在系统中全局设定的,因为路径的解析是相对于IGNITE_HOME
的。
import org.apache.ignite.spark._
import org.apache.ignite.configuration._
val ic = new IgniteContext(sc, "config/default-config.xml")
3.通过使用默认配置中的"partitioned"缓存创建一个IgniteRDD的实例
val sharedRDD = ic.fromCache[Integer, Integer]("partitioned")
然后可以看到为partitioned缓存创建了一个RDD的实例:
shareRDD: org.apache.ignite.spark.IgniteRDD[Integer,Integer] = IgniteRDD[0] at RDD at IgniteAbstractRDD.scala:27
注意RDD的创建是一个本地的操作,并不会在Ignite集群上创建缓存。
4.这时可以用RDD让Spark做一些事情,比如,获取值小于10的所有键值对
sharedRDD.filter(_._2 < 10).collect()
因为缓存还没有数据,因此结果会是一个空的数组:
res0: Array[(Integer, Integer)] = Array()
可以查看远程spark worker的日志文件然后可以看到Ignite上下文如何在集群内的所有远程worker上启动客户端。也可以启动命令行Visor然后查看partitioned
缓存已经创建了。
5.在Ignite中保存一些值
sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))
运行这个命令后可以通过命令行Visor查看缓存的大小是100000个元素。
6.现在要检查之前创建的状态在作业重启之后如何保持,关闭spark-shell然后重复步骤1-3,这时会再一次为partitioned缓存创建了Ignite上下文和RDD的实例,现在可以查看在RDD中有多少值大于50000的键
sharedRDD.filter(_._2 > 50000).count
因为在缓存中加入了从1到100000的连续数值,那么会得到结果50000
:
res0: Long = 50000
IgniteContext
然后又没有任何Ignite服务端节点启动时,就会发生这种情况,这时Ignite客户端会一直等待服务端节点启动或者超过集群连接超时时间后失败。当在客户端节点使用IgniteContext
时应该启动至少一个服务端节点。java.lang.ClassNotFoundException
和org.apache.ignite.logger.java.JavaLoggerFileHandler
ignite-log4j
模块加入使用的jar列表以使Ignite使用log4J作为日志记录器,或者就像15.3.安装和部署
章节中描述的那样修改Spark的默认类路径。