[关闭]
@liyuj 2017-04-13T22:16:12.000000Z 字数 7803 阅读 3368

Apache-Ignite-1.9.0-中文开发手册

4.Ignite与Spark

4.1.Ignite与Spark

Ignite提供了一个Spark RDD抽象的实现,他可以容易地在内存中跨越多个Spark作业共享状态,原生SparkRDD和IgniteRDD的主要不同在于IgniteRDD提供了一个跨越不同Spark作业、workers或者应用的数据的共享内存视图,而原生的SparkRDD无法被其他的Spark作业或者应用看到。
IgniteRDD实现的方式是作为一个分布式的Ignite缓存的视图,他可以部署在Spark执行进程内部,或者Spark workder上或者他自己的集群上。这意味着根据选择的不同的部署模型,共享状态可能只存在于一个Spark应用的生命周期内(嵌入式模式),或者可能存在于Spark应用外部(独立模式),这时状态可以在多个Spark应用之间共享。

4.2.IgniteContext和IgniteRDD

4.2.1.IgniteContext

IgniteContext是Spark和Ignite集成的主要入口点。要创建一个Ignite上下文的实例,必须提供一个SparkContext的实例以及创建IgniteConfiguration的闭包(配置工厂)。Ignite上下文会确保Ignite服务端或者客户端节点存在于所有参与的作业实例中。或者,一个XML配置文件的路径也可以传入IgniteContext构造器,它会用于配置启动的节点。
当创建一个IgniteContext实例时,一个可选的booleanclient参数(默认为true)可以传入上下文构造器,这个通常用于一个共享部署安装,当client设为false时,上下文会操作于嵌入式模式然后在上下文创建期间在所有的worker上启动服务端节点。可以参照15.3.安装与配置章节了解有关部署配置的信息。
一旦创建了IgniteContextIgniteRDD的实例可以通过fromCache方法获得,当RDD创建之后请求的缓存在Ignite集群中是否存在不是必要的,如果指定名字的缓存不存在,会用提供的配置或者模板配置创建它。
比如,下面的代码会用默认的Ignite配置创建一个Ignite上下文:

  1. val igniteContext = new IgniteContext(sparkContext,
  2. () => new IgniteConfiguration())

下面的代码会从example-cache.xml的配置创建一个Ignite上下文:

  1. val igniteContext = new IgniteContext(sparkContext,
  2. "examples/config/example-cache.xml")

4.2.2.IgniteRDD

IgniteRDD是一个SparkRDD抽象的实现,他表示Ignite的缓存的活动视图。IgniteRDD不是一成不变的,Ignite缓存的所有改变(不论是他被另一个RDD或者缓存的外部改变触发)对于RDD用户都会立即可见。
IgniteRDD利用Ignite缓存的分区性质然后向Spark执行器提供分区信息。IgniteRDD中分区的数量会等于底层Ignite缓存的分区数量,IgniteRDD还通过getPrefferredLocations方法向Spark提供了关系信息使RDD计算可以使用本地的数据。
从Ignite中读取数据
因为IgniteRDD是Ignite缓存的一个活动视图,因此不需要从Ignite向Spark应用显式地加载数据,在IgniteRDD实例创建之后所有的RDD方法都会立即可用。
比如,假定一个名为partitioned的Ignite缓存包含字符值,下面的代码会查找包含单词Ignite的所有值:

  1. val cache = igniteContext.fromCache("partitioned")
  2. val result = cache.filter(_._2.contains("Ignite")).collect()

向Ignite保存数据
因为Ignite缓存操作于键-值对,因此向Ignite缓存保存数据的最明确的方法是使用Spark数组RDD以及savePairs方法,如果可能的话,这个方法会利用RDD分区的优势然后以并行的方式将数据存入缓存。
也可能使用saveValues方法将只有值的RDD存入Ignite缓存,这时,IgniteRDD会为每个要存入缓存的值生成一个唯一的本地关系键。
比如,下面的代码会使用10个并行存储操作保存从1到10000的整型值对到一个名为partitioned的缓存中:

  1. val cacheRdd = igniteContext.fromCache("partitioned")
  2. cacheRdd.savePairs(sparkContext.parallelize(1 to 10000, 10).map(i => (i, i)))

