@zhangyy · 2018-05-07

Getting Started with Spark Core (Part 1)

Contents


  • 1: Introduction to Spark
  • 2: Spark installation and configuration
  • 3: Spark WordCount
  • 4: Processing data with Spark
  • 5: Spark Applications
  • 6: Spark log analysis
  • 7: Review

1: Introduction to Spark

1.1 Origins of Spark

  1. Spark is a general-purpose parallel computing framework in the style of Hadoop MapReduce, open-sourced by the AMP Lab at UC Berkeley. It has the advantages of Hadoop MapReduce, but unlike MapReduce it can keep intermediate job output in memory instead of writing it back to HDFS, which makes it much better suited to iterative workloads such as data mining and machine learning.
  2. Spark is an open-source cluster computing environment similar to Hadoop, but with differences that make it superior for certain workloads: it works with in-memory distributed datasets, and besides supporting interactive queries it can optimize iterative jobs.
  3. Spark is implemented in Scala and uses Scala as its application framework. Unlike Hadoop, Spark and Scala are tightly integrated, so Scala code can manipulate distributed datasets as easily as local collections.
  4. Although Spark was created to support iterative jobs on distributed datasets, it is in fact a complement to Hadoop and can run in parallel on top of the Hadoop file system (this can also be done through the third-party cluster framework Mesos). Developed by the AMP Lab (Algorithms, Machines, and People Lab) at UC Berkeley, Spark can be used to build large-scale, low-latency data analysis applications.

1.2 The Spark ecosystem

image_1b40erb9d10q31t0qqdmsqs1lksm.png-75.8kB


1.3 Spark compared with Hadoop MapReduce

  1. Hadoop stack: MapReduce as the compute engine, with Hive, Storm, Mahout and Giraph built on top of or alongside it.
  2. Spark stack: Spark Core as the compute engine, with Spark SQL, Spark Streaming, Spark MLlib, Spark GraphX and SparkR on top of it.

1.4 Where Spark can run

  1. Spark applications run everywhere:
  2. local, YARN, Mesos, Standalone, EC2, ...

image_1b40f3h4j1c0m2au1qmlrk61a4s13.png-145.4kB

2: Spark installation and configuration

2.1 With Hadoop already configured, install scala-2.10.4.tgz

    tar -zxvf scala-2.10.4.tgz -C /opt/modules
    vim /etc/profile
    export JAVA_HOME=/opt/modules/jdk1.7.0_67
    export HADOOP_HOME=/opt/modules/hadoop-2.5.0-cdh5.3.6
    export SCALA_HOME=/opt/modules/scala-2.10.4
    export SPARK_HOME=/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6
    PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin

2.2 Install spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz

    tar -zxvf spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz
    mv spark-1.6.1-bin-2.5.0-cdh5.3.6 /opt/modules
    cd /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/conf
    cp -p spark-env.sh.template spark-env.sh
    cp -p log4j.properties.template log4j.properties
    vim spark-env.sh
    # append:
    JAVA_HOME=/opt/modules/jdk1.7.0_67
    SCALA_HOME=/opt/modules/scala-2.10.4
    HADOOP_CONF_DIR=/opt/modules/hadoop-2.5.0-cdh5.3.6/etc/hadoop

image_1b40o15ugt8v1stklft1ahfm289.png-115.2kB


2.3 Running the Spark shell

Launch the Spark shell:
    bin/spark-shell

image_1b40oa3e217t01nuoqlp1tc01o69m.png-406.3kB

2.4 Running a test file

    hdfs dfs -mkdir /input
    hdfs dfs -put README.md /input

2.4.1 Running some counts

    scala> val rdd = sc.textFile("/input/README.md")

image_1b40qa6ll9uojo45leq41ctb2a.png-232.9kB

    rdd.count                                         // number of lines
    rdd.first                                         // first line
    rdd.filter(line => line.contains("Spark")).count  // number of lines containing "Spark"

image_1b40qb9vd2151l8o8kd189l4ll2n.png-458.2kB

image_1b40qbsttjgi1c4ng2st31b34.png-118kB

    scala> rdd.map(line => line.split(" ").size).reduce(_ + _)   // total number of words in the file
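This maps each line to its word count and then sums the counts with reduce. An equivalent formulation, assuming the same rdd bound above, flattens the lines into individual words and simply counts the records:

    // equivalent total word count: one record per word, then count the records
    rdd.flatMap(line => line.split(" ")).count()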

image_1b40qqkcf88v1rpvlks86q1kbv3h.png-240.3kB

3: Spark WordCount

3.1 Word count in the Spark shell

    val rdd = sc.textFile("/input")   // read the files under /input
    rdd.collect                       // show the contents
    rdd.count                         // number of lines

image_1b40roaqd1llq196lj4p1r8mfnk9.png-223.2kB
image_1b40rp3pi6ck12pi10k516bh1u3lm.png-908.7kB

3.2 The three steps of processing data in Spark

  1. input
         scala> val rdd = sc.textFile("/input")
  2. process
         val WordCountRDD = rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
     shorthand:
         val WordCountRDD = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  3. output
         scala> WordCountRDD.saveAsTextFile("/output3")
         scala> WordCountRDD.collect
image_1b40sv5d9e1615l01jkv1chf1qbn13.png-223.5kB

image_1b40t01h51m2sb0l1qkb7285rn1g.png-77.2kB

image_1b40t1e44122419bra65141cj4d2a.png-800.8kB

image_1b40t3hfg1iln174g1nd411fo17c92n.png-133.7kB

image_1b40t3vonno21ipr1lfap7319j334.png-78.8kB

image_1b40t8vos2md1f7717l7k18136l3h.png-827.2kB

4: Processing data with Spark

4.1 Counting page views with Spark

Process pageview data with Spark:
    hdfs dfs -mkdir /page
    hdfs dfs -put page_views.data /page
Read the data:
    val rdd = sc.textFile("/page")
Process the data:
    val PageRdd = rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _)
Take the first ten results:
    PageRdd.take(10)

image_1btqin3s91cjefam40b1tsp114k13.png-223.3kB

image_1btqinr0eda5207qta1ga01rcb1g.png-405.2kB

image_1btqiof408m11m02pjavr713f1t.png-264.1kB

image_1btqiosfacmk38b1alm8ea1ru22a.png-264kB

Cache the data in memory:
    rdd.cache
    rdd.count
    rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _).take(10)
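rdd.cache only marks the RDD for caching; the following action (here rdd.count) is what actually materializes it in memory, so the second pass over the data can read from the cache. A minimal sketch of the related calls, assuming the same rdd as above:

    import org.apache.spark.storage.StorageLevel

    rdd.cache()              // shorthand for rdd.persist(StorageLevel.MEMORY_ONLY)
    rdd.count()              // the first action materializes the cached partitions
    rdd.getStorageLevel      // confirm which storage level is in effect
    rdd.unpersist()          // release the cached partitions when no longer needed
    // an alternative level that spills to disk when memory is short:
    rdd.persist(StorageLevel.MEMORY_AND_DISK)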

image_1btqj33pv1aji1cc66joo3617c934.png-110.3kB
image_1btqj7ki01r5t1q1d16751426j143h.png-622.3kB
image_1btqj89941qv9n4v11po1ifo11h3u.png-221.1kB

5: Spark Applications

5.1 Spark deployment modes

  1. A Spark application can run on:
     -1. YARN -- currently the most widely used
     -2. Standalone -- Spark's own distributed resource management and task scheduling
     -3. Mesos
  2. For context, the Hadoop/CDH release history:
     hadoop 2.x release 2.2.0 (2013/10/15)
     hadoop 2.0.x - alpha
     hadoop 2.1.x - beta
     cdh3.x - 0.20.2
     cdh4.x - 2.0.0 (HDFS HA with QJM; Federation)
     Cloudera Manager 4.x
     cdh5.x

5.2 Spark Standalone mode

Standalone is Spark's own distributed resource management and task scheduling framework, similar in role to YARN.
It is a distributed master/slave architecture:
    Master -- analogous to the ResourceManager
    Worker -- analogous to the NodeManager

Open spark-env.sh and append at the end:
    SPARK_MASTER_IP=192.168.3.1
    SPARK_MASTER_PORT=7077
    SPARK_MASTER_WEBUI_PORT=8080
    SPARK_WORKER_CORES=2
    SPARK_WORKER_MEMORY=2g
    SPARK_WORKER_PORT=7078
    SPARK_WORKER_WEBUI_PORT=8081
    SPARK_WORKER_INSTANCES=1   ## number of worker instances per machine

Configure the worker list:
    cd /soft/spark/conf
    cp -p slaves.template slaves
    echo "flyfish01.yangyang.com" > slaves
------
Start Spark:
    cd /soft/spark/sbin
    ./start-master.sh
    ./start-slaves.sh    # starts all the worker nodes
Note: the machine that runs start-slaves.sh must have passwordless SSH configured to the cluster nodes; otherwise the startup will prompt for passwords.

image_1btqlhj441a31q1i1ear3b91b0t4b.png-354.5kB
image_1btqlkhop7eplabtmt1nmq1h6c4o.png-156.3kB
image_1btqlldjn115mb6rj4t1pec1o9855.png-226.3kB

Run a job on Standalone:
    bin/spark-shell --master spark://192.168.3.1:7077

image_1btqlud421ntv1e7i9vobri16fq5v.png-402.7kB
image_1btqlutdp1q15ki7130dv35lu6c.png-151.6kB

5.3 Running on Standalone

Read the data:
    val rdd = sc.textFile("/page")
Process the data:
    val PageRdd = rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _)
Take the first ten results:
    PageRdd.take(10)

image_1btqm9vgb2u6hr01rjs1eqn1kbc6p.png-222.4kB
image_1btqmb3hua17tdlopvhf21rj97m.png-95.8kB
image_1btqmbec81m7a10estl31bu3fmq83.png-227.9kB
image_1btqmbpv3goc6m912mg135t14uk8g.png-199.3kB
image_1btqmcmid1o4f1tln146nii171l8t.png-233.6kB
image_1btqmdimmi7hr9okh0rhqjs49a.png-243.3kB
image_1btqme14ko7p1or7gas1gk5isf9n.png-222.3kB
image_1btqnijbp19i8lfu15728pi1t3hbe.png-159.3kB

5.4 The two parts of a Spark application

A Spark application consists of two parts:
  -1. Driver program (web UI on port 4040 / 4041 / 4042 ...)
      runs the main method
      creates the SparkContext -- the single most important object
  -2. Executors
      each Executor is a JVM process
      runs the job's tasks
  REPL: the interactive spark-shell is itself a Spark application

Inside one Spark application:
  job-01 (e.g. triggered by count)
  job-02
      stage-01
          task-01 (a thread)  -- compare: a MapReduce map task is a separate process
          task-02 (a thread)
          all tasks in a stage run the same logic, each on a different slice of the data
      stage-02
  job-03

From the programs run above: whenever a function called on an RDD returns something that is not an RDD (i.e. an action), a job is triggered.

What does reduceByKey actually do?
  -1. grouping: values with the same key are brought together
  -2. reduce: the grouped values are combined

Comparing this with how the MapReduce word count runs, Spark divides a job into stages wherever a shuffle has to happen between RDDs (see the toDebugString sketch after this list).
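One way to see those stage boundaries, assuming the word-count RDD built earlier in the shell, is to print the RDD's lineage: toDebugString marks each shuffle with an indentation level, and each indented group corresponds to one stage:

    // build the word-count lineage and print it; reduceByKey introduces the only shuffle,
    // so the plan splits into two stages
    val wc = sc.textFile("/input")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    println(wc.toDebugString)   // ShuffledRDD on top, MapPartitionsRDD/HadoopRDD indented below it
    wc.count()                  // an action: triggers one job made of those two stages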

image_1btqmk5k0161l1qk1udq1c6t1l70a4.png-237.5kB
image_1btqn68elc5c4921dfve5ll25ah.png-213.1kB

Sort the word counts in descending order:
    val rdd = sc.textFile("/input")
    val WordCountRdd = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val sortRdd = WordCountRdd.map(tuple => (tuple._2, tuple._1)).sortByKey(false)
    sortRdd.collect
    sortRdd.take(3)
    sortRdd.take(3).map(tuple => (tuple._2, tuple._1))
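The swap-sort-swap pattern above works, but the RDD API also offers more direct ways to get the top results; a sketch, assuming the same WordCountRdd:

    // sortBy avoids the manual key/value swap
    val sorted = WordCountRdd.sortBy(tuple => tuple._2, ascending = false)
    sorted.take(3)

    // top selects the largest elements without sorting the whole dataset
    WordCountRdd.top(3)(Ordering.by(tuple => tuple._2))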

image_1btqp481vu2g1mur1gd1td9g2hbr.png-247.2kB

image_1btqp5g401hg679vcdg524g9d8.png-98.8kB

image_1btqp6g6l1ui91pm77mb1v5n1rs7dl.png-559.1kB

image_1btqp72rh15ik3e27ln11mojthe2.png-286.8kB

image_1btqp7o251g05ksijvt1qh11v4sef.png-426.9kB

image_1btqp8bk61vbh74enlu1cbu652es.png-351.2kB

image_1btqp8vm6638545det164rrbf9.png-212.9kB

Scala implicit conversions:
    An implicit conversion turns a value of one type into a value of another type automatically.
    It is declared as an implicit function:
        implicit def
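A minimal, self-contained illustration of an implicit conversion (the Seconds type and intToSeconds function here are invented purely for this example, not anything from Spark):

    import scala.language.implicitConversions

    object ImplicitDemo {
      // a hypothetical wrapper type, used only for this illustration
      case class Seconds(n: Int)

      // implicit function: an Int is converted to Seconds wherever Seconds is expected
      implicit def intToSeconds(n: Int): Seconds = Seconds(n)

      def sleepFor(s: Seconds): Unit = println(s"sleeping for ${s.n}s")

      def main(args: Array[String]): Unit = {
        sleepFor(5)   // the compiler rewrites this as sleepFor(intToSeconds(5))
      }
    }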

5.5 Developing Spark jobs in practice

How to develop a Spark application: spark-shell + IDEA
  -1. write the code in IDEA
  -2. try the code out in spark-shell
  -3. package the code into a jar with IDEA and submit it with bin/spark-submit

5.6 Writing a Spark program in IDEA on Linux: top 10 out of 100,000 records

    package com.ibeifeng.bigdata.senior.core

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    /**
     * Created by root on 17-11-2.
     *
     * Driver Program
     */
    object SparkApp {
      def main(args: Array[String]) {
        // step 0: SparkConf
        val sparkConf = new SparkConf()
          .setAppName("SparkApplication")
          .setMaster("local[2]")
        // create SparkContext
        val sc = new SparkContext(sparkConf)

        //**=========================================*/
        // step 1: input data
        val rdd = sc.textFile("/page/page_views.data")

        // step 2: process data
        val pageWordRddTop10 = rdd
          .map(line => line.split("\t"))
          .map(x => (x(2), 1))
          .reduceByKey(_ + _)
          .map(tuple => (tuple._2, tuple._1))
          .sortByKey(false)
          .take(10)

        // step 3: output data
        pageWordRddTop10.foreach(println(_))
        //**=========================================*/

        // close spark
        sc.stop()
      }
    }
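One detail worth noting: a master set in code via setMaster takes precedence over the --master flag passed to spark-submit, so hard-coding local[2] as above keeps the job local even when it is submitted to a cluster. A common pattern (a sketch, not from the original listing) is to leave the master out of the code and supply it at submit time:

    // master is intentionally not set here; it comes from spark-submit --master ...
    val sparkConf = new SparkConf().setAppName("SparkApplication")
    val sc = new SparkContext(sparkConf)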

image_1btt7cn2ov6v1oqj1i3h12s51jlp9.png-176kB

5.7 Packaging the code into a jar and running it

image_1btt7p7e0t2hok4135n1oha1k619.png-262.8kB

image_1btt7rpep1niaa17rin116rn73m.png-304.6kB

image_1btt7tvej12u9kuofa5i5nmbs13.png-342kB

image_1btt82ao11a0saie1thh108ve8d1g.png-406.7kB

image_1btt84pt74j514mr991ng41h3m1t.png-354kB

image_1btt85jdd1hb91nmass8m19dr72a.png-360.8kB

image_1btt894q2kn71k0t1dth1kts1mtu2n.png-540.4kB

image_1btt8b81a1n336so19vhnci15at34.png-271.4kB

image_1btt8c7n2vgmrik10m82avu741.png-170.1kB

image_1btt8dd7ov6g36k17bepp8ue4u.png-171.5kB

image_1btt8ef5jg5i10ao1dcr1l4surc5b.png-109.4kB

5.8 Submitting Spark jobs

5.8.1 Running in local mode

    bin/spark-submit Scala_Project.jar

image_1btt92m7bu6c1hs916k611n6iu55o.png-271kB

image_1btt93bqe1kvg1dnpddq9231tch65.png-320.8kB

5.8.2 Running on Standalone

image_1btt998qio8vngo6882qj15kd6i.png-537.9kB

image_1btt9c12sgevk9d11v312uue9k6v.png-254.9kB

image_1btt9ck8mg0ergftghvsk1c407c.png-106.4kB

Start Spark Standalone:
    sbin/start-master.sh
    sbin/start-slaves.sh

image_1btt9fo8f10jc1v7j135okol15ts7p.png-197.8kB

image_1btt9iaav1t8rftsl5413je197ra6.png-312.2kB

    bin/spark-submit --master spark://192.168.3.1:7077 Scala_Project.jar

image_1btt9mub7cgg2et1e5r1irt5nmaj.png-554.6kB

image_1btt9nkq75uh15m014rqaii2htb0.png-226.9kB

image_1btt9o8ms1d7tmii1crlcov18lhbd.png-358.8kB

5.9 Configuring the Spark history server

Spark can monitor applications that have already finished. This involves two parts:
  first:  the application must be configured to record an event log while it runs;
  second: the history server is started and the logs are browsed through its web UI.
------
Configure the event log:
    cd /soft/spark/conf
    cp -p spark-defaults.conf.template spark-defaults.conf
    vim spark-defaults.conf
    spark.master             spark://192.168.3.1:7077
    spark.eventLog.enabled   true
    spark.eventLog.dir       hdfs://192.168.3.1:8020/SparkJobLogs
    spark.eventLog.compress  true
Start the Spark shell:
    bin/spark-shell

image_1bttakmgv17b0ig01tqb7o3qaibq.png-397kB

image_1bttamgcdposngoo8suvb16thd7.png-396.3kB

image_1bttaofol1mi71b1qnr7lt416lmdk.png-150.2kB

    bin/spark-submit --master spark://192.168.3.1:7077 Scala_Project.jar

image_1bttbbcutkflom1irq1pcm1d5e1.png-264.7kB

image_1bttbc1s110191g8h17u1ji21bdqee.png-80.5kB

image_1bttbco9e1o5v60hep3d91v73er.png-263.1kB

image_1bttbdbapgiq8i51af6d5f1avff8.png-305.7kB
image_1bttbec3t5cn1eac1co51gsd324fl.png-227.2kB

Configure the history server itself:
    vim spark-env.sh
    SPARK_MASTER_IP=192.168.3.1
    SPARK_MASTER_PORT=7077
    SPARK_MASTER_WEBUI_PORT=8080
    SPARK_WORKER_CORES=2
    SPARK_WORKER_MEMORY=2g
    SPARK_WORKER_PORT=7078
    SPARK_WORKER_WEBUI_PORT=8081
    SPARK_WORKER_INSTANCES=1   ## number of worker instances per machine
    ----
    # add:
    SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://flyfish01.yangyang.com:8020/SparkJobLogs -Dspark.history.fs.cleaner.enabled=true"
-------------
Start the history server:
    cd /soft/spark
    sbin/start-history-server.sh

image_1bttc313v58p9so168t2e83gggi.png-103.6kB

image_1bttcac0b1jn51to41eogiq31n20gv.png-208.5kB

image_1bttcckkcafj1dhl1l2fj406b9hc.png-373.2kB

image_1bttcehdv1dea14qrp8t127v1ueghp.png-367.4kB


6: Spark log analysis

Requirement 1: The average, min, and max content size of responses returned from the server.
    Field: contentSize
Requirement 2: A count of the response codes returned.
    Field: responseCode
Requirement 3: All IP addresses that have accessed this server more than N times.
    Field: ipAddress
Requirement 4: The top endpoints requested by count.
    Field: endPoint

6.1 Creating the project with Maven

6.1.1 Creating it from the command line

    mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scala-tools.org/repo-releases -DgroupId=com.ibeifeng.bigdata.spark.app -DartifactId=log-analyzer -Dversion=1.0

6.1.2 Importing the project

image_1bttptu0g1mip1906188j1mtn1pavme.png-67.3kB

image_1bttpumtj6151ofu15e8igs1a9smr.png-151.8kB

image_1bttpveo11qng67s1p7a19eo1m1nn8.png-81kB

image_1bttq017jfv6ne83c44516mtnl.png-174kB

image_1bttq0hb162d145a1sh8qraphbo2.png-75.3kB

image_1bttq12k21mom9um1nhpov01begof.png-251.4kB

image_1bttq22lq9pd9pgb71jo91cldos.png-73.5kB

image_1bttq6ti46o2qcj1n851ecuu90p9.png-195.6kB

6.1.3 The pom.xml file

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.ibeifeng.bigdata.spark.app</groupId>
      <artifactId>log-analyzer</artifactId>
      <version>1.0</version>
      <name>${project.artifactId}</name>
      <description>My wonderfull scala app</description>
      <inceptionYear>2010</inceptionYear>

      <properties>
        <encoding>UTF-8</encoding>
        <hadoop.version>2.5.0</hadoop.version>
        <spark.version>1.6.1</spark.version>
      </properties>

      <dependencies>
        <!-- HDFS Client -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          <scope>compile</scope>
        </dependency>
        <!-- Spark Core -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>${spark.version}</version>
          <scope>compile</scope>
        </dependency>
        <!-- Test -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.8.1</version>
          <scope>test</scope>
        </dependency>
      </dependencies>

      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.0</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
                <configuration>
                  <args>
                    <arg>-make:transitive</arg>
                    <arg>-dependencyfile</arg>
                    <arg>${project.build.directory}/.scala_dependencies</arg>
                  </args>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.6</version>
            <configuration>
              <useFile>false</useFile>
              <disableXmlReport>true</disableXmlReport>
              <!-- If you have classpath issue like NoDefClassError,... -->
              <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
              <includes>
                <include>**/*Test.*</include>
                <include>**/*Suite.*</include>
              </includes>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>

6.1.4 Adding the Scala jars

image_1bttqb9bu1vp01dv9ie110s1conpm.png-353.7kB

6.1.5 Creating LogAnalyzer.scala

    package com.ibeifeng.bigdata.spark.app.core

    import org.apache.spark.{SparkContext, SparkConf}

    /**
     * Created by zhangyy on 2016/7/16.
     */
    object LogAnalyzer {
      def main(args: Array[String]) {
        // step 0: SparkContext
        val sparkConf = new SparkConf()
          .setAppName("LogAnalyzer Application") // name
          .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
        // Create SparkContext
        val sc = new SparkContext(sparkConf)

        /** ================================================================== */
        val logFile = "/logs/apache.access.log"

        // step 1: input data
        val accessLogs = sc.textFile(logFile)
          // parse log
          .map(line => ApacheAccessLog.parseLogLine(line))

        /**
         * The average, min, and max content size of responses returned from the server.
         */
        val contentSizes = accessLogs.map(log => log.contentSize)
        // compute
        val avgContentSize = contentSizes.reduce(_ + _) / contentSizes.count()
        val minContentSize = contentSizes.min()
        val maxContentSize = contentSizes.max()
        // println
        printf("Content Size Avg: %s , Min : %s , Max: %s".format(
          avgContentSize, minContentSize, maxContentSize
        ))

        /**
         * A count of the response codes returned
         */
        val responseCodeToCount = accessLogs
          .map(log => (log.responseCode, 1))
          .reduceByKey(_ + _)
          .take(3)
        println(
          s"""Response Code Count: ${responseCodeToCount.mkString(", ")}"""
        )

        /**
         * All IP addresses that have accessed this server more than N times
         */
        val ipAddresses = accessLogs
          .map(log => (log.ipAddress, 1))
          .reduceByKey(_ + _)
          // .filter(x => (x._2 > 10))
          .take(5)
        println(
          s"""IP Address : ${ipAddresses.mkString("< ", ", ", " >")}"""
        )

        /**
         * The top endpoints requested by count
         */
        val topEndpoints = accessLogs
          .map(log => (log.endPoint, 1))
          .reduceByKey(_ + _)
          .map(tuple => (tuple._2, tuple._1))
          .sortByKey(false)
          .take(3)
          .map(tuple => (tuple._2, tuple._1))
        println(
          s"""Top Endpoints : ${topEndpoints.mkString("[", ", ", " ]")}"""
        )
        /** ================================================================== */

        // Stop SparkContext
        sc.stop()
      }
    }
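Because accessLogs feeds four separate computations (and contentSizes itself is hit by three actions: reduce, min and max), each action re-reads and re-parses the log file unless the RDDs are cached. A small sketch of the change, assuming the same variable names as above:

    // cache the parsed logs so the four analyses do not re-parse the file
    val accessLogs = sc.textFile(logFile)
      .map(line => ApacheAccessLog.parseLogLine(line))
      .cache()

    // contentSizes is hit by reduce, min and max, so cache it as well
    val contentSizes = accessLogs.map(log => log.contentSize).cache()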

6.1.6 Creating the log parser, ApacheAccessLog.scala

    package com.ibeifeng.bigdata.spark.app.core

    /**
     * Created by zhangyy on 2016/7/16.
     *
     * 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800]
     * "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1"
     * 200 1234
     */
    case class ApacheAccessLog(
      ipAddress: String,
      clientIdentd: String,
      userId: String,
      dateTime: String,
      method: String,
      endPoint: String,
      protocol: String,
      responseCode: Int,
      contentSize: Long)

    object ApacheAccessLog {
      // regex
      // 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
      val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

      /**
       * @param log a raw access-log line
       * @return the parsed ApacheAccessLog
       */
      def parseLogLine(log: String): ApacheAccessLog = {
        // parse log
        val res = PATTERN.findFirstMatchIn(log)
        // invalidate
        if (res.isEmpty) {
          throw new RuntimeException("Cannot parse log line: " + log)
        }
        // get value
        val m = res.get
        // return
        ApacheAccessLog(
          m.group(1),
          m.group(2),
          m.group(3),
          m.group(4),
          m.group(5),
          m.group(6),
          m.group(7),
          m.group(8).toInt,
          m.group(9).toLong)
      }
    }
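A quick way to sanity-check the parser (for example in spark-shell after pasting the class, or from a small test) is to feed it the sample line from the comment above:

    // quick check of the parser with the sample line from the comment
    val sample = "1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] " +
      "\"GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1\" 200 1234"

    val parsed = ApacheAccessLog.parseLogLine(sample)
    println(parsed.ipAddress)    // 1.1.1.1
    println(parsed.responseCode) // 200
    println(parsed.contentSize)  // 1234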

6.1.7 A common error

    Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
        at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
        at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
Fix: remove the javax.servlet-xxxx.api Maven dependency from the project.

6.1.8 Output

image_1btu0d0qtcov1b241sdl1jsn1efqj.png-113.3kB

image_1btu0e2t6jr82nnlfq15mt17oarg.png-91.6kB

image_1btu0er155qe17bs1o0u1sl417kert.png-127.1kB

image_1btu0g7ff1c2p10mb1pdt1jsu1rs5sq.png-99.6kB

7: Review

Review:
-1. Getting to know Spark
    comparison with MapReduce
    "four advantages":
    --1. fast
    --2. easy to use
    --3. a unified, one-stack platform
    --4. runs everywhere
    development and testing:
    Scala REPL / Python (pyspark)
-2. Spark Core
    two key abstractions:
    --1. RDD
        a collection that can hold data of different types, like a List
        ---1. held in memory
        ---2. partitioned, like HDFS blocks
        ---3. operated on partition by partition with functions
    --2. shared variables
        ---1. broadcast variables
        ---2. accumulators (counters)
        (see the sketch after this list)
-3. Environment and development
    --1. Local mode: spark-shell
    --2. Spark Standalone: configuration, startup, monitoring, usage
    --3. HistoryServer
        -1. whether each application records an event log
        -2. the HistoryServer displays the logs
    --4. developing Spark applications in an IDE
        -1. Scala project: how to add the Spark jars
        -2. Maven project
=================================================
Spark development in three steps:
    step 1: input data   -> rdd / dataframe
    step 2: process data -> rdd.xx() / df.xx() | "select xx, * from xx ..."
    step 3: output data  -> rdd.saveAsXxx / df.write.jdbc/json/xxx
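A minimal sketch of the two kinds of shared variables mentioned above (broadcast variables and accumulators), runnable in spark-shell against the Spark 1.x API; the variable names and the stop-word set are purely illustrative:

    // broadcast variable: read-only data shipped once to each executor
    val stopWords = sc.broadcast(Set("a", "an", "the"))

    // accumulator: a counter the executors can only add to, read back on the driver
    val emptyLines = sc.accumulator(0)   // Spark 1.x API (longAccumulator in 2.x)

    val cleaned = sc.textFile("/input").flatMap { line =>
      if (line.trim.isEmpty) emptyLines += 1
      line.split(" ").filter(word => !stopWords.value.contains(word))
    }

    cleaned.count()                       // the action triggers the computation
    println("empty lines: " + emptyLines.value)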