@w1992wishes
2018-09-25T20:48:11.000000Z
spark
Spark supports connecting to relational databases via JDBC. The connection can be made as follows:
```scala
import java.util.Properties

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
By default, when Spark reads a relational database over JDBC it queries the whole table with a single task, which is very slow when the data volume is large.
In that case, we can improve throughput by constructing multiple tasks that connect to Greenplum in parallel.
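A quick way to see the default behaviour (a minimal check, assuming the jdbcDF from the snippet above) is to look at the number of partitions of the resulting DataFrame:

```scala
// With no partitioning options, the JDBC relation is read as a single partition,
// so the whole table goes through one task.
println(jdbcDF.rdd.getNumPartitions)  // expected: 1
```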
How do we construct multiple tasks to improve efficiency? The first place to look is what Spark SQL itself supports.
Checking the official Spark SQL documentation leads to the section JDBC To Other Databases:
The property list there contains the solution:
Property Name | Meaning |
---|---|
url | The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses |
driver | The class name of the JDBC driver to use to connect to this URL |
partitionColumn, lowerBound, upperBound | These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading |
numPartitions | The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing |
For more detailed parameters, see: JDBC To Other Databases.
The three parameter groups that matter most here are:
- partitionColumn: the numeric column of the table used to split the read across tasks;
- lowerBound / upperBound: they only determine the partition stride, not row filtering, so all rows of the table are still returned;
- numPartitions: the maximum number of partitions (and hence concurrent JDBC connections) used for reading and writing.
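To make this concrete, here is a rough sketch of the kind of per-task predicates Spark derives from these options. The column name person_id and the bound values are only illustrative, and the stride math is simplified compared with Spark's actual implementation:

```scala
// Illustrative only: approximate the WHERE clauses generated for a partitioned
// JDBC read with partitionColumn = "person_id".
val lowerBound = 1L
val upperBound = 1000L
val numPartitions = 4
val stride = (upperBound - lowerBound) / numPartitions  // ≈ 249

val predicates = (0 until numPartitions).map { i =>
  val start = lowerBound + i * stride
  val end   = lowerBound + (i + 1) * stride
  if (i == 0) s"person_id < $end OR person_id IS NULL"            // first partition also catches NULLs and the low end
  else if (i == numPartitions - 1) s"person_id >= $start"         // last partition catches the high end
  else s"person_id >= $start AND person_id < $end"
}
predicates.foreach(println)
```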
Once we know this, we can increase the number of tasks to speed up access to the relational database. Broadly, there are two approaches.
The first approach lets Spark partition the read itself by specifying partitionColumn, lowerBound, upperBound and numPartitions:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.config("spark.sql.warehouse.dir", warehouseLocation)
.appName("load data from gp test")
.getOrCreate()
// start time
val startTime = System.currentTimeMillis()
val gpRDF = spark.read
.format("jdbc")
.option("driver", "com.pivotal.jdbc.GreenplumDriver")
.option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
.option("partitionColumn", "person_id")
.option("lowerBound", lowerBound)
.option("upperBound", upperBound)
.option("numPartitions", numPartitions)
.option("dbtable", "public.t_timing_face_person")
.option("user", "gpadmin")
.option("password", "gpadmin")
.load()
```
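The snippet assumes lowerBound, upperBound and numPartitions are already defined. One simple way to obtain the bounds (a sketch, assuming the same table and connection settings as above) is to query min/max of the partition column first:

```scala
// Fetch min/max of person_id so that lowerBound/upperBound cover the whole table.
// The alias t_bounds is arbitrary.
val boundsRow = spark.read
  .format("jdbc")
  .option("driver", "com.pivotal.jdbc.GreenplumDriver")
  .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
  .option("dbtable", "(SELECT min(person_id) AS lo, max(person_id) AS hi FROM public.t_timing_face_person) AS t_bounds")
  .option("user", "gpadmin")
  .option("password", "gpadmin")
  .load()
  .first()

val lowerBound = boundsRow.getLong(0)  // adjust the getter if person_id maps to a different JDBC type
val upperBound = boundsRow.getLong(1)
```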
The second approach does not use numPartitions, partitionColumn, lowerBound or upperBound. Instead, it builds subqueries through dbtable, runs several queries in parallel to obtain multiple result DataFrames, and finally merges them into one with reduce/union.
```scala
import org.apache.spark.sql.SparkSession

// person_id range handled by each task (dataNums and numPartitions are assumed to be defined earlier);
// divide as Double so that Math.ceil actually rounds up
val stride = Math.ceil(dataNums.toDouble / numPartitions).toInt
val spark = SparkSession
.builder()
.config("spark.sql.warehouse.dir", warehouseLocation)
.appName("load data from gp")
.getOrCreate()
// create numPartitions tasks, one per sub-range of person_id
val registerDF = Range(0, numPartitions)
.map(index => {
spark
.read
.format("jdbc")
.option("driver", "com.pivotal.jdbc.GreenplumDriver")
.option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
.option("dbtable", s"(SELECT feature FROM public.t_timing_face_person WHERE person_id > ${stride * index} AND person_id <= ${stride * (index + 1)}) AS t_tmp_${index}")
.option("user", "gpadmin")
.option("password", "gpadmin")
.load()
})
.reduce((df1, df2) => df1.union(df2))
```
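Because each subquery is loaded as a single-partition DataFrame and union simply concatenates partitions, the merged DataFrame should end up with numPartitions partitions; a quick sanity check:

```scala
// Each of the numPartitions subqueries contributes one partition to the union,
// so the merged DataFrame is read by numPartitions parallel tasks.
println(registerDF.rdd.getNumPartitions)  // expected: numPartitions
```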
Comparing the two approaches:
The first can produce an uneven data distribution: some tasks get a very large amount of data while others get almost none. Spark splits the range of the specified partitionColumn evenly, so if the column's values are not contiguous (i.e. unevenly distributed), the amount of data in each task will be unbalanced.
The second uses hand-written SQL and is more controllable; of course, writing it yourself also means slightly more complex code.
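One common refinement of the second approach (a sketch of a workaround for the skew just described, not covered above) is to partition the subqueries on mod(person_id, numPartitions) instead of contiguous ranges, so each task gets roughly the same number of rows even when the ids are sparse:

```scala
// Hypothetical variant: modulo-based subqueries to balance the load across tasks.
// URL, table, credentials and numPartitions are the same assumptions as above.
val balancedDF = Range(0, numPartitions)
  .map { index =>
    spark.read
      .format("jdbc")
      .option("driver", "com.pivotal.jdbc.GreenplumDriver")
      .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
      .option("dbtable",
        s"(SELECT feature FROM public.t_timing_face_person WHERE mod(person_id, $numPartitions) = $index) AS t_tmp_$index")
      .option("user", "gpadmin")
      .option("password", "gpadmin")
      .load()
  }
  .reduce((df1, df2) => df1.union(df2))
```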