[关闭]
@zhangyy 2018-05-08T11:08:20.000000Z 字数 11306 阅读 214

Spark 的 Core 深入(二)

Spark的部分



一、日志清洗的优化:

1.1 日志清洗有脏数据问题

  1. hdfs dfs -mkdir /apachelog/
  2. hdfs dfs -put access_log /apachelogs
  3. hdfs dfs -ls /apachelogs

image_1btvt6m19c9p1m3g1t0n1ber1ma19.png-223.2kB

  1. 执行结果报错。

image_1btvt95ps1ku614toqji2epjdm.png-57.2kB

  1. LogAnalyzer.scala
  1. package com.ibeifeng.bigdata.spark.app.core
  2. import org.apache.spark.{SparkContext, SparkConf}
  3. /**
  4. * Created by zhangyy on 2016/7/16.
  5. */
  6. object LogAnalyzer {
  7. def main(args: Array[String]) {
  8. // step 0: SparkContext
  9. val sparkConf = new SparkConf()
  10. .setAppName("LogAnalyzer Applicaiton") // name
  11. .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
  12. // Create SparkContext
  13. val sc = new SparkContext(sparkConf)
  14. /** ================================================================== */
  15. val logFile = "/apachelogs/access_log"
  16. // step 1: input data
  17. val accessLogs = sc.textFile(logFile)
  18. // filer logs data
  19. .filter(ApacheAccessLog.isValidateLogLine) // closures
  20. /**
  21. * parse log
  22. */
  23. .map(line => ApacheAccessLog.parseLogLine(line))
  24. /**
  25. * The average, min, and max content size of responses returned from the server.
  26. */
  27. val contentSizes = accessLogs.map(log => log.contentSize)
  28. // compute
  29. val avgContentSize = contentSizes.reduce(_ + _) / contentSizes.count()
  30. val minContentSize = contentSizes.min()
  31. val maxContentSize = contentSizes.max()
  32. // println
  33. printf("Content Size Avg: %s , Min : %s , Max: %s".format(
  34. avgContentSize, minContentSize, maxContentSize
  35. ))
  36. /**
  37. * A count of response code's returned
  38. */
  39. val responseCodeToCount = accessLogs
  40. .map(log => (log.responseCode, 1))
  41. .reduceByKey(_ + _)
  42. .take(3)
  43. println(
  44. s"""Response Code Count: ${responseCodeToCount.mkString(", ")}"""
  45. )
  46. /**
  47. * All IPAddresses that have accessed this server more than N times
  48. */
  49. val ipAddresses = accessLogs
  50. .map(log => (log.ipAddress, 1))
  51. .reduceByKey( _ + _)
  52. // .filter( x => (x._2 > 10))
  53. .take(5)
  54. println(
  55. s"""IP Address : ${ipAddresses.mkString("< ", ", " ," >")}"""
  56. )
  57. /**
  58. * The top endpoints requested by count
  59. */
  60. val topEndpoints = accessLogs
  61. .map(log => (log.endPoint, 1))
  62. .reduceByKey(_ + _)
  63. .top(3)(OrderingUtils.SecondValueOrdering)
  64. // .map(tuple => (tuple._2, tuple._1))
  65. // .sortByKey(false)
  66. //.take(3)
  67. //.map(tuple => (tuple._2, tuple._1))
  68. println(
  69. s"""Top Endpoints : ${topEndpoints.mkString("[", ", ", " ]")}"""
  70. )
  71. /** ================================================================== */
  72. // Stop SparkContext
  73. sc.stop()
  74. }
  75. }
  1. ApacheAccessLog.scala
  1. package com.ibeifeng.bigdata.spark.app.core
  2. /**
  3. * Created by zhangyy on 2016/7/16.
  4. *
  5. * 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800]
  6. * "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1"
  7. * 200 1234
  8. */
  9. case class ApacheAccessLog (
  10. ipAddress: String,
  11. clientIndentd: String,
  12. userId: String,
  13. dateTime:String,
  14. method: String,
  15. endPoint: String,
  16. protocol: String,
  17. responseCode: Int,
  18. contentSize: Long)
  19. object ApacheAccessLog{
  20. // regex
  21. // 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
  22. val PARTTERN ="""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
  23. /**
  24. *
  25. * @param log
  26. * @return
  27. */
  28. def isValidateLogLine(log: String): Boolean = {
  29. // parse log
  30. val res = PARTTERN.findFirstMatchIn(log)
  31. // invalidate
  32. if (res.isEmpty) {
  33. false
  34. }else{
  35. true
  36. }
  37. }
  38. /**
  39. *
  40. * @param log
  41. * @return
  42. */
  43. def parseLogLine(log: String): ApacheAccessLog ={
  44. // parse log
  45. val res = PARTTERN.findFirstMatchIn(log)
  46. // invalidate
  47. if(res.isEmpty){
  48. throw new RuntimeException("Cannot parse log line: " + log)
  49. }
  50. // get value
  51. val m = res.get
  52. // return
  53. ApacheAccessLog( //
  54. m.group(1), //
  55. m.group(2),
  56. m.group(3),
  57. m.group(4),
  58. m.group(5),
  59. m.group(6),
  60. m.group(7),
  61. m.group(8).toInt,
  62. m.group(9).toLong)
  63. }
  64. }
  1. OrderingUtils.scala
  1. package com.ibeifeng.bigdata.spark.app.core
  2. import scala.math.Ordering
  3. /**
  4. * Created by zhangyy on 2016/7/16.
  5. */
  6. object OrderingUtils {
  7. object SecondValueOrdering extends Ordering[(String, Int)]{
  8. /**
  9. *
  10. * @param x
  11. * @param y
  12. * @return
  13. */
  14. override def compare(x: (String, Int), y: (String, Int)): Int = {
  15. x._2.compare(y._2)
  16. // x._2 compare y._2 // 1 to 10 | 1.to(10)
  17. }
  18. }
  19. }

image_1btvvv7c51m65mb1rtvll914as9.png-130.2kB

image_1btvvvk26c2o1u6ok4916u23s4m.png-223.2kB

image_1bu0000bu1iflkg0ssg7jf13bs13.png-188.2kB


二、Spark RDD

2.1:RDD的含义:

  1. RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如mapflatMapfilter等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如joingroupByreduceByKey等更为方便的操作(注意,reduceByKeyaction,而非transformation),以支持常见的数据运算

2.2、RDD 在 hdfs的结构

image_1bu05qhqa12snmpji2gkf51oia9.png-280.4kB

image_1bu06g4gml0p118d1k5pqgnntsm.png-360kB

  1. val rdd = sc.textFile("/spark/rdd")
  2. rdd.partitions.length
  3. rdd.cache
  4. rdd.count
  5. 一个分区默认一个task 分区去处理
  6. 默认是两个分区去处理

image_1bu06vtqb9lrk32o64gil1m9413.png-38.5kB

image_1bu078qvl1d7r1lt611cb1pbr1a041g.png-280.5kB

2.3、RDD的五个特点对应方法

  1. 1. A list of partitions protected def getPartitions: Array[Partition])
  2. 一系列的的分片,比如说64M一片,类似于hadoop中的split
  3. 2. A function ofr computing each split :( @DeveloperApi
  4. def compute(split: Partition, context: TaskContext): Iterator[T])
  5. 在每个分片上都有一个方式去迭代/执行/计算
  6. 3. A list of dependencies on other RDD :(protected def getDependencies: Seq[Dependency[_]] = deps
  7. 一系列的依赖:RDDa 转换为RDDb,转换为 RDDc, 那么RDDc 就依赖于RDDb , RDDb 又依赖于RDDa
  8. ---
  9. wordcount 程序:
  10. ## val rdd = sc.textFile("xxxx")
  11. val wordRdd = rdd.flatMap(_.split(""))
  12. val kvRdd = wordRdd.map((_,1))
  13. val WordCountRdd = kvRdd.reduceByKey(_ + _)
  14. # wrodcountRdd.saveAsTextFile("yy")
  15. kvRdd <- wordRdd <- rdd
  16. rdd.toDebugString
  17. ---
  18. 4. Optionlly,a Partitioner for kev-values RDDs (e,g,to say that the RDDis hash-partitioned) :(/** Optionally overridden by subclasses to specify how they are partitioned. */
  19. @transient val partitioner: Option[Partitioner] = None
  20. 5. optionlly,a list of preferred location(s) to compute each split on (e,g,block location for an HDFS file)
  21. :(protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  22. 要运行的计算/执行最好在哪(几)个机器上运行,数据本地型
  23. 为什么会有那几个呢?
  24. 比如: hadoop 默认有三个位置,或者spark cache 到内存是可能同过StroageLevel 设置了多个副本,所以一个partition 可能返回多个最佳位置。

image_1bu080hh71fe7oju1uipie413oo1t.png-249.8kB

2.4、 如何创建RDD的两种方式

  1. 方式一:
  2. 并行化集合:
  3. 并行化集合
  4. List\Seq\Array
  5. SparkContext:
  6. ----
  7. def parallelize[T: ClassTag](
  8. seq: Seq[T],
  9. numSlices: Int = defaultParallelism): RDD[T]
  10. ---
  1. list 创建:
  1. val list = List("11","22","33")
  2. val listRdd = sc.parallelize(list)
  3. listRdd.count
  4. listRdd.frist
  5. listRdd.take(10)
  1. seq 创建:
  1. val seq = Sep("aa","bb","cc")
  2. val seqRdd = sc.parallelize(seq)
  3. seqRdd.count
  4. seqRdd.frist
  5. seqRdd.take(10)
  1. Array创建:
  1. val array = Array(1,2,3,4,5)
  2. val arryRdd = sc.parallelize(array)
  3. arryRdd.first
  4. arryRdd.count
  5. arryRdd.take(10)
  1. 方式二:从外部存储创建:
  2. val disFile = sc.textFile("/input")

2.5、RDD的转换过程

image_1bu09il2r131iq0cqh81aef1t4j2a.png-277.6kB

image_1bu09ndf01grkat6mopj9e1lmk2n.png-137.9kB

  1. transformation 转换
  2. actions 执行出结果
  3. persistence  基本都是cache过程

2.5.1: rdd transformation 应用

image_1bu0akll48l01ijmj1q1l4u1qq934.png-142.7kB

  1. union()合并应用
  2. val rdd1 = sc.parallelize(Array(1,2,3,4,5))
  3. val rdd2 = sc.parallelize(Array(6,7,8,9,10))
  4. val rdd = rdd1.union(rdd2)
  5. rdd.collect

image_1bu0atbl4sqd1gu31lff1gnq195k3h.png-130.9kB

image_1bu0au0bt1ltn1gsen69k4p14a43u.png-42.5kB

image_1bu0aubon1501fas1bhvs301vst4b.png-193.1kB

image_1bu0aums3t1uggpn5pvtcguo4o.png-109.1kB

  1. 对于分布式计算框架来说,性能瓶颈
  2. IO
  3. -1,磁盘IO
  4. -2,网络IO
  5. rdd1 -> rdd2
  6. Shuffle
  7. ============================================
  8. groupByKey() & reduceByKey()
  9. 在实际开发中,如果可以使用reduceByKey实现的功能,就不要使用groupBykey
  10. 使用reduceByKey有聚合功能,类似MapReduce中启用了Combiner
  11. ===============
  12. join()
  13. -1,等值链接
  14. -2,左连接
  15. 数据去重
  16. 结果数据
  17. res-pre.txt - rdd1
  18. 新数据进行处理
  19. web.tsv - 10GB - rdd2
  20. 解析里面的url
  21. 如果res-pre.txt中包含,就不放入,不包含就加入或者不包含url进行特殊处理
  22. rdd2.leftJoin(rdd1)
  1. join()应用
  1. val list =List("aa","bb","cc","dd")
  2. val rdd1 = sc.parallelize(list).map((_, 1))
  3. rdd1.collect
  4. val list2 = List("bb","cc","ee","hh")
  5. val rdd2 = sc.parallelize(list2).map((_, 1))
  6. rdd2.collect
  7. val rdd = rdd2.leftOuterJoin(rdd1)
  8. rdd.collect
  9. rdd.filter(tuple => tuple._2._2.isEmpty).collect
  1. repartition()应用:
  1. val rdd = sc.textFile("/spark/rdd")
  2. rdd.repartition(2)
  3. rdd.count

image_1bu0e8gd511mn12pmambkk151b55.png-102.6kB

2.5.2: RDD Actions 操作

image_1bu0ei1tqppa1mu910lv1949j6m62.png-127.4kB

  1. val list = List(("aa",1),("bb",4),("aa",56),("cc",0),("aa",89),("cc",34))
  2. val rdd = sc.parallelize(list)
  3. rdd.countByKey

image_1bu0er0ta1t9akh91hoghf81c3j6f.png-346.8kB

  1. wordcount 转变
  2. val rdd = sc.textFile("\input")
  3. rdd.flatMap(_.split(" ")).map((_, 1)).countByKey

image_1bu0f6cu31gk89q710071tufvko6s.png-129.9kB

  1. foreach() 应用
  1. val list = List(1,2,3,4,5)
  2. val rdd = sc.parallelize(list)
  3. rdd.foreach(line => println(line))
  1. 分组topkey
  1. aa 78
  2. bb 98
  3. aa 80
  4. cc 98
  5. aa 69
  6. cc 87
  7. bb 97
  8. cc 86
  9. aa 97
  10. bb 78
  11. bb 34
  12. cc 85
  13. bb 92
  14. cc 72
  15. bb 32
  16. bb 23
  1. val rdd = sc.textFile("/topkeytest")
  2. val topRdd = rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))
  3. topRdd.collect

image_1bu0h62cc1pne15j31s0hf8q1pd59.png-213.7kB

三:SparkContext三大功能

3.1、没有使用广播变量

image_1bu0hrm6dldg350h2i493emm.png-184.4kB

  1. SparkContext 的作用:
  2. -1,向Master(主节点,集群管理的主节点)申请资源,运行所有Executor
  3. -2,创建RDD的入口
  4. sc.textFile("") // 从外部存储系统创建
  5. sc.parxx() // 并行化,从Driver 中的集合创建
  6. -3,调度管理JOB运行
  7. DAGScheduler TaskScheduler
  8. --3.1
  9. 为每个Job构建DAG
  10. --3.2
  11. DAG图划分为Stage
  12. 按照RDD之间是否存在Shuffle
  13. 倒推(Stack
  14. --3.3
  15. 每个StageTaskSet
  16. 每个阶段中Task代码相同,仅仅处理数据不同

3.2 使用广播变量

image_1bu0iq503ufkoti19093abpar23.png-180.2kB

  1. val list = List(".", "?", "!", "#", "$")
  2. val braodCastList = sc.broadcast(list)
  3. val wordRdd = sc.textFile("")
  4. wordRdd.filter(word => {
  5. braodCastList.value.contains(word)
  6. })

3.4 spark 的 cluster mode

image_1bu4rsk311f9o39t14i41eq91ng89.png-223.1kB

3.4.1 spark的部署模式:

  1. 1.spark的默认模式是local模式
  2. spark-submint Scala_Project.jar

image_1bu4snqmue1mhafsb8111e1hb5m.png-346.1kB

image_1bu4sode4oku1od31pso15uh18c513.png-504.4kB

  1. 2. spark job 运行在客户端集群模式:
  2. spark-submit --master spark://192.168.3.1:7077 --deploy-mode cluster Scala_Project.jar

image_1bu4sqk9g1uq9ism1gsl8pv15711g.png-267.5kB

image_1bu4sto5715211gdrqaclccnj2a.png-400.8kB

image_1bu4sunkt1c7n3t019rg15jcm3o2n.png-382.3kB

image_1bu4svo1oqah19g312lgatq1j0j3k.png-376.1kB

image_1bu4t0u5p1tht1gpba564861vu241.png-224.8kB

3.5 spark 增加外部依赖jar包的方法

  1. 方式一:
  2. --jars JARS
  3. Comma-separated list of local jars to include on the driver and executor classpaths.
  4. jar包的位置一定要写决定路径。
  5. 方式二:
  6. --driver-class-path
  7. Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
  8. 方式三:
  9. SPARK_CLASSPATH
  10. 配置此环境变量

3.5.1 企业中Spark Application提交,shell 脚本

  1. spark-app-submit.sh:
  2. #!/bin/sh
  3. ## SPARK_HOME
  4. SPARK_HOME=/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6
  5. ## SPARK CLASSPATH
  6. SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar
  7. ${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

四:Spark on YARN

4.1 启动hadoop的YARN上面的服务

  1. cd /soft/hadoop/sbin
  2. 启动rescouremanager:
  3. ./yarn-daemon.sh start resourcemanager
  4. 启动nodemanger:
  5. ./yarn-daemon.sh start nodemanager

image_1bu52k2tk1efcvledfaib6v584e.png-219.5kB

4.2 yarn 的架构

image_1bu52o4a2j83a9b11t91kiv11134r.png-471kB

  1. YARN
  2. -1,分布式资源管理
  3. 主节点:ResouceManager
  4. 从节点:NodeManager -> 负责管理每台机器上的资源(内存和CPU Core
  5. -2,资源调度
  6. --1,容器Container
  7. AM/Task
  8. --2,对于运行在YARN上的每个应用,一个应用的管理者ApplicaitonMaster 资源申请和任务调度

4.2 Spark Application

  1. Spark Application
  2. -1,Driver Program
  3. 资源申请和任务调度
  4. -2,Executors
  5. 每一个Executor其实就是一个JVM,就是一个进程
  6. spark deploy mode client
  7. AM
  8. -- 全部都允许在Container
  9. Executor s
  10. 运行在Container中,类似于MapReduce任务中Map TaskReduce Task一样
  11. Driver -> AM -> RM

image_1bu537mnp3n31e813nkltu1au158.png-326.5kB

image_1bu53v8lv1vnqchi10t11qba1chg6s.png-371.1kB

4.3 spark on yarn 的运行

  1. spark-shell --master yarn

image_1bu53gutvsp91uk215fn11bq9gs62.png-333.4kB
image_1bu53hhoe1bh9fic1f4iuv1lti6f.png-221.9kB
image_1bu53gapd1pci15h2l1ima11fm5l.png-507kB

4.4 spark job on yarn

  1. cd jars/
  2. spark-submit --master yarn --deploy-mode cluster Scala_Project.jar

image_1bu54ct0r188q192m14tk1n4d1gd679.png-679.7kB

image_1bu54dadb31qjbq1ae91vtd17rp7m.png-213.2kB

image_1bu54e4u831r17rl1j0g109c1g8b8j.png-538.2kB

image_1bu54encu1e8q9nt8d34641lbo90.png-351.7kB

image_1bu54i59612oc11a21eob11b315fu9d.png-316.9kB

五: spark RDD 的 依赖

5.1 RDD Rependencies

image_1bu55deaq19stkj01kqslkv1r3n9q.png-550.2kB

  1. sparkwordcount
  2. ##
  3. val rdd = sc.textFile("/input")
  4. ##
  5. val wordRdd = rdd.flatMap(_.split(" "))
  6. val kvRdd = wordRdd.map((_, 1))
  7. val wordcountRdd = kvRdd.reduceByKey(_ + _)
  8. ##
  9. wordcountRdd.collect
  10. -----------------
  11. input -> rdd -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT
  12. ->
  13. wordcountRdd -> output Stage-02 -> ResultStage -> ResultTask
  1. 1. 窄依赖(narrow dependencies
  2. 1.1:子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
  3. 1.2 输入输出一对一的算子,且结过RDD 的分区结构不变,主要是map,flatMap
  4. 1.3:输出一对一,单结果RDD 的分区结构发生变化,如:union,coalesce
  5. 1.4: 从输入中选择部分元素的算子,如filer,distinct,subtract,sample
  6. 2. 宽依赖(wide dependencies
  7. 2.1: RDD的每个分区依赖于所有父RDD 分区
  8. 2.2:对单个RDD 基于key进行重组和reduce,如groupByKey,reduceByKey
  9. 2.3:对两个RDD 基于key 进行join和重组,如:join
  1. 如何判断RDD之间是窄依赖还是宽依赖:
  2. RDD的每个分区数据 RDD的每个分区数据
  3. 1 -> 1
  4. 1 -> N : MapReduce Shuffle

5.2 spark 的shuffle

5.2.1 spark shuffle 的内在原理

  1. MapReduce框架中,shuffle是连接MapReduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑。

5.2.2 shuffle

  1. ShuffleMapReduce框架中的一个特定的phase,介于Map phaseReduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。
  2. 下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle phase是介于Map phaseReduce phase之间。

image_1bu56iappu9ti2c1ug2kpck0uak.png-678.7kB

  1. 概念上shuffle就是一个沟通数据连接的桥梁,那么实际上shufflepartition)这一部分是如何实现的的呢,下面我们就以Spark为例讲一下shuffleSpark中的实现。

5.2.3 spark的shuffle

image_1bu55su6pboa1tf0ecuec9floa7.png-328.2kB

  1. 1.首先每一个Mapper会根据Reducer的数量创建出相应的bucketbucket的数量是M×RM×R,其中MMMap的个数,RRReduce的个数。
  2. 2.其次Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中去。
  3. Reducer启动时,它会根据自己taskid和所依赖的Mapperid从远端或是本地的block manager中取得相应的bucket作为Reducer的输入进行处理。
  4. 这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。
  5. 3. Apache Spark Shuffle 过程与 Apache Hadoop Shuffle 过程有着诸多类似,一些概念可直接套用,例如,Shuffle 过程中,提供数据的一端,被称作 Map 端,Map 端每个生成数据的任务称为 Mapper,对应的,接收数据的一端,被称作 Reduce 端,Reduce 端每个拉取数据的任务称为 ReducerShuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
  1. 那些操作会引起shuffle
  2. 1. 具有重新调整分区操作,
  3. eg: repartition,coalese
  4. 2. *ByKey eg: groupByKey,reduceByKey
  5. 3. 关联操作 eg:join,cogroup
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注