Category Archives: Spark 2.1.0

Spark 2.1: New Cross Join Syntax

Cartesian products are very slow. More importantly, it could consume a lot of memory and trigger OOM. If the join type is not Inner, Spark SQL could use Broadcast Nested Loop Join even if both sides of tables are not small enough. Thus, it also could cause lots of unwanted network traffic. For avoiding users to accidentally use cartesian products, Spark 2.0 introduced a flag spark.sql.crossJoin.enabled. When a query contains a cartesian product, users need to turn on this flag whose default is false; otherwise, users can hit an error message.

To avoid turning on the session-scoped configuration flag, Spark 2.1 introduces a new syntax for cartesian product. Users can explicitly use the new crossJoin syntax. Below are the examples

Unfortunately, Spark 2.1.0 introduced a regression. Not all the cartesian products can be correctly detected. There are two typical cases:
Case 1) having non-equal predicates in join conditiions of an inner join.
Case 2) equi-join’s key columns are not sortable and both sides are not small enough for broadcasting.

Will fix it in the following fixpack and Spark 2.2.

Spark 2.1: Table Schemas Become More Static

In Spark SQL, the origins of table schemas can be classified into two groups.

Group A. Users specify the schema. There are two cases. Case 1 CREATE TABLE AS SELECT: the schema is determined by the result schema of the SELECT clause. Case 2 CREATE TABLE: users explicitly specify the schema. These two cases are applicable to both Hive tables and Data Source tables. For example,


Group B. The schemas are inferred at runtime There are also two cases. Case 1 CREATE DATA SOURCE TABLE without SCHEMA: Users do not specify the schema but the path to the file location. The created table is EXTERNAL. Case 2 CREATE HIVE TABLE without SCHEMA: Some Hive serde libraries can infer the schema and Hive metastore automatically generates and records the schema. For example,

Prior to Spark 2.1, Spark SQL does not store the inferred schema in the external catalog for the Case 1 in Group B. When users refreshing the metadata cache, accessing the table at the first time after (re-)starting Spark, Spark SQL will infer the schema and store the info in the metadata cache for improving the performance of subsequent metadata requests. However, the runtime schema inference could cause undesirable schema changes after each reboot of Spark.

Since Spark 2.1, we infer the schema and store it in the external catalog when creating the table. This becomes more consistent with Hive. The schema will not be changed, even if the metadata cache is refreshed. If the schema is changed, they need to recreate a new table using the file location. Now, the concept of table schema is very similar to the one in traditional RDBMS. 🙂

Spark 2.1: Built-in Data Source Option Names Are Case Insensitive

Below is an incomplete option list supported by the built-in data sources in Spark 2.1.

  • JDBC’s options:
    user, password, url, dbtable, driver, partitionColumn, lowerBound, upperBound, numPartitions, fetchsize, truncate, createTableOptions, batchsize and isolationLevel.
  • CSV’s options:
    path, sep, delimiter, mode, encoding, charset, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, compression, codec, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, escapeQuotes, and quoteAll.
  • JSON’s options:
    path, samplingRatio, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowNonNumericNumbers, allowBackslashEscapingAnyCharacter, compression, mode, columnNameOfCorruptRecord, dateFormat, and timestampFormat.
  • Parquet’s options:
    path, compression, and mergeSchema.
  • ORC’s options:
    path, compression, and orc.compress.
  • FileStream’s options:
    path, maxFilesPerTrigger, maxFileAge, and latestFirst.
  • Text’s options:
    path and compression
  • LibSVM’s options:
    path, vectorType and numFeatures

Prior to Spark 2.1, all of them are case sensitive. Note, some of them are NOT using lower-camel-case naming convention. If your inputs do not exactly match the names, it is highly possible that they are ignored without any message. Of course, this solution is not user friendly. Since Spark 2.1, the above option names become case insensitive, except the format Text and LibSVM. Will fix these two in Spark 2.2 🙂

Introduction to Catalog-based Partition Handling for Data Source Tables

In Spark 2.1, catalog-based partition handling is introduced, as explained in one databricks blog. This is a well known performance issue for data source tables with tens of thousands of partitions. In the initial schema discovery, the recursive file scanning in the file system could take tens of minutes, especially for cloud object storage services like Amazon S3 and Openstack’s Swift. This improvement greatly reduces the time for schema discovery of partitioned data source tables when the query only needs a small fraction of table partitions.

