@rickyChen
2016-04-29T06:49:47.000000Z
Spark
The main function is written under projectName/src/main/scala/.../. If you set up the project following the steps above, at the bottom of that directory you will find two files:
MyRouteBuild and MyRouteMain
These two files are module files. Delete MyRouteBuild and rename MyRouteMain to DirectKafkaWordCount. Here I use one of the official Spark Streaming example programs as the code for this project:
package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("...")
      System.exit(1)
    }

    // StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
Replace the package org.apache.spark.examples.streaming line at the top of this code with the package declaration from your own DirectKafkaWordCount file, then save the result over the DirectKafkaWordCount file.
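For example, judging from the --class argument used in the spark-submit command at the end of this post, the first line of the file would become:

package huochen.spark.example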
At this point the Spark processing code is complete.
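As an aside on the pipeline above: mapping each word to (x, 1L) and then reduceByKey(_ + _) is the classic word-count shape. The same logic can be sketched on a plain Scala collection (illustrative only; plain collections have no reduceByKey, so groupBy stands in for it):

val lines = List("a b", "b c b")
val counts = lines
  .flatMap(_.split(" "))                          // split lines into words
  .map(w => (w, 1L))                              // pair each word with a count of 1
  .groupBy(_._1)                                  // group the pairs by word
  .map { case (w, ps) => (w, ps.map(_._2).sum) }  // sum the 1s per word
// counts == Map("a" -> 1, "b" -> 3, "c" -> 1) (entry order may vary)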
pom.xml: preparing the project for packaging
pom.xml declares the dependencies of the whole project; for this project we need to pull in the Spark Streaming related packages. Note that the _2.10 suffix on each Spark artifact must match the Scala version (2.10.4 below), and 1.4.1 should match the Spark version on your cluster.
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
In addition, if you want the dependencies bundled into the final JAR, add the following configuration inside the build tag of pom.xml:
<plugins>
  <!-- Plugin to create a single jar that includes all dependencies -->
  <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.4</version>
    <configuration>
      <descriptorRefs>
        <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
    </configuration>
    <executions>
      <execution>
        <id>make-assembly</id>
        <phase>package</phase>
        <goals>
          <goal>single</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.0.2</version>
    <configuration>
      <source>1.7</source>
      <target>1.7</target>
    </configuration>
  </plugin>
  <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <executions>
      <execution>
        <id>scala-compile-first</id>
        <phase>process-resources</phase>
        <goals>
          <goal>add-source</goal>
          <goal>compile</goal>
        </goals>
      </execution>
      <execution>
        <id>scala-test-compile</id>
        <phase>process-test-resources</phase>
        <goals>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
</plugins>
Once pom.xml has been updated, you can package the project with Maven: in the Maven window that pops up on the right side of the IDE, click Execute Maven Goal and enter clean package on the command line.
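If you prefer a terminal to the IDE, the same goal can be run from the project root (assuming Maven is installed and on your PATH):

mvn clean package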
Two JARs will then appear under projectname/target: one containing only the compiled Scala code, and a jar-with-dependencies one containing all the dependencies as well.
Copy the jar-with-dependencies JAR to the Spark server and run the Spark job as follows:
../bin/spark-submit --master yarn-client \
  --jars ../lib/kafka_2.10-0.8.2.1.jar \
  --class huochen.spark.example.DirectKafkaWordCount \
  sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
spark-submit submits the job to the YARN cluster, and you can then watch the running results.
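To give the job something to count, one option is Kafka's console producer (assuming a standard Kafka 0.8 layout on the broker host; substitute your actual host, port, and topic). Every line typed there shows up in the word counts of a following 2-second batch:

bin/kafka-console-producer.sh --broker-list kafka-broker:9092 --topic topic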