[关闭]
@tsing1226 2016-02-19T16:40:19.000000Z 字数 3980 阅读 924

spark

Spark RDD及操作

Spark RDD

RDD(Resilient Distributed Dataset)是弹性分布式数据集,是Spark基本抽象。不可变,进行并行操作元素集合分区。

五个主要特性

 1) A list of partitions //一系列分区集合
 2)A function for computing each split //在每个分区上都有一个函数去迭代/执行/计算它
 3)A list of dependencies on other RDDs   //一系列依赖
 4)a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)//对于key-value的RDD可以指定一个RDD分区,告诉它如何分区
 5) a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) //要运行的计算或执行最好在哪几个机器上运行。数据的本地性

RDD创建的两种方式及操作

创建RDD的两种方式:并行化集合和外部数据集

val listRdd=sc.parallelize(list,5)

每个rdd都有个分区数目listRdd.partitions

val rdd=sc.textFile("/user/grc/wc/input")
val kvRdd=rdd.flatMap

RDD Operations(RDD操作)

RDD支持两种类型的操作transformations、action和Persistence
Transformations是指创建一个新的数据集是来自一个已存在数据集;比如:map(func)、filter(func)、distinct()
Action对数据集处理后返回一个值到driver program或导出数据到存储数据平台;
RDD是一个懒执行的过程,如果RDD的操作是transformations类型则不会执行之前的操作,只是记录当前rdd的来历;若RDD的操作是action类型则执行之前的操作。
Persistence是内存中的数据的操作,数据的存储级别,比如:persist()、cache()。

rdd.toDebugString

val rdd=sc.textFile("/user/grc/wc/input")
rdd.foreach(println)
当spark运行在local模式下:可以在本地查看rdd的内容信息;
当spark运行在standalone模式下,可以在4040端口下Executors的Logs查看

Spark core

Spark案例分析

Sort 依据词频降序/升序(SortByKey)

val rdd=sc.textFile("/user/grc/wc/input")
val wordRdd=rdd.flatMap(line=>line.split("\t"))
val kvRdd=wordRdd.map((_,1))
val wordcountRdd=kvRdd.reduceByKey(_+_)
wordcountRdd.sortByKey().collect

升序运行结果:

Array[(String, Int)] = Array((HADOOP,1), (hadoop,1), (hbase,1), (hive,1), (mapreduce,1), (yarn,1))
wordcountRdd.sortByKey(false).collect

降序运行结果:

Array[(String, Int)] = Array((yarn,1), (mapreduce,1), (hive,1), (hbase,1), (hadoop,1), (HADOOP,1))

Top Key 前KEY指

val rdd = sc.parallelize(Array(23,67,12,90,8,67))
rdd.first
  Array[Int]= 23
rdd.take(1)
Array[Int] = Array(23)
rdd.take(2)
Array[Int] = Array(23, 67)
rdd.top(1)
Array[Int] = Array(90)
rdd.top(2)
Array[Int] = Array(90, 67)
//top是降序排列

rdd.takeOrdered(4)
Array[Int] = Array(8, 12, 23, 67)
// takeOrdered是升序排列

Group Top Key 前KEY值

先分组,然后对于每组中的数据进行排序(降序),取前KEY值
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23

val rdd=sc.textFile("/user/grc/spark/grouptokey/input")

rdd.count

Long = 16

rdd.collect

Array[String] = Array(aa 78, bb 98, aa 80, cc 98, aa 69, cc 87, bb 97, cc 86, aa 97, bb 78, bb 34, cc 85, bb 92, cc 72, bb 32, bb 23)

rdd.map(line=>line.split(" ")).collect

Array[Array[String]] = Array(Array(aa, 78), Array(bb, 98), Array(aa, 80), Array(cc, 98), Array(aa, 69), Array(cc, 87), Array(bb, 97), Array(cc, 86), Array(aa, 97), Array(bb, 78), Array(bb, 34), Array(cc, 85), Array(bb, 92), Array(cc, 72), Array(bb, 32), Array(bb, 23))

rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1))).collect

res10: Array[(String, String)] = Array((aa,78), (bb,98), (aa,80), (cc,98), (aa,69), (cc,87), (bb,97), (cc,86), (aa,97), (bb,78), (bb,34), (cc,85), (bb,92), (cc,72), (bb,32), (bb,23))

rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1).toInt)).collect

res12: Array[(String, Int)] = Array((aa,78), (bb,98), (aa,80), (cc,98), (aa,69), (cc,87), (bb,97), (cc,86), (aa,97), (bb,78), (bb,34), (cc,85), (bb,92), (cc,72), (bb,32), (bb,23))

rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1).toInt)).groupByKey.collect

res13: Array[(String, Iterable[Int])] = Array((aa,CompactBuffer(78, 80, 69, 97)), (bb,CompactBuffer(98, 97, 78, 34, 92, 32, 23)), (cc,CompactBuffer(98, 87, 86, 85, 72)))

rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList)).collect

res18: Array[(String, List[Int])] = Array((aa,List(78, 80, 69, 97)), (bb,List(98, 97, 78, 34, 92, 32, 23)), (cc,List(98, 87, 86, 85, 72)))

rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted)).collect

res19: Array[(String, List[Int])] = Array((aa,List(69, 78, 80, 97)), (bb,List(23, 32, 34, 78, 92, 97, 98)), (cc,List(72, 85, 86, 87, 98)))

rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted.reverse)).collect

res20: Array[(String, List[Int])] = Array((aa,List(97, 80, 78, 69)), (bb,List(98, 97, 92, 78, 34, 32, 23)), (cc,List(98, 87, 86, 85, 72)))

rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => {

val xx = x._1

val yy = x._2

val yx = yy.toList.sorted.takeRight(3).reverse

(xx, yx)
}
).collect

res22: Array[(String, List[Int])] = Array((aa,List(97, 80, 78)), (bb,List(98, 97, 92)), (cc,List(98, 87, 86)))
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注