In Spark 2.0 and prior, different from Hive serde tables, the data source tables do not persistently store the partition-level metadata into the Catalog. The metadata are discovered and loaded to the metadata cache at the first time. Since 2.1, we persist the partition-level metadata for both Hive serde tables and data source tables. The partition-specific DDL statements can be supported for data source tables, including SHOW PARTITIONS, ADD PARTITION, DROP PARTITION, RECOVER PARTITIONS, RENAME PARTIITON and SET LOCATION.

However, after Spark 2.1 release, differences still exist between Hive serde tables and data source tables. : ) We still maintain a metadata cache for data source tables to store the file status info of the leaf files. These info are not recorded into the catalog. Having such a cache is pretty critical for repeatitive queries.

For data source tables, we have data files, local memory metadata caches, and global metadata in Catalog. The inconsistency could exist if the data files or metadata catalog are being shared and modified. To resolve the inconsistency, REFRESH TABLE can refresh the local metadata cache, and MSCK REPAIR TABLE or ALTER TABLE RECOVER PARTITIONS can refresh both local memory metadata caches and the global metadata in Catalog.

Note, this feature changes the external behaviors. Below is a typical case when you creating an external partitioned data source table in Spark 2.1.

Spark 2.1: Lessons Learned when Upgrading to Parquet 1.8.1

Parquet is the default file format of Apache Spark. Most of Spark performance measurements are using Parquet as the format of test data files. Thus, Parquet is pretty important to Spark. However, Parquet filter pushdown for string and binary columns was disabled since 1.5.2, due to a bug in Parquet 1.6.0. (PARQUET-251 Binary column statistics errors )

Since Spark 2.0, we tried to upgrade Parquet to 1.8.1 and thought we can re-enable the filter pushdown. However, we hit another Parquet bug PARQUET-686 and caused incorrect results (For details, see Spark-17213). Although Spark 2.1 eventually upgraded to Parquet 1.8.1, we are still unable to push down the string/binary filters. Actually, this is not the only bug Parquet 1.8.1 has. Fortunately, after many cross-community discussions, the Parquet community decided to alleviate our pains and provided the release 1.8.2. Recently, the vote of Parquet 1.8.2 release started. Really appreciate their helps. 🙂

Adding the external dependency is painful. We hit more bugs due to the Hive dependency. Thus, Spark 2.0 introduced the native DDL/DML processing and the native view support, but we still rely on Hive metastore for persistent metadata storage. Of course, we are still experiencing various bugs caused by Hive metastore APIs and not all of them can be resolved. In the future releases, the Spark community is trying to completely decouple the Hive support and catalog. More changes are coming.

Table Types in Spark: External or Managed?

There exist three types of non-temporary cataloged tables in Spark: EXTERNAL, MANAGED, and VIEW. VIEW is used for persistent views; EXTERNAL and MANAGED are used for tables. Like Hive, when dropping an EXTERNAL table, Spark only drops the metadata but keeps the data files intact. When dropping a MANAGED table, Spark removes both metadata and data files. You can find out the table type by the SparkSession API spark.catalog.getTable (added in Spark 2.1) or the DDL command DESC EXTENDED / DESC FORMATTED

Since starting native DDL supports (in Spark 2.0), we introduced the behavior changes. In Hive, users are allowed to specify the LOCATION for storing/locating the table data, which can be either EXTERNAL or MANAGED. However, in Spark, LOCATION is mandatory for EXTERNAL tables. When users creating a table with the specified LOCATION, the table type will be EXTERNAL even if users do not specify the EXTERNAL keyword. We do not allow users to create a MANAGED table with the users supplied LOCATION. The data files are stored in a newly created directory under the location defined by spark.sql.warehouse.dir, which defaults to the directory spark-warehouse in the current directory that the Spark application is started. The examples can make it clear:

Above is the examples for creating Hive serde tables. When creating data source tables, we do not allow users to specify the EXTERNAL keyword at all. The table type is still determined by whether users provide the table location. However, in Spark 2.1, the LOCATION clause is not provided in the SQL syntax of creating data source tables. Users need to do it by providing the path key map in the OPTIONS clause. (TIPs: Spark 2.2 will unify the CREATE TABLE statements for both Hive serde tables and data source tables. We will explain it in a separate post)

