@sasaki
2016-01-14T18:36:01.000000Z
BigData
Spark
@Title SparkSQL & DataFrames
@Version v1.0
@Timestamp 2016-01-13 15:44
@Author Nicholas
@Mail redskirt@outlook.com
SparkSQL's predecessor is Shark. To give engineers who were familiar with RDBMSs but not with MapReduce a tool they could pick up quickly, Hive came into being, and at the time it was the only SQL-on-Hadoop tool running on Hadoop. However, MapReduce spills a large amount of intermediate data to disk, which costs a lot of I/O and lowers execution efficiency. To improve SQL-on-Hadoop performance, a wave of SQL-on-Hadoop tools emerged, among which the most notable were:
- MapR's Drill
- Cloudera's Impala
- Shark
SparkSQL discarded the original Shark code base but kept some of Shark's strengths, such as in-memory columnar storage and Hive compatibility, and was rewritten from scratch. Freed from its dependency on Hive, SparkSQL gained far more room to maneuver in data compatibility, performance optimization, and component extensibility:
- Data compatibility: besides Hive, data can also be loaded from RDDs, Parquet files, and JSON files; future versions will even support reading from RDBMSs and from NoSQL stores such as Cassandra.
- Performance optimization: in addition to techniques such as in-memory columnar storage and bytecode generation, a cost model will be introduced to evaluate queries dynamically and pick the best physical plan.
- Component extensibility: the SQL parser, analyzer, and optimizer can all be redefined and extended.
Source: http://www.cnblogs.com/shishanyuan/p/4723604.html
DataFrame API: Scala, Python, Java, R
DataFrame demo
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1d5d721f
[root@master resources]# pwd
/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources
[root@master resources]# cat people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> val df = sqlContext.read.json("/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
scala> df.collect
res2: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select("name")
res5: org.apache.spark.sql.DataFrame = [name: string]
scala> df.select(df("name"), df("age")+10).show
+-------+----------+
| name|(age + 10)|
+-------+----------+
|Michael| null|
| Andy| 40|
| Justin| 29|
+-------+----------+
scala> df.select(df("name"), df("age")>20).show
+-------+----------+
| name|(age > 20)|
+-------+----------+
|Michael| null|
| Andy| true|
| Justin| false|
+-------+----------+
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
scala> df.groupBy("age").avg().show
+----+--------+
| age|avg(age)|
+----+--------+
|null| null|
| 19| 19.0|
| 30| 30.0|
+----+--------+
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String)
// createSchemaRDD is gone since Spark 1.3; convert the RDD with toDF() instead
val people = sc.textFile("/workspace/xx/tmp/a").map(_.split(" ")).map(p => Person(p(0))).toDF()
people.registerTempTable("people")
val df = sqlContext.sql("select * from people")
Using SQL:
1) Load the data
2) Define a case class
3) Map the data onto the case class and convert it to a DataFrame
4) Register it as a table
5) Write SQL (sqlContext.sql("SELECT ... FROM ..."))
SparkSQL demo
[root@master resources]# cat people.txt
Michael, 29
Andy, 30
Justin, 19
scala> case class Person(name: String, age: Int)
defined class Person
scala> val people = sc.textFile("/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim().toInt)).toDF()
people: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> people.registerTempTable("people")
scala> sqlContext.sql("SELECT name FROM people").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> sqlContext.sql("SELECT name FROM people WHERE age > 20 AND name = 'Andy'").show
+----+
|name|
+----+
|Andy|
+----+
scala> sqlContext.sql("SELECT name FROM people").map(r => "name: " + r(0)).collect().foreach(println)
name: Michael
name: Andy
name: Justin
scala> sqlContext.sql("SELECT name FROM people").map(r => "name: " + r.getAs[String]("name")).collect().foreach(println)
name: Michael
name: Andy
name: Justin
SparkSQL can run analysis across all kinds of data sources, as long as they implement the DataSource API.
Parquet is the default DataSource.
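A minimal sketch of the generic read/write API (all paths here are placeholders, not files used elsewhere in this post): load/save without an explicit format fall back to Parquet, while format(...) selects another data source.
// Parquet is assumed when no format is given
val dfParquet = sqlContext.read.load("some_table.parquet")
// an explicit format overrides the default
val dfJson = sqlContext.read.format("json").load("some_records.json")
dfParquet.write.format("json").save("some_output.json")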
The four save modes (SaveMode): ErrorIfExists (the default), Append, Overwrite, and Ignore.
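A short sketch of picking a SaveMode on the writer (the output path is only an example):
import org.apache.spark.sql.SaveMode
// Overwrite replaces existing data; Append adds to it; Ignore is a no-op if data already exists
people.write.mode(SaveMode.Overwrite).parquet("people_overwrite.parquet")
people.write.mode("append").parquet("people_overwrite.parquet")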
Persisting a DataFrame to a table
DataFrame.saveAsTable currently only works for DataFrames created from a HiveContext.
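A sketch assuming a Hive-enabled Spark build (the table name is hypothetical):
// With a plain SQLContext only temporary tables can be registered; saveAsTable needs a HiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = hiveContext.read.json("/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
hiveDF.write.saveAsTable("people_persisted")
hiveContext.sql("SELECT * FROM people_persisted").show()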
DataFrame and Parquet: reading and writing Parquet
scala> people.write.parquet("people.parquet")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[root@master people.parquet]# pwd
/usr/spark-1.5.1-bin-hadoop2.6/people.parquet
[root@master people.parquet]# ls
_common_metadata _metadata part-r-00000-ab66c978-3caf-4689-a36b-bd164a1a12fe.gz.parquet part-r-00001-ab66c978-3caf-4689-a36b-bd164a1a12fe.gz.parquet _SUCCESS
# Read the Parquet file back
scala> val parquetFile = sqlContext.read.parquet("people.parquet")
parquetFile: org.apache.spark.sql.DataFrame = [name: string, age: int]
# parquetFile is also a DataFrame, so SQL operations work on it as well
scala> parquetFile.registerTempTable("people_")
scala> sqlContext.sql("SELECT name, age FROM people_").show
16/01/14 17:10:15 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
16/01/14 17:10:15 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
Partition discovery
A Parquet-based DataFrame can automatically discover partitioned tables, as the sketch below shows.
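A minimal sketch (the output directory is hypothetical): partitionBy writes one subdirectory per distinct value, and reading the parent directory recovers the partition column automatically.
// Writes .../age=19/, .../age=29/, .../age=30/ subdirectories for the people DataFrame above
people.write.partitionBy("age").parquet("people_partitioned")
// Partition discovery turns the directory names back into an "age" column
val discovered = sqlContext.read.parquet("people_partitioned")
discovered.printSchema()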
Schema merging
scala> val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
scala> val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
scala> df2.printSchema
root
|-- single: integer (nullable = false)
|-- triple: integer (nullable = false)
scala> df1.write.parquet("data/test_table/key=1")
scala> df2.write.parquet("data/test_table/key=2")
scala> val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3: org.apache.spark.sql.DataFrame = [single: int, triple: int, double: int, key: int]
scala> df3.printSchema
root
|-- single: integer (nullable = true)
|-- triple: integer (nullable = true)
|-- double: integer (nullable = true)
|-- key: integer (nullable = true)
DataFrames and SQL share the same optimization and execution engine.
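A quick way to see this (using the people table registered above) is to compare the physical plans printed by explain() for both routes; this is a sketch, not a formal proof.
// Both go through the same Catalyst optimizer and produce equivalent physical plans
people.filter(people("age") > 20).select("name").explain()
sqlContext.sql("SELECT name FROM people WHERE age > 20").explain()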
Data Visualization: Apache Zeppelin