[关闭]
@boothsun 2021-10-21T12:52:16.000000Z 字数 5417 阅读 971

Spark RDD学习笔记

大数据


总结自以下优秀网文:
1. Spark学习之路 (三)Spark之RDD
2. spark RDD
3. 请用通俗形象的语言解释下:Spark中的RDD到底是什么意思?
4. Spark笔记:RDD基本操作(上)

RDD的概述

什么是RDD

代码注释

RDD称为弹性分布式数据集。RDD 可以看做分布式分区数据集合 + 针对于数据集进行的一系列操作。从编程角度来看,RDD可以简单看成是一个数组。和普通数组的区别是:RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。

RDD是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的数据集合。RDD具有数据流模型的特点:自动容错、位置感知性能调度和可伸缩性。RDD允许用户在执行多个操作时显示地将操作集缓存在内存中,后续的查询能够重用操作集,这极大地提升了查询速度。

RDD的特性详解

  1. 可分区:RDD是由一组分区(Partition)组成的,分区是RDD的最基本组成单位,不同分区的数据可以分布在集群中的各个节点上。
  2. 可并行计算:对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就采用默认值。默认值就是程序锁分配到的CPU 核数。
  3. RDD之间是相互依赖的:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  4. 分区函数(Partitioner):当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  5. 存储优先位置(preferred location)的列表:对于一个HDFS文件来说,这个列表保存的就是每个分区所在块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

如何创建RDD

Spark里的计算都是操作RDD进行的,构建RDD从数据来源角度分为两类:第一类是从内存中直接读取数据。第二类是从文件系统(HDFS 、本地文件系统)里读取数据。

从内存里构造RDD

第一类方式从内存里构造RDD,使用的方法:makeRDD和parallelize方法,如下代码所示:

  1. /* 使用makeRDD创建RDD */
  2. /* List */
  3. val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
  4. val r01 = rdd01.map { x => x * x }
  5. println(r01.collect().mkString(","))
  6. /* Array */
  7. val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
  8. val r02 = rdd02.filter { x => x < 5}
  9. println(r02.collect().mkString(","))
  10. val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
  11. val r03 = rdd03.map { x => x + 1 }
  12. println(r03.collect().mkString(","))
  13. /* Array */
  14. val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
  15. val r04 = rdd04.filter { x => x > 3 }
  16. println(r04.collect().mkString(","))

读取文件生成RDD

由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

  1. val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
  2. val r:RDD[String] = rdd.flatMap { x => x.split(",") }
  3. println(r.collect().mkString(","))

其他方式

读取数据库等等其他的操作。也可以生成RDD。
RDD可以通过其他的RDD转换而来的。

RDD 操作

RRD本质上就是一个数组。基于内存数据构造RDD时,我们使用的就是List和Array两种类型;即使是基于文件系统生成RDD的,也是将文件系统中每行数据读取到内存中形成各个数组(RDD)。

构造完RDD之后,接下来就是怎么操作RDD对象了。RDD的操作分为转化操作(transformation)和执行操作(action) ( RDD 操作 = transformation + action )。RDD之所以将操作分成这两类就是为了RDD惰性运算。当RDD执行转换操作时候,实际计算并没有被执行,只有当RDD进行执行操作时才会触发计算任务提交操作。执行相应的计算操作。区别转换操作和执行操作也非常简单,转换操作就是从一个RDD产生一个新的RDD操作,所以转换操作的结果还是RDD对象,而执行操作的结果就是真正的数据集。

个人理解 RDD = 数组 + 基于数组的操作 = 数组 + transformation + action

transformation 操作

主要作用:将已有RDD生成转换为另外的RDD。

map操作

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 1:1

  1. sc.parallelize(1 to 9, 3).map( x => x*2).collect

flatMap操作

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。1:N

  1. val b = sc.parallelize(1 to 4, 2).flatMap(x => 1 to x).collect() ;
  2. b.foreach(print)

