@tsing1226
2016-02-19T16:40:19.000000Z
spark
RDD (Resilient Distributed Dataset) is Spark's basic abstraction: an immutable, partitioned collection of elements that can be operated on in parallel.
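Internally, every RDD is characterized by five main properties:
1) A list of partitions: the set of partitions the data is divided into.
2) A function for computing each split: every partition is computed by applying this function to it.
3) A list of dependencies on other RDDs: the parent RDDs this RDD is derived from.
4) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): for key-value RDDs, this determines which partition each key goes to.
5) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): the machines on which each partition is best computed, i.e. data locality.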
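The five properties above can be mirrored in a toy, single-machine model. The sketch below is plain Scala, not Spark's implementation: ToyRDD, ToySource and ToyMapped are hypothetical names, partitions are reduced to integer indices, and there is no real cluster or scheduler behind it.

```scala
// Toy model of the five RDD properties (hypothetical types, NOT Spark's classes).
trait ToyRDD[T] {
  // 1) a list of partitions (here just partition indices)
  def partitions: Seq[Int]
  // 2) a function for computing each partition
  def compute(partition: Int): Iterator[T]
  // 3) dependencies on parent RDDs
  def dependencies: Seq[ToyRDD[_]]
  // 4) optional partitioner for key-value data: maps a key to a partition index
  def partitioner: Option[Any => Int] = None
  // 5) optional preferred locations (host names) for each partition
  def preferredLocations(partition: Int): Seq[String] = Nil

  // Action-like helper: computes every partition in order and concatenates the results.
  def collect(): List[T] = partitions.flatMap(p => compute(p)).toList
}

// A source dataset backed by pre-split in-memory data; it has no parents.
class ToySource[T](data: Seq[Seq[T]]) extends ToyRDD[T] {
  def partitions: Seq[Int] = data.indices
  def compute(partition: Int): Iterator[T] = data(partition).iterator
  def dependencies: Seq[ToyRDD[_]] = Nil
}

// A derived dataset: each partition is computed by mapping over the parent's partition.
class ToyMapped[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  def partitions: Seq[Int] = parent.partitions
  def compute(partition: Int): Iterator[U] = parent.compute(partition).map(f)
  def dependencies: Seq[ToyRDD[_]] = Seq(parent)
}
```

For example, new ToyMapped[Int, Int](new ToySource(Seq(Seq(1, 2), Seq(3, 4))), _ * 2) keeps the parent's two partitions, lists the source as its only dependency, and computes nothing until compute or collect() is called.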
There are two ways to create an RDD: parallelizing an existing collection, or referencing an external dataset.
val listRdd=sc.parallelize(list,5)  // distribute the local collection list across 5 partitions
Every RDD has a set of partitions, which listRdd.partitions returns; listRdd.partitions.length is 5 here.
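How does parallelize(list, 5) decide which elements land in which partition? A minimal sketch, modelled on the index arithmetic in Spark's ParallelCollectionRDD but simplified and written in plain Scala (SliceSketch is a hypothetical name): slice i covers indices from i*length/numSlices up to (i+1)*length/numSlices, so partitions are contiguous and their sizes differ by at most one element.

```scala
// Simplified sketch of splitting a local collection into numSlices contiguous slices.
// Hypothetical helper, modelled on (not copied from) Spark's ParallelCollectionRDD slicing.
object SliceSketch {
  // (start inclusive, end exclusive) index pair for each slice.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val startIdx = ((i * length) / numSlices).toInt
      val endIdx = (((i + 1) * length) / numSlices).toInt
      (startIdx, endIdx)
    }

  // Cut the sequence into numSlices pieces using the positions above.
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    positions(seq.length.toLong, numSlices).map { case (startIdx, endIdx) =>
      seq.slice(startIdx, endIdx)
    }
  }
}
```

With ten elements and five slices every partition gets two elements; with three elements and two slices the sizes are one and two.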
val rdd=sc.textFile("/user/grc/wc/input")
val kvRdd=rdd.flatMap(line=>line.split("\t")).map((_,1))
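RDDs support two types of operations, transformations and actions; in addition, an RDD can be persisted.
Transformations create a new dataset from an existing one, e.g. map(func), filter(func), distinct().
Actions run a computation on the dataset and either return a value to the driver program or write data out to an external storage system, e.g. count(), collect(), saveAsTextFile(path).
RDDs are evaluated lazily: applying a transformation computes nothing; Spark only records how the new RDD is derived from its parent (its lineage). The recorded transformations are executed only when an action is called.
Persistence keeps an RDD's computed data (in memory by default) so that later actions can reuse it instead of recomputing it; persist() lets you choose a storage level, and cache() is shorthand for persist() with the default level, MEMORY_ONLY.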
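Lazy evaluation and caching can be illustrated without a cluster by using a plain Scala collection view as an analogy. This is not Spark code, and LazyDemo is a hypothetical name: map on a view only records the function, like a transformation; traversing the view, like an action, runs it, and every traversal runs it again, much as an uncached RDD is recomputed for each action. Materializing once with toVector plays the role of cache().

```scala
// Analogy only: a Scala collection view standing in for an RDD (not Spark code).
object LazyDemo {
  /** Runs two "actions" and returns
    * (calls before any action, total calls of the mapping function, sum, max). */
  def run(cacheFirst: Boolean): (Int, Int, Int, Int) = {
    var calls = 0
    // "Transformation": map on a view only records the function; nothing runs yet.
    val mapped = List(1, 2, 3).view.map { x => calls += 1; x * 10 }
    val callsBeforeAction = calls
    // Forcing the view once into a Vector is the analogue of cache():
    // later "actions" reuse the stored result instead of re-running map.
    val data: Iterable[Int] = if (cacheFirst) mapped.toVector else mapped
    val total = data.sum   // first "action"
    val largest = data.max // second "action": re-runs map when not "cached"
    (callsBeforeAction, calls, total, largest)
  }
}
```

In both cases the mapping function has run zero times before the first "action"; without the toVector step it runs six times for the two actions, with it only three.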
rdd.toDebugString  // shows the RDD's lineage: the chain of RDDs it was derived from
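The lineage that toDebugString prints can be pictured as a chain of parent links. The toy below is plain Scala; Lineage and derive are hypothetical names, and its output format only loosely resembles Spark's. It records each step's operation and its parent, and prints the newest step first, indented by depth.

```scala
// Toy lineage chain (hypothetical type, not Spark's): each step remembers only
// the operation that produced it and its parent step.
final case class Lineage(op: String, parent: Option[Lineage]) {
  // Record a new step derived from this one (like applying a transformation).
  def derive(nextOp: String): Lineage = Lineage(nextOp, Some(this))

  // One line per step, newest first, each parent indented one level deeper.
  def debugString: String = {
    def loop(node: Lineage, depth: Int): List[String] =
      ("  " * depth + node.op) :: node.parent.map(p => loop(p, depth + 1)).getOrElse(Nil)
    loop(this, 0).mkString("\n")
  }
}
```

Lineage("textFile", None).derive("flatMap").derive("map").debugString yields three lines: map, then flatMap, then textFile, each indented one level further.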
val rdd=sc.textFile("/user/grc/wc/input")
rdd.foreach(println)
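When Spark runs in local mode, the output of rdd.foreach(println) appears in the local console.
When Spark runs in standalone mode, println is executed on the executors, so the output goes to each executor's stdout; you can view it in the web UI on port 4040, under the log links of the Executors tab. To print the elements on the driver instead, bring them there first, e.g. rdd.collect().foreach(println), or rdd.take(100).foreach(println) to fetch only a few.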
val rdd=sc.textFile("/user/grc/wc/input")
val wordRdd=rdd.flatMap(line=>line.split("\t"))  // split each line into words on tabs
val kvRdd=wordRdd.map((_,1))                      // pair each word with a count of 1
val wordcountRdd=kvRdd.reduceByKey(_+_)           // sum the counts for each word
wordcountRdd.sortByKey().collect                  // sort by key, ascending
Array[(String, Int)] = Array((HADOOP,1), (hadoop,1), (hbase,1), (hive,1), (mapreduce,1), (yarn,1))
wordcountRdd.sortByKey(false).collect             // sort by key, descending
Array[(String, Int)] = Array((yarn,1), (mapreduce,1), (hive,1), (hbase,1), (hadoop,1), (HADOOP,1))
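For comparison, the same pipeline written with plain Scala collections on a single machine. This is not Spark; WordCountSketch is a hypothetical name, and the sample lines used to check it are assumptions. groupBy followed by a sum stands in for reduceByKey, and sortBy with an optional reversed Ordering stands in for sortByKey(ascending).

```scala
// Plain-Scala word count mirroring flatMap -> map((_, 1)) -> reduceByKey -> sortByKey.
// Hypothetical helper for illustration; runs locally, no Spark involved.
object WordCountSketch {
  def wordCount(lines: Seq[String], ascending: Boolean = true): Seq[(String, Int)] = {
    // flatMap + map((_, 1)): one (word, 1) pair per tab-separated word
    val pairs = lines.flatMap(line => line.split("\t")).map(word => (word, 1))
    // reduceByKey(_ + _): group the pairs by word and add up the ones
    val counts = pairs.groupBy(_._1).map { case (word, ones) => (word, ones.map(_._2).sum) }
    // sortByKey(ascending): order by the word itself
    val ordering = if (ascending) Ordering[String] else Ordering[String].reverse
    counts.toSeq.sortBy(_._1)(ordering)
  }
}
```

HADOOP sorts before hadoop because the default String ordering compares character codes and uppercase letters come before lowercase ones; this is also why HADOOP is first in the ascending sortByKey() output above.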
val rdd = sc.parallelize(Array(23,67,12,90,8,67))
rdd.first
Int = 23
rdd.take(1)
Array[Int] = Array(23)
rdd.take(2)
Array[Int] = Array(23, 67)
rdd.top(1)
Array[Int] = Array(90)
rdd.top(2)
Array[Int] = Array(90, 67)  // top returns the largest elements, in descending order
rdd.takeOrdered(4)
Array[Int] = Array(8, 12, 23, 67)  // takeOrdered returns the smallest elements, in ascending order
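The semantics of these four actions, sketched on a local Seq in plain Scala (not Spark; TakeSketch is a hypothetical name). Like Spark's top and takeOrdered, the sketch takes an implicit Ordering; for simplicity it sorts the whole sequence, which is fine for small local data.

```scala
// Local analogues of first / take / top / takeOrdered (hypothetical helper, not Spark).
object TakeSketch {
  // first: the first element in the collection's own order
  def first[T](xs: Seq[T]): T = xs.head
  // take(n): the first n elements, no sorting
  def take[T](xs: Seq[T], n: Int): Seq[T] = xs.take(n)
  // top(n): the n largest elements, in descending order
  def top[T](xs: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    xs.sorted(ord.reverse).take(n)
  // takeOrdered(n): the n smallest elements, in ascending order
  def takeOrdered[T](xs: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    xs.sorted(ord).take(n)
}
```

On Seq(23, 67, 12, 90, 8, 67) these return the same values as the shell session above: 23, (23, 67), (90, 67) and (8, 12, 23, 67).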
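Group-wise top N: group the records by key, sort the values in each group in descending order, and keep the top N values for each key. The input file holds one record (a key and a value separated by a space) per line: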
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
val rdd=sc.textFile("/user/grc/spark/grouptokey/input")
rdd.count
Long = 16
rdd.collect
Array[String] = Array(aa 78, bb 98, aa 80, cc 98, aa 69, cc 87, bb 97, cc 86, aa 97, bb 78, bb 34, cc 85, bb 92, cc 72, bb 32, bb 23)
rdd.map(line=>line.split(" ")).collect
Array[Array[String]] = Array(Array(aa, 78), Array(bb, 98), Array(aa, 80), Array(cc, 98), Array(aa, 69), Array(cc, 87), Array(bb, 97), Array(cc, 86), Array(aa, 97), Array(bb, 78), Array(bb, 34), Array(cc, 85), Array(bb, 92), Array(cc, 72), Array(bb, 32), Array(bb, 23))
rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1))).collect
res10: Array[(String, String)] = Array((aa,78), (bb,98), (aa,80), (cc,98), (aa,69), (cc,87), (bb,97), (cc,86), (aa,97), (bb,78), (bb,34), (cc,85), (bb,92), (cc,72), (bb,32), (bb,23))
rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1).toInt)).collect
res12: Array[(String, Int)] = Array((aa,78), (bb,98), (aa,80), (cc,98), (aa,69), (cc,87), (bb,97), (cc,86), (aa,97), (bb,78), (bb,34), (cc,85), (bb,92), (cc,72), (bb,32), (bb,23))
rdd.map(line=>line.split(" ")).map(x=>(x(0),x(1).toInt)).groupByKey.collect
res13: Array[(String, Iterable[Int])] = Array((aa,CompactBuffer(78, 80, 69, 97)), (bb,CompactBuffer(98, 97, 78, 34, 92, 32, 23)), (cc,CompactBuffer(98, 87, 86, 85, 72)))
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList)).collect
res18: Array[(String, List[Int])] = Array((aa,List(78, 80, 69, 97)), (bb,List(98, 97, 78, 34, 92, 32, 23)), (cc,List(98, 87, 86, 85, 72)))
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted)).collect
res19: Array[(String, List[Int])] = Array((aa,List(69, 78, 80, 97)), (bb,List(23, 32, 34, 78, 92, 97, 98)), (cc,List(72, 85, 86, 87, 98)))
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted.reverse)).collect
res20: Array[(String, List[Int])] = Array((aa,List(97, 80, 78, 69)), (bb,List(98, 97, 92, 78, 34, 32, 23)), (cc,List(98, 87, 86, 85, 72)))
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
val yx = yy.toList.sorted.takeRight(3).reverse
(xx, yx)
}
).collect
res22: Array[(String, List[Int])] = Array((aa,List(97, 80, 78)), (bb,List(98, 97, 92)), (cc,List(98, 87, 86)))
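The whole group-wise top-N pipeline can be checked locally with plain Scala collections (not Spark; GroupTopN is a hypothetical name). Sorting with Ordering[Int].reverse and then take(n) gives the same values as sorted.takeRight(n).reverse used above.

```scala
// Plain-Scala group-wise top N: split -> (key, value) pairs -> group by key
// -> sort each group descending -> keep the first n. Hypothetical helper, not Spark.
object GroupTopN {
  def topN(lines: Seq[String], n: Int): Map[String, List[Int]] =
    lines
      .map(_.split(" "))                  // "aa 78" -> Array("aa", "78")
      .map(x => (x(0), x(1).toInt))       // -> ("aa", 78)
      .groupBy(_._1)                      // like groupByKey
      .map { case (key, pairs) =>
        (key, pairs.map(_._2).toList.sorted(Ordering[Int].reverse).take(n))
      }
}
```

Run on the sixteen input records with n = 3, it returns aa -> List(97, 80, 78), bb -> List(98, 97, 92) and cc -> List(98, 87, 86), matching res22.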