在Ignite缓存中执行SQL查询
当Ignite缓存配置为启用索引子系统,就可以使用objectSqlsql方法在缓存中执行SQL查询。可以参照4.6.缓存查询章节来了解有关Ignite SQL查询的更多信息。
比如,假定名为partitioned的缓存配置了索引整型对,下面的代码会获得 (10, 100)范围内的所有整型值:

  1. val cacheRdd = igniteContext.fromCache("partitioned")
  2. val result = cacheRdd.sql(
  3. "select _val from Integer where val > ? and val < ?", 10, 100)

4.2.3.示例

GitHub上有一些示例,演示了IgniteRDD如何使用:

4.3.安装和部署

4.3.1.共享部署

共享部署意味着Ignite节点的运行独立于Spark应用然后即使Spark作业结束之后也仍然保存状态。类似于Spark,将Ignite部署入集群有两种方式:
独立部署
在独立部署模式,Ignite节点应该与Spark worker节点部署在一起。Ignite安装的介绍可以参照1.2.入门章节,在所有的worker节点上安装Ignite之后,通过ignite.sh脚本在每个配置好的Spark workder上启动一个节点。
默认将Ignite库文件加入Spark类路径
Spark应用部署模型可以在应用启动期间动态地发布jar,但是这个模式有一些缺点:

