Thanks to the native JDBC support in Spark SQL, users can access most databases through their JDBC drivers. Tables in a JDBC-connected database can be loaded as a DataFrame or registered as a SQL temporary view, and DataFrames can be saved back to the database through the DataFrameWriter APIs. We will use multiple posts to cover the JDBC changes in the Spark 2.1 release. This post focuses on the semantic changes of the parameter numPartitions.
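For instance, loading a whole table without any partitioning option is a one-liner; the sketch below assumes a placeholder H2 URL, table name and credentials:

import java.util.Properties

// placeholder connection properties for the target database
val props = new Properties()
props.setProperty("user", "testUser")
props.setProperty("password", "testPass")

// load the whole table as a DataFrame over a single JDBC connection
val people = spark.read.jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", props)
people.createOrReplaceTempView("people")  // optionally expose it as a SQL temporary view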
Prior to Spark 2.0, Spark already provided two methods for parallel table loading and one method for parallel table saving.
The first JDBC reading option accepts a list of predicate expressions, each of which is used to fetch a specific range of table rows. Each predicate is put into the WHERE clause when Spark builds the SQL statement that fetches the table, and Spark opens a dedicated JDBC connection for each predicate. Thus, the number of concurrent JDBC connections depends on the number of predicates, and the rows fetched by each predicate form one RDD partition. Below is an example:
val parts = Array(
  "THEID < 2",
  "THEID >= 2 AND KEYCOL2 > 'a'",
  "THEID >= 2 AND KEYCOL2 <= 'a'")

val df = spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.PEOPLE",
  predicates = parts,
  connectionProperties = new Properties())
df.show()
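Since each predicate maps to exactly one RDD partition, the resulting parallelism can be checked directly:

// one partition per predicate, so this prints 3 for the example above
println(df.rdd.getNumPartitions)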
The second reading option automatically builds the predicates from four user-supplied parameters: columnName, lowerBound, upperBound and numPartitions. The basic idea is to evenly cut the value range of columnName between lowerBound and upperBound into numPartitions parts. Currently, columnName must be a column of integral type. For better performance, the selected column should be indexed and not skewed. The two alternative ways to use this option are shown below, followed by a sketch of how the range is split.
// using the DataFrameReader's JDBC API
val df = spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 100000,
  upperBound = 400000,
  numPartitions = 10,
  connectionProperties = new Properties())
df.show()

// using the temporary view and pass the JDBC parameters as options
sql(
  s"""
     |CREATE TEMPORARY TABLE jdbcTable1
     |USING org.apache.spark.sql.jdbc
     |OPTIONS (url '$url', dbtable 'TEST.seq', user 'testUser', password 'testPass',
     |partitionColumn '"id"', lowerBound '100000', upperBound '400000', numPartitions '10')
   """.stripMargin)
sql("select * from jdbcTable1").show()
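As promised above, here is a rough sketch of how such a range is split. It only illustrates the even-split idea and is not the exact code Spark uses internally; the boundary handling for NULLs and out-of-range values is an assumption here:

// Illustration only: roughly how (columnName, lowerBound, upperBound, numPartitions)
// is turned into per-partition WHERE clauses. Not the exact internal implementation.
val column = "id"
val lowerBound = 100000L
val upperBound = 400000L
val numPartitions = 10

val stride = (upperBound - lowerBound) / numPartitions   // 30000 in this example
val predicates = (0 until numPartitions).map { i =>
  val lo = lowerBound + i * stride
  val hi = lo + stride
  if (i == 0) s"$column < $hi or $column is null"            // first part: values below the range and NULLs
  else if (i == numPartitions - 1) s"$column >= $lo"         // last part: values at or above the range
  else s"$column >= $lo AND $column < $hi"
}
predicates.foreach(println)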
The JDBC writing method is simple: Spark uses the database dialect to build the INSERT statements that save the data into the JDBC table. However, each RDD partition is written through a separate JDBC connection. Although the number of RDD partitions can be controlled and adjusted by users, it can also grow up to spark.default.parallelism after distributed shuffle operations like reduceByKey and join. Thus, the number of concurrent JDBC connections could exceed the maximum number of connections the target database allows. To resolve this issue, Spark 2.1 extends numPartitions to limit the concurrent JDBC connections in both the reading and writing paths. If the number of partitions to write exceeds this limit, Spark decreases it to the limit by calling coalesce(numPartitions) before writing. In previous releases, users can achieve the same effect by manually calling coalesce(numPartitions) on the DataFrame before writing it to JDBC (a sketch of that workaround follows the example below). For example,
df.write.format("jdbc")
  .option("dbtable", "TEST.SAVETEST")
  .option("url", url1)
  .option("user", "testUser")
  .option("password", "testPass")
  .option("numPartitions", "5")
  .save()
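On releases before 2.1, the manual workaround mentioned above looks like the following sketch (reusing the placeholder props defined in the first sketch; url1 is the same placeholder JDBC URL as in the example above):

// pre-2.1 workaround: cap the concurrent JDBC connections
// by reducing the number of partitions before writing
df.coalesce(5).write.jdbc(url1, "TEST.SAVETEST", props)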
BTW, the save() API for the JDBC data source was just introduced in Spark 2.1. Users can still use the DataFrameWriter's jdbc(...) API to achieve the same. : )
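For instance, something along these lines should be equivalent to the save() example above (again reusing the placeholder props; passing numPartitions through the connection properties is an assumption here, relying on those properties being merged with the data source options):

// assumed equivalent of the save() example: the numPartitions entry is expected
// to cap the concurrent JDBC connections just like the option above
props.setProperty("numPartitions", "5")
df.write.jdbc(url1, "TEST.SAVETEST", props)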