I would like to log the JDBC statements that my Spark job executes with the help of p6spy.
Using p6spy is usually straightforward: one inserts the string :p6spy: into the JDBC URL and adds the p6spy driver classes to the classpath of the application. After that, all JDBC actions are logged to a file.
For example, if the original (MySQL) connection string was
jdbc:mysql://172.17.0.2:3306/spark_test
the connection string with logging enabled would be
jdbc:p6spy:mysql://172.17.0.2:3306/spark_test
I use this line to write a DataFrame into a MySQL table:
df.write.mode(SaveMode.Overwrite).jdbc("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test", "test_table", prop)
with prop containing the db user and password.
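For reference, prop is a java.util.Properties object; mine looks roughly like this sketch (the user/password values are placeholders, and the explicit driver entry is an assumption that is only needed if the p6spy driver is not picked up automatically):

import java.util.Properties

val prop = new Properties()
prop.setProperty("user", "spark_user")  // placeholder
prop.setProperty("password", "secret")  // placeholder
// assumption: set explicitly if DriverManager does not auto-register the p6spy driver
prop.setProperty("driver", "com.p6spy.engine.spy.P6SpyDriver")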
This line of code fails with the error message
Exception in thread "main" java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near '"value" INTEGER NOT NULL)' at line 1
Without the :p6spy: part in the connection string, everything works as expected.
My findings so far
The reason for the error is that Spark tries to execute the statement
CREATE TABLE test_table ("value" INTEGER NOT NULL)
which quotes the column name with ". The correct quote character for MySQL would be the backtick `.
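For comparison, the backtick-quoted statement that MySQL would accept is
CREATE TABLE test_table (`value` INTEGER NOT NULL)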
Spark can handle different SQL dialects. The dialects are implemented in the package org.apache.spark.sql.jdbc, and the dialect to use is chosen based on the JDBC URL of the database: each dialect object implements the method canHandle(url: String). MySQLDialect handles URLs starting with jdbc:mysql, but not those starting with jdbc:p6spy:mysql. Unfortunately, Spark falls back to the NoopDialect for an unknown URL type, and it is this dialect that puts the " around the column name.
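This can be checked in a spark-shell, since JdbcDialects.get exposes the dialect lookup; the results I would expect, given the behaviour described above, are in the comments:

import org.apache.spark.sql.jdbc.JdbcDialects

val mysql = JdbcDialects.get("jdbc:mysql://172.17.0.2:3306/spark_test")
val p6spy = JdbcDialects.get("jdbc:p6spy:mysql://172.17.0.2:3306/spark_test")

mysql.quoteIdentifier("value")  // `value`  (MySQLDialect)
p6spy.quoteIdentifier("value")  // "value"  (NoopDialect fallback)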
Possible solution
It is possible to register new database dialects by calling JdbcDialects.registerDialect. Here one could register a new dialect that implements the canHandle method as
override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")
and then delegates all other method calls to the original MySQL dialect, roughly as sketched below.
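In outline (P6SpyMySQLDialect is just a name I made up; canHandle is the only abstract method of JdbcDialect, so this minimal sketch compiles):

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object P6SpyMySQLDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:p6spy:mysql")
  // quoteIdentifier, getCatalystType, getJDBCType, ... should delegate to MySQLDialect
}

JdbcDialects.registerDialect(P6SpyMySQLDialect)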
Unfortunately the MySQLDialect object is declared as
private case object MySQLDialect extends JdbcDialect {
...
}
so my own implementation of a dialect cannot use MySQLDialect directly. An option would be to copy the code of MySQLDialect into my own dialect object (the code is not long), but I would like to avoid copying code.
Are there any other options?
I am using Spark 2.4.5.