filter 过滤操作

  1. val b2 = sc.parallelize(1 to 11).filter(x => x % 2 == 0).collect().mkString(" ")
  2. println(b2)

distinct 去重操作

  1. val b3 = sc.parallelize(List(1,2,5,3,4,0,5,2,3,1))
  2. println(b3.distinct().collect().mkString(","))

union 求并集操作

类SQL中的union操作。将源RDD和参数RDD求并集后返回一个新的RDD。

  1. // union 操作
  2. val b4 = sc.parallelize(List(1,2,3))
  3. val b5 = sc.parallelize(List(4,5,6))
  4. println(b4.union(b5).collect().mkString(","))

intersection 求交集操作

  1. // intersection 操作
  2. val b6 = sc.parallelize(List(1,2,3))
  3. val b7 = sc.parallelize(List(2,1,4))
  4. println(b6.intersection(b7).collect().mkString(","))

subtract 求差集操作

从原RDD里去掉在参数RDD里相同的元素。

  1. // subtract 操作
  2. val b8 = sc.parallelize(List(1,2,3))
  3. val b9 = sc.parallelize(List(2,1,4))
  4. println(b8.subtract(b9).collect().mkString(","))

cartesian 求笛卡尔集操作

求两个RDD的笛卡儿积。

  1. val b10 = sc.parallelize(List(1,2,3))
  2. val b11 = sc.parallelize(List(2,1,4))
  3. println(b10.cartesian(b11).collect().mkString(","))

groupByKey 操作

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD。

  1. val words = Array("A","A","B","D","B")
  2. val b12 = sc.parallelize(words).map(word => (word, 1))
  3. val s = b12.groupByKey().map(t => (t._1, t._2.sum))
  4. println(s.collect().mkString(","))

reduceByKey 操作

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

  1. val words = Array("A","A","B","D","B")
  2. val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
  3. val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
  4. println(wordCountsWithReduce.collect().mkString(","))

sortByKey 操作

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。

  1. val words2 = Array("Z","D","A","S","A")
  2. val b13 = sc.parallelize(words2).map(word => (word, 1))
  3. println( "排序前效果:" + b13.collect().mkString(",") ) ;
  4. println( "排序后效果:" + b13.sortByKey().collect().mkString(",") )

sortBy 操作

与sortByKey类似,但更灵活。第一个参数是根据什么排序,第二个是怎么排序(false倒序),第三个排序后分区数 默认与原RDD一样。

  1. val words2 = Array("Z","D","A","S","A")
  2. val b13 = sc.parallelize(words2).map(word => (word, 1))
  3. println( b13.reduceByKey(_+_).sortBy(t => t._2 , ascending = true).collect().mkString(",") )

leftOuterJoin操作

只针对key-value形式的RDD。leftOuterJoin类似于SQL中的左外关联left join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

  1. val leftOuterJoin1 = sc.parallelize(List( ("cat",2), ("book", 5) ) )
  2. val leftOuterJoin2 = sc.parallelize(List( ("cat",9), ("mouse", 4) ) )
  3. println( "leftOuterJoin操作:" + leftOuterJoin1.leftOuterJoin(leftOuterJoin2).collect.mkString( " ")) ;

rightOuterJoin操作

rightOuterJoin类似于SQL中的有外关联right join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可

  1. val rightOuterJoin1 = sc.parallelize(List( ("cat",2), ("book", 5) ) )
  2. val rightOuterJoin2 = sc.parallelize(List( ("cat",9), ("mouse", 4) ) )
  3. println( "rightOuterJoin操作:" + rightOuterJoin1.rightOuterJoin(rightOuterJoin2).collect.mkString( " ")) ;

Action操作

触发代码的运行,我们一段spark代码里面至少需要有一个action操作。

reduce操作

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

  1. println( "reduce操作:" + sc.parallelize(1 to 9, 3).reduce((x,y) => x + y) )

collect 操作

以数组形式返回RDD中的全部元素。

  1. println( "collect操作:" + sc.parallelize(1 to 9, 3).collect.mkString(",") )
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注