Spark 2.1 and prior 2.x versions do not allow users to create a Hive serde table using DataFrameWriter APIs. (TIPs: this restriction will be lifted in Spark 2.2. We will introduce a new source format hive). Users can create either EXTERNAL or MANAGED tables, as shown below.

It looks pretty confusing. Fortunately, Spark 2.2 will resolves all of them. : )

Updates on Spark JDBC data source – numPartitions

Thanks to the native JDBC support by Spark SQL, users can access most database via their JDBC drivers. The tables in the JDBC-connected database can be loaded as a DataFrame or a SQL temporary view. The DataFrames can also be saved to the database by using DataFrameWriter APIs. We will use multiple posts to cover the changes on JDBC in the Spark 2.1 release. This post focuses on the semantics changes in the parameter numPartitions.

Prior to Spark 2.0, Spark has already provided two methods for parallel table loading and one method for parallel table saving.

The first JDBC reading option is to accept a list of predicate expressions, each of which is used to fetch a specific range of table rows. The predicate will be put in the WHERE clause when Spark builds a SQL statement to fetch the table. Spark builds a dedicated JDBC connection for each predicate. Thus, the number of concurrent JDBC connections depends on the number of predicates. The fetched data will be a RDD partition. Below is an example,

The second reading option is to automatically build the predicates based on the user supplied parameters: columnName, lowerBound, upperBound and numPartitions. The basic idea is to evenly cut the whole value range of columnName between lowerBound and upperBound into numPartitions parts. So far, columnName has to be a column of integral type. To achieve better performance, this selected column should be indexed and not skewed. The two alternative ways are shown as follows.

The JDBC writing method is simple. Basically, Spark uses the database dialect to build the insert statement for saving the data into the JDBC table. However, each RDD partition will be a separate JDBC connection. Although the number of RDD partitions can be controlled and adjusted by users, it could also grow up to spark.default.parallelism after some distributed shuffle operations like reduceByKey and join. Thus, the number of concurrent JDBC connections could exceed the limit of maximum JDBC connections in the target database. To resolve this issue, Spark 2.1 extends numPartitions to limit the concurrent JDBC connections in both reading and writing paths. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing. In the previous releases, users can still achieve it by manually calling coalesce(numPartitions) on the DataFrame before writing it to JDBC. For example,

BTW, the save() API for JDBC support was just introduced in Spark 2.1. Users still can use the DataFrameWriter’s jdbc(...) API to make it. : )

Introduction to Global Temp Views

Global temporary views are introduced in Spark 2.1.0 release. This feature is useful when you want to share data among different sessions and keep alive until your application ends. In Spark SQL, temporary views are session-scoped and will be automatically dropped if the session terminates.

All the global temporary views are tied to a system preserved temporary database global_temp. The database name is preserved, and thus, users are not allowed create/use/drop this database. The database name can be changed by an internal SQL configuration spark.sql.globalTempDatabase. Different from the temporary views, we always need to use the qualified name to access it. Below is an example.

Users are also allowed to insert the data to the global temporary views when they are built using the existing data source files. However, such usage is not encouraged. Thus, this post does not show the example.

A typical usage scenario for global temporary views is in the Thrift server. When spark.sql.hive.thriftServer.singleSession is set to false (default), the Thrift server will create multiple sessions. Global temporary views can be used for sharing the data.

Before the Spark 2.1, the alternative is to create a persistent view whose metadata is stored in the catalog. Since the catalog is global, the persistent views can be accessed from different sessions. To create/use persistent views, you must enable Hive support. This limit is expected to be lifted in the next release (i.e., version 2.2).

Spark 2.1.0 is released

Apache Spark 2.1.0 was released on Dec 28, 2016. This release mainly focuses on usability, stability, and polish, resolving over 1200 tickets. In this release, Spark SQL has over 580 tickets. The release note only briefly mentions the important features and fixes made in Spark SQL. In the following posts, I will attempt to provide a technical deep dive on the updates of Spark SQL in the 2.1.0 release.