有一个方法来对每一个启动的应用修改默认的Spark类路径(这个可以在每个Spark集群的机器上实现,包括master节点,worker节点以及driver节点)。

  1. 定位到$SPARK_HOME/conf/spark-env.sh文件,如果该文件不存在,用$SPARK_HOME/conf/spark-env.sh.template这个模板创建它;
  2. 将下面的行加入spark-env.sh文件的末尾(如果没有全局定义IGNITE_HOME的话,需要将设置IGNITE_HOME的行的注释去掉)。
  1. # Optionally set IGNITE_HOME here.
  2. # IGNITE_HOME=/path/to/ignite
  3. IGNITE_LIBS="${IGNITE_HOME}/libs/*"
  4. for file in ${IGNITE_HOME}/libs/*
  5. do
  6. if [ -d ${file} ] && [ "${file}" != "${IGNITE_HOME}"/libs/optional ]; then
  7. IGNITE_LIBS=${IGNITE_LIBS}:${file}/*
  8. fi
  9. done
  10. export SPARK_CLASSPATH=$IGNITE_LIBS

也可以验证Spark的类路径是否被运行bin/spark-shell所改变,然后输入一个简单的import语句:

  1. scala> import org.apache.ignite.configuration._
  2. import org.apache.ignite.configuration._

MESOS部署
Ignite可以部署在Mesos集群上,可以在2.11.Mesos部署章节参照Mesos部署说明。

4.3.2.嵌入式部署

嵌入式部署意味着Ignite节点是在Spark作业进程内部启动的,然后当作业结束时就停止了,这时不需要额外的部署步骤。Ignite代码会通过Spark的部署机制分布到workder机器然后作为IgniteContext初始化的一部分在所有的workder上启动节点。

4.3.3.Maven

Ignite的Spark构件已经上传到Maven中心库,根据使用的Scala版本,引入下面的对应的依赖:
Scala 2.11

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-spark</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

Scala 2.10

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-spark_2.10</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

4.3.4.SBT

如果在Scala应用中使用SBT作为构建工具,那么可以使用下面的一行命令,将Ignite的Spark构件加入build.sbt
Scala 2.11

  1. libraryDependencies += "org.apache.ignite" % "ignite-spark" % "ignite.version"

Scala 2.10

  1. libraryDependencies += "org.apache.ignite" % "ignite-spark_2.10" % "ignite.version"

4.4.用Spark-shell测试Ignite

4.4.1.启动集群

这里会简要地介绍Spark和Ignite集群的启动过程,可以参照Spark文档来了解详细信息。
为了测试,需要一个Spark master以及至少一个Spark workrer,通常Spark master和Spark workders是不同的机器,但是为了测试可以在启动master的同一台机器上启动worker。

  1. 下载和解压Spark二进制发行版到所有节点的同一个位置(将其设为SPARK_HOME);
  2. 下载和解压Ignite二进制发行版到所有节点的同一个位置(将其设为IGNITE_HOME);
  3. 转到$SPARK_HOME然后执行如下的命令:
  1. sbin/start-master.sh

这个脚本会输出启动过程的日志文件的路径,可以在日志文件中查看master的URL,他的格式是:spark://master_host:master_port。也可以在日志文件中查看WebUI的URL(通常是http://master_host:8080)。
4. 转到每个worker节点的$SPARK_HOME然后执行如下的命令:

  1. bin/spark-class org.apache.spark.deploy.worker.Worker spark://master_host:master_port

这里的spark://master_host:master_port就是从上述的master的日志文件中抓取的master的URL。在所有的worker节点都启动之后可以查看master的WebUI界面,他会显示所有的处于ALIVE状态的已经注册的worker。
5. 转到每个worker节点的$IGNITE_HOME目录然后通过运行如下的命令启动一个Ignite节点:

  1. bin/ignite.sh

这时可以看到通过默认的配置Ignite节点会彼此发现对方。如果网络不允许多播通信,那么需要修改默认的配置文件然后配置TCP发现。

4.4.2.使用Spark-Shell

现在,在集群启动运行之后,可以运行spark-shell来验证这个集成:
1.启动spark-shell

  1. ./bin/spark-shell
  2. --packages org.apache.ignite:ignite-spark:1.8.0
  3. --master spark://master_host:master_port
  4. --repositories http://repo.maven.apache.org/maven2/org/apache/ignite
  1. ./bin/spark-shell --jars path/to/ignite-core.jar,path/to/ignite-spark.jar,path/to/cache-api.jar,path/to/ignite-log4j.jar,path/to/log4j.jar --master spark://master_host:master_port

这时可以看到Spark shell已经启动了。
注意,如果打算使用Spring的配置进行加载的话,那么需要同时添加ignite-spring的依赖。

  1. ./bin/spark-shell
  2. --packages org.apache.ignite:ignite-spark:1.8.0,org.apache.ignite:ignite-spring:1.8.0
  3. --master spark://master_host:master_port

2.通过默认的配置创建一个Ignite上下文的实例

  1. import org.apache.ignite.spark._
  2. import org.apache.ignite.configuration._
  3. val ic = new IgniteContext(sc, () => new IgniteConfiguration())

然后可以看到一些像下面这样的:

  1. ic: org.apache.ignite.spark.IgniteContext = org.apache.ignite.spark.IgniteContext@62be2836

创建一个IgniteContext实例的另一个方式是使用一个配置文件,注意如果指向配置文件的路径是相对形式的,那么IGNITE_HOME环境变量应该是在系统中全局设定的,因为路径的解析是相对于IGNITE_HOME的。

  1. import org.apache.ignite.spark._
  2. import org.apache.ignite.configuration._
  3. val ic = new IgniteContext(sc, "config/default-config.xml")

3.通过使用默认配置中的"partitioned"缓存创建一个IgniteRDD的实例

  1. val sharedRDD = ic.fromCache[Integer, Integer]("partitioned")

然后可以看到为partitioned缓存创建了一个RDD的实例:

  1. shareRDD: org.apache.ignite.spark.IgniteRDD[Integer,Integer] = IgniteRDD[0] at RDD at IgniteAbstractRDD.scala:27

注意RDD的创建是一个本地的操作,并不会在Ignite集群上创建缓存。
4.这时可以用RDD让Spark做一些事情,比如,获取值小于10的所有键值对

  1. sharedRDD.filter(_._2 < 10).collect()

因为缓存还没有数据,因此结果会是一个空的数组:

  1. res0: Array[(Integer, Integer)] = Array()

可以查看远程spark worker的日志文件然后可以看到Ignite上下文如何在集群内的所有远程worker上启动客户端。也可以启动命令行Visor然后查看partitioned缓存已经创建了。
5.在Ignite中保存一些值

  1. sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))

运行这个命令后可以通过命令行Visor查看缓存的大小是100000个元素。
6.现在要检查之前创建的状态在作业重启之后如何保持,关闭spark-shell然后重复步骤1-3,这时会再一次为partitioned缓存创建了Ignite上下文和RDD的实例,现在可以查看在RDD中有多少值大于50000的键

  1. sharedRDD.filter(_._2 > 50000).count

因为在缓存中加入了从1到100000的连续数值,那么会得到结果50000

  1. res0: Long = 50000

4.5.发现并解决的问题

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注