@w1992wishes 2018-09-25

【Spark】Parallel Queries from Spark to Greenplum

spark


This article is structured as follows:

1. Foreword
2. A Few Spark SQL Properties
3. Parallel Queries from Spark
4. Summary

1. Foreword

Spark supports connecting to relational databases via JDBC; the connection looks like this:

    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods

    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()

    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

When Spark reads a relational database over JDBC, it queries the whole table with a single Task by default, which is very slow for large volumes of data.

In that case, efficiency can be improved by constructing multiple Tasks that connect to Greenplum in parallel.

2. A Few Spark SQL Properties

How do we construct multiple Tasks to improve efficiency? The first place to look is Spark SQL's own support.

Consulting the official Spark SQL documentation and navigating to "JDBC To Other Databases", the answer can be found in its list of properties:

url: The JDBC URL to connect to. Source-specific connection properties may be specified in the URL, e.g. jdbc:postgresql://localhost/test?user=fred&password=secret.

dbtable: The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used; for example, instead of a full table you could also use a subquery in parentheses.

driver: The class name of the JDBC driver to use to connect to this URL.

partitionColumn, lowerBound, upperBound: These options must all be specified if any of them is specified, and numPartitions must be specified as well. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in the table, so all rows in the table will be partitioned and returned. These options apply only to reading.

numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to this limit by calling coalesce(numPartitions) before writing.

More detailed parameter descriptions can be found in "JDBC To Other Databases".

The three parameters that matter most here are partitionColumn, lowerBound and upperBound (used together with numPartitions): partitionColumn is the numeric column the table is split on; lowerBound and upperBound only determine the partition stride and do not filter rows, so every row is still returned; and numPartitions caps both the read/write parallelism and the number of concurrent JDBC connections.
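
To make the stride semantics concrete, here is an illustrative sketch (mine, not from the article, and not Spark's source verbatim) of roughly how these options become one WHERE clause per partition; Spark's actual JDBCRelation logic differs in details such as overflow-safe stride computation and NULL handling:

    // Rough illustration of how (partitionColumn, lowerBound, upperBound, numPartitions)
    // is turned into per-partition predicates. Not Spark's exact implementation.
    def partitionPredicates(column: String,
                            lowerBound: Long,
                            upperBound: Long,
                            numPartitions: Int): Seq[String] = {
      if (numPartitions <= 1) {
        Seq("1=1") // a single partition reads everything, no filtering
      } else {
        val stride = (upperBound - lowerBound) / numPartitions
        (0 until numPartitions).map { i =>
          val lo = lowerBound + i * stride
          val hi = lo + stride
          if (i == 0) s"$column < $hi"                        // first partition: open below
          else if (i == numPartitions - 1) s"$column >= $lo"  // last partition: open above
          else s"$column >= $lo AND $column < $hi"
        }
      }
    }

    // e.g. partitionPredicates("person_id", 1L, 1000000L, 4).foreach(println)

Because the first and last partitions are left open-ended, all rows are returned even when some values fall outside [lowerBound, upperBound]; the bounds only control how evenly the middle is sliced.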

3. Parallel Queries from Spark

With the above information in hand, we can increase the number of Tasks to speed up access to the relational database. There are roughly two ways to do this:

3.1 Option 1: numPartitions, partitionColumn, lowerBound, upperBound

    import org.apache.spark.sql.SparkSession

    // warehouseLocation, lowerBound, upperBound and numPartitions are assumed to be
    // defined elsewhere (e.g. parsed from the job arguments).
    val spark = SparkSession
      .builder()
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .appName("load data from gp test")
      .getOrCreate()

    // start time
    val startTime = System.currentTimeMillis()

    val gpRDF = spark.read
      .format("jdbc")
      .option("driver", "com.pivotal.jdbc.GreenplumDriver")
      .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
      .option("partitionColumn", "person_id")
      .option("lowerBound", lowerBound)
      .option("upperBound", upperBound)
      .option("numPartitions", numPartitions)
      .option("dbtable", "public.t_timing_face_person")
      .option("user", "gpadmin")
      .option("password", "gpadmin")
      .load()
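
If lowerBound and upperBound are not known up front, one common trick is to fetch them first with a small JDBC query over MIN/MAX of the partition column. A minimal sketch, assuming the same connection options as above and that person_id is a bigint (adjust the getAs type to the column's actual SQL type):

    // Sketch: derive lowerBound / upperBound from the table itself before the
    // partitioned read. t_bounds is just an alias for the one-row subquery.
    val boundsDF = spark.read
      .format("jdbc")
      .option("driver", "com.pivotal.jdbc.GreenplumDriver")
      .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
      .option("dbtable",
        "(SELECT MIN(person_id) AS min_id, MAX(person_id) AS max_id " +
          "FROM public.t_timing_face_person) AS t_bounds")
      .option("user", "gpadmin")
      .option("password", "gpadmin")
      .load()

    val bounds = boundsDF.head()
    val lowerBound = bounds.getAs[Long]("min_id")
    val upperBound = bounds.getAs[Long]("max_id")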

3.2 Option 2: dbtable

Instead of using numPartitions, partitionColumn, lowerBound and upperBound, we can build subqueries through dbtable, run several queries in parallel to obtain multiple result DataFrames, and finally merge them into a single DataFrame with a reduce over union.

    // dataNums (the total row count) and numPartitions are assumed to be defined elsewhere.
    // Divide as doubles so that Math.ceil actually has an effect.
    val stride = Math.ceil(dataNums.toDouble / numPartitions).toInt

    val spark = SparkSession
      .builder()
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .appName("load data from gp")
      .getOrCreate()

    // create numPartitions tasks, one JDBC subquery per stride of person_id
    val registerDF = Range(0, numPartitions)
      .map(index => {
        spark
          .read
          .format("jdbc")
          .option("driver", "com.pivotal.jdbc.GreenplumDriver")
          .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
          .option("dbtable", s"(SELECT feature FROM public.t_timing_face_person WHERE person_id > ${stride * index} AND person_id <= ${stride * (index + 1)}) AS t_tmp_${index}")
          .option("user", "gpadmin")
          .option("password", "gpadmin")
          .load()
      })
      .reduce((df1, df2) => df1.union(df2))
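
Each un-partitioned JDBC read produces a single partition, so after the union the DataFrame should contain numPartitions partitions. A quick way to confirm this and trigger the parallel reads (a usage sketch, not from the original article):

    // Expect numPartitions here: union simply concatenates the single-partition inputs.
    println(s"partitions = ${registerDF.rdd.getNumPartitions}")

    // Any action triggers the actual parallel JDBC reads, e.g. a count.
    println(s"rows = ${registerDF.count()}")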

4. Summary

Comparing the two approaches:

The first approach can sometimes lead to uneven data distribution: some tasks end up with a lot of data while others end up with almost none. This is because Spark partitions the table on the specified partitionColumn, and if the values of that column are not evenly distributed (for example, not contiguous numbers), the amount of data assigned to each task will be unbalanced.
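
One way to see this skew directly is to count the rows each task actually received; an illustrative sketch (mine, not from the article) against the gpRDF DataFrame from section 3.1:

    // Count rows per partition: a badly chosen partitionColumn shows up as a few
    // huge partitions and many near-empty ones.
    gpRDF.rdd
      .mapPartitionsWithIndex { (idx, rows) => Iterator((idx, rows.size)) }
      .collect()
      .sortBy(_._1)
      .foreach { case (idx, n) => println(s"partition $idx -> $n rows") }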

The second approach uses hand-written SQL, which is more controllable, though writing it yourself naturally means slightly more complex code.
