@boothsun
2021-10-21T12:52:16.000000Z
字数 5417
阅读 1001
大数据
总结自以下优秀网文:
1. Spark学习之路 (三)Spark之RDD
2. spark RDD
3. 请用通俗形象的语言解释下:Spark中的RDD到底是什么意思?
4. Spark笔记:RDD基本操作(上)
RDD称为弹性分布式数据集。RDD 可以看做分布式分区数据集合 + 针对于数据集进行的一系列操作。从编程角度来看,RDD可以简单看成是一个数组。和普通数组的区别是:RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。
RDD是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的数据集合。RDD具有数据流模型的特点:自动容错、位置感知性能调度和可伸缩性。RDD允许用户在执行多个操作时显示地将操作集缓存在内存中,后续的查询能够重用操作集,这极大地提升了查询速度。
Spark里的计算都是操作RDD进行的,构建RDD从数据来源角度分为两类:第一类是从内存中直接读取数据。第二类是从文件系统(HDFS 、本地文件系统)里读取数据。
第一类方式从内存里构造RDD,使用的方法:makeRDD和parallelize方法,如下代码所示:
/* 使用makeRDD创建RDD */
/* List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println(r02.collect().mkString(","))
val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* Array */
val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))
由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
val r:RDD[String] = rdd.flatMap { x => x.split(",") }
println(r.collect().mkString(","))
读取数据库等等其他的操作。也可以生成RDD。
RDD可以通过其他的RDD转换而来的。
RRD本质上就是一个数组。基于内存数据构造RDD时,我们使用的就是List和Array两种类型;即使是基于文件系统生成RDD的,也是将文件系统中每行数据读取到内存中形成各个数组(RDD)。
构造完RDD之后,接下来就是怎么操作RDD对象了。RDD的操作分为转化操作(transformation)和执行操作(action) ( RDD 操作 = transformation + action )。RDD之所以将操作分成这两类就是为了RDD惰性运算。当RDD执行转换操作时候,实际计算并没有被执行,只有当RDD进行执行操作时才会触发计算任务提交操作。执行相应的计算操作。区别转换操作和执行操作也非常简单,转换操作就是从一个RDD产生一个新的RDD操作,所以转换操作的结果还是RDD对象,而执行操作的结果就是真正的数据集。
个人理解 RDD = 数组 + 基于数组的操作 = 数组 + transformation + action
主要作用:将已有RDD生成转换为另外的RDD。
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 1:1
sc.parallelize(1 to 9, 3).map( x => x*2).collect
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。1:N
val b = sc.parallelize(1 to 4, 2).flatMap(x => 1 to x).collect() ;
b.foreach(print)
val b2 = sc.parallelize(1 to 11).filter(x => x % 2 == 0).collect().mkString(" ")
println(b2)
val b3 = sc.parallelize(List(1,2,5,3,4,0,5,2,3,1))
println(b3.distinct().collect().mkString(","))
类SQL中的union操作。将源RDD和参数RDD求并集后返回一个新的RDD。
// union 操作
val b4 = sc.parallelize(List(1,2,3))
val b5 = sc.parallelize(List(4,5,6))
println(b4.union(b5).collect().mkString(","))
// intersection 操作
val b6 = sc.parallelize(List(1,2,3))
val b7 = sc.parallelize(List(2,1,4))
println(b6.intersection(b7).collect().mkString(","))
从原RDD里去掉在参数RDD里相同的元素。
// subtract 操作
val b8 = sc.parallelize(List(1,2,3))
val b9 = sc.parallelize(List(2,1,4))
println(b8.subtract(b9).collect().mkString(","))
求两个RDD的笛卡儿积。
val b10 = sc.parallelize(List(1,2,3))
val b11 = sc.parallelize(List(2,1,4))
println(b10.cartesian(b11).collect().mkString(","))
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD。
val words = Array("A","A","B","D","B")
val b12 = sc.parallelize(words).map(word => (word, 1))
val s = b12.groupByKey().map(t => (t._1, t._2.sum))
println(s.collect().mkString(","))
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
val words = Array("A","A","B","D","B")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
println(wordCountsWithReduce.collect().mkString(","))
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
val words2 = Array("Z","D","A","S","A")
val b13 = sc.parallelize(words2).map(word => (word, 1))
println( "排序前效果:" + b13.collect().mkString(",") ) ;
println( "排序后效果:" + b13.sortByKey().collect().mkString(",") )
与sortByKey类似,但更灵活。第一个参数是根据什么排序,第二个是怎么排序(false倒序),第三个排序后分区数 默认与原RDD一样。
val words2 = Array("Z","D","A","S","A")
val b13 = sc.parallelize(words2).map(word => (word, 1))
println( b13.reduceByKey(_+_).sortBy(t => t._2 , ascending = true).collect().mkString(",") )
只针对key-value形式的RDD。leftOuterJoin类似于SQL中的左外关联left join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
val leftOuterJoin1 = sc.parallelize(List( ("cat",2), ("book", 5) ) )
val leftOuterJoin2 = sc.parallelize(List( ("cat",9), ("mouse", 4) ) )
println( "leftOuterJoin操作:" + leftOuterJoin1.leftOuterJoin(leftOuterJoin2).collect.mkString( " ")) ;
rightOuterJoin类似于SQL中的有外关联right join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
val rightOuterJoin1 = sc.parallelize(List( ("cat",2), ("book", 5) ) )
val rightOuterJoin2 = sc.parallelize(List( ("cat",9), ("mouse", 4) ) )
println( "rightOuterJoin操作:" + rightOuterJoin1.rightOuterJoin(rightOuterJoin2).collect.mkString( " ")) ;
触发代码的运行,我们一段spark代码里面至少需要有一个action操作。
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
println( "reduce操作:" + sc.parallelize(1 to 9, 3).reduce((x,y) => x + y) )
以数组形式返回RDD中的全部元素。
println( "collect操作:" + sc.parallelize(1 to 9, 3).collect.mkString(",") )