@sasaki 2016-01-14T18:36:01.000000Z

SparkSQL & DataFrames

BigData Spark


Version info

  1. @Title SparkSQL & DataFrames
  2. @Version v1.0
  3. @Timestamp 2016-01-13 15:44
  4. @Author Nicholas
  5. @Mail redskirt@outlook.com

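Preface

SparkSQL's predecessor is Shark. Hive came first: it was created to give engineers who know RDBMSs but not MapReduce a tool they could pick up quickly, and at the time it was the only SQL-on-Hadoop tool running on Hadoop. However, MapReduce writes large amounts of intermediate data to disk during computation, which consumes a lot of I/O and lowers execution efficiency. To make SQL-on-Hadoop faster, many new SQL-on-Hadoop tools appeared, the most prominent being:
- MapR's Drill
- Cloudera's Impala
- Shark
SparkSQL discarded Shark's original code but kept some of its strengths, such as in-memory columnar storage and Hive compatibility, and was rewritten from scratch. Freed from its dependence on Hive, SparkSQL became much easier to work with in terms of data compatibility, performance optimization, and component extension.
- Data compatibility: besides Hive, it can read data from RDDs, Parquet files, and JSON files; future versions are even expected to support RDBMS data and NoSQL stores such as Cassandra.
- Performance optimization: in addition to in-memory columnar storage and bytecode generation, a cost model will be introduced to evaluate queries dynamically and choose the best physical plan.
- Component extension: the SQL parser, the analyzer, and the optimizer can all be redefined and extended.
Source: http://www.cnblogs.com/shishanyuan/p/4723604.html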

I.

  1. SQL on Hadoop: Hive, Impala, Drill
  2. DataFrame
  3. DataFrame API: Scala, Python, Java, R

    DataFrame demo

    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1d5d721f
    [root@master resources]# pwd
    /usr/spark-1.3.1-bin-hadoop2.6/examples/src/main/resources
    [root@master resources]# cat people.json
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    scala> val df = sqlContext.read.json("/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
    scala> df.collect
    res2: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    scala> df.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    scala> df.select("name")
    res5: org.apache.spark.sql.DataFrame = [name: string]
    scala> df.select(df("name"), df("age")+10).show
    +-------+----------+
    |   name|(age + 10)|
    +-------+----------+
    |Michael|      null|
    |   Andy|        40|
    | Justin|        29|
    +-------+----------+
    scala> df.select(df("name"), df("age")>20).show
    +-------+----------+
    |   name|(age > 20)|
    +-------+----------+
    |Michael|      null|
    |   Andy|      true|
    | Justin|     false|
    +-------+----------+
    scala> df.groupBy("age").count.show
    +----+-----+
    | age|count|
    +----+-----+
    |null|    1|
    |  19|    1|
    |  30|    1|
    +----+-----+
    scala> df.groupBy("age").avg().show
    +----+--------+
    | age|avg(age)|
    +----+--------+
    |null|    null|
    |  19|    19.0|
    |  30|    30.0|
    +----+--------+
    // Same flow from a text file: RDD of case-class objects -> DataFrame -> temp table -> SQL
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._   // brings rdd.toDF() into scope
    case class Person(name: String)
    val people = sc.textFile("/workspace/xx/tmp/a").map(_.split(" ")).map(p => Person(p(0)))
    val peopleDF = people.toDF()    // Spark 1.3+ replacement for createSchemaRDD
    peopleDF.registerTempTable("people")
    val df = sqlContext.sql("select * from people")

    Using SQL:
    1) Load the data
    2) Define a case class
    3) Map the data to the case class and convert it to a DataFrame
    4) Register it as a table
    5) Write SQL (sqlContext.sql("SELECT ... FROM ..."))

    SparkSQL demo

    [root@master resources]# cat people.txt
    Michael, 29
    Andy, 30
    Justin, 19
    scala> case class Person(name: String, age: Int)
    defined class Person
    scala> val people = sc.textFile("/usr/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim().toInt)).toDF()
    people: org.apache.spark.sql.DataFrame = [name: string, age: int]
    scala> people.registerTempTable("people")
    scala> sqlContext.sql("SELECT name FROM people").show
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    scala> sqlContext.sql("SELECT name FROM people WHERE age > 20 AND name = 'Andy'").show
    +----+
    |name|
    +----+
    |Andy|
    +----+
    scala> sqlContext.sql("SELECT name FROM people").map(r => "name: " + r(0)).collect().foreach(println)
    name: Michael
    name: Andy
    name: Justin
    scala> sqlContext.sql("SELECT name FROM people").map(r => "name: " + r.getAs[String]("name")).collect().foreach(println)
    name: Michael
    name: Andy
    name: Justin

    SparkSQL can analyze data across many kinds of data sources, as long as a DataSource implementation exists for them.
    Parquet is the default DataSource.
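
As a minimal sketch of what "default DataSource" means in practice, the snippet below saves and loads a DataFrame without naming a format, then repeats the round trip with JSON named explicitly. It assumes Spark 1.5.x; the scratch directory, the sample rows, and the names `base`, `people`, `fromDefault`, and `fromJson` are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Reuse the shell's contexts if they already exist; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("datasource-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical scratch directory and sample rows, for illustration only.
val base = Files.createTempDirectory("datasource-sketch").toString
val people = sqlContext
  .createDataFrame(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")

// No format given: save/load fall back to spark.sql.sources.default (parquet).
people.write.save(base + "/people_default")
val fromDefault = sqlContext.read.load(base + "/people_default")

// Format named explicitly: the same reader/writer API handles JSON.
people.write.format("json").save(base + "/people_json")
val fromJson = sqlContext.read.format("json").load(base + "/people_json")

fromDefault.printSchema()
fromJson.printSchema()
```

The only difference between the two round trips is the `format(...)` call, which is why any source with a DataSource implementation can be queried the same way.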

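    The four save modes

    • SaveMode.ErrorIfExists (default): throw an error if data already exists at the target
    • SaveMode.Append: add the new rows to the existing data
    • SaveMode.Overwrite: replace the existing data
    • SaveMode.Ignore: do nothing if data already exists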
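
The four modes can be tried one after another against a single target path. This is only a sketch, assuming Spark 1.5.x; the scratch directory, the sample rows, and the helper `rowCount` are hypothetical names introduced for illustration.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

// Reuse the shell's contexts if they already exist; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("savemode-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical target path and sample rows, for illustration only.
val target = Files.createTempDirectory("savemode-sketch").toString + "/people.parquet"
val people = sqlContext
  .createDataFrame(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")

// Hypothetical helper: re-read the target and count its rows.
def rowCount(): Long = sqlContext.read.parquet(target).count()

// ErrorIfExists is the default: the first write succeeds, a second one fails.
people.write.parquet(target)
val secondWriteFailed = Try(people.write.parquet(target)).isFailure

// Append adds the new rows to what is already stored.
people.write.mode(SaveMode.Append).parquet(target)
val afterAppend = rowCount()

// Ignore leaves existing data untouched and writes nothing.
people.write.mode(SaveMode.Ignore).parquet(target)
val afterIgnore = rowCount()

// Overwrite replaces the existing data with the new rows.
people.write.mode(SaveMode.Overwrite).parquet(target)
val afterOverwrite = rowCount()

println(s"failed=$secondWriteFailed append=$afterAppend ignore=$afterIgnore overwrite=$afterOverwrite")
```

`mode(...)` also accepts the strings "error", "append", "overwrite", and "ignore", which is convenient when the mode comes from configuration.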

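    Persisting a DataFrame to a table
    DataFrame.saveAsTable (df.write.saveAsTable since Spark 1.4) currently works only for DataFrames created from a HiveContext.

    DataFrame and Parquet: reading and writing Parquet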

    scala> people.write.parquet("people.parquet")
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    [root@master people.parquet]# pwd
    /usr/spark-1.5.1-bin-hadoop2.6/people.parquet
    [root@master people.parquet]# ls
    _common_metadata _metadata part-r-00000-ab66c978-3caf-4689-a36b-bd164a1a12fe.gz.parquet part-r-00001-ab66c978-3caf-4689-a36b-bd164a1a12fe.gz.parquet _SUCCESS
    // Read the Parquet data back
    scala> val parquetFile = sqlContext.read.parquet("people.parquet")
    parquetFile: org.apache.spark.sql.DataFrame = [name: string, age: int]
    // parquetFile is also a DataFrame, so it can be queried with SQL as well
    scala> parquetFile.registerTempTable("people_")
    scala> sqlContext.sql("SELECT name, age FROM people_").show
    16/01/14 17:10:15 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    16/01/14 17:10:15 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+

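    Partition discovery
    A Parquet-based DataFrame discovers partitioned tables automatically: directory names of the form column=value under the table path become partition columns.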
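
A small sketch of partition discovery, assuming Spark 1.5.x: two DataFrames are written into `gender=...` subdirectories of one table path, and reading the parent path should surface `gender` as an extra column. The table path, the partition column `gender`, and the sample rows are all hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Reuse the shell's contexts if they already exist; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partition-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical table location and sample rows, for illustration only.
val table = Files.createTempDirectory("partition-sketch").toString + "/people_by_gender"
val males = sqlContext
  .createDataFrame(Seq(("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")
val females = sqlContext
  .createDataFrame(Seq(("Alice", 25)))
  .toDF("name", "age")

// The partition value lives in the directory name, not in the data files.
males.write.parquet(table + "/gender=male")
females.write.parquet(table + "/gender=female")

// Reading the parent directory picks up "gender" as a partition column.
val all = sqlContext.read.parquet(table)
all.printSchema()

// The discovered column can be filtered like any other column.
val onlyFemale = all.filter(all("gender") === "female")
onlyFemale.show()
```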

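    Schema merging

    • Parquet files with different but compatible schemas can be merged into a single schema automatically
    • Enabled per read with the mergeSchema option, or globally with spark.sql.parquet.mergeSchema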
    scala> val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
    df1: org.apache.spark.sql.DataFrame = [single: int, double: int]
    scala> val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
    df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]
    scala> df2.printSchema
    root
     |-- single: integer (nullable = false)
     |-- triple: integer (nullable = false)
    scala> df1.write.parquet("data/test_table/key=1")
    scala> df2.write.parquet("data/test_table/key=2")
    scala> val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
    df3: org.apache.spark.sql.DataFrame = [single: int, triple: int, double: int, key: int]
    scala> df3.printSchema
    root
     |-- single: integer (nullable = true)
     |-- triple: integer (nullable = true)
     |-- double: integer (nullable = true)
     |-- key: integer (nullable = true)

    DataFrames and SQL share the same optimization and execution engine.
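
One way to see this is to express the same query through both front ends and print their plans. The sketch below assumes Spark 1.5.x; the temp table name `people_sketch` and the sample rows are hypothetical. Both queries pass through the Catalyst optimizer, so the printed physical plans should be equivalent, although the exact text depends on the Spark version.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Reuse the shell's contexts if they already exist; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("plan-sketch"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical sample rows and temp table name, for illustration only.
val people = sqlContext
  .createDataFrame(Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")
people.registerTempTable("people_sketch")

// The same query, once as SQL text and once through the DataFrame API.
val viaSql = sqlContext.sql("SELECT name FROM people_sketch WHERE age > 20")
val viaApi = people.filter(people("age") > 20).select("name")

// explain() prints the physical plan chosen for each query.
viaSql.explain()
viaApi.explain()

// Both front ends return the same rows.
viaSql.show()
viaApi.show()
```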

    Data Visualization: Apache Zeppelin
