6868import java .util .Optional ;
6969import java .util .Properties ;
7070import java .util .stream .Collectors ;
71+ import javax .annotation .Nullable ;
7172
7273/**
7374 * Sink that can be configured to export data to a database table.
@@ -108,16 +109,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
108109 Class <? extends Driver > driverClass = DBUtils .getDriverClass (
109110 pipelineConfigurer , dbSinkConfig , ConnectionConfig .JDBC_PLUGIN_TYPE );
110111 if (driverClass != null && dbSinkConfig .canConnect ()) {
111- validateSchema (collector , driverClass , dbSinkConfig .getTableName (), inputSchema );
112+ validateSchema (collector , driverClass , dbSinkConfig .getTableName (), inputSchema , dbSinkConfig . getDBSchemaName () );
112113 }
113114 }
114115
115116 @ Override
116117 public void prepareRun (BatchSinkContext context ) {
117118 String connectionString = dbSinkConfig .getConnectionString ();
118119
119- LOG .debug ("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {};" ,
120+ LOG .debug ("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};" ,
120121 dbSinkConfig .getTableName (),
122+ dbSinkConfig .getDBSchemaName (),
121123 ConnectionConfig .JDBC_PLUGIN_TYPE ,
122124 dbSinkConfig .getJdbcPluginName (),
123125 connectionString );
@@ -130,7 +132,8 @@ public void prepareRun(BatchSinkContext context) {
130132 try {
131133 if (Objects .nonNull (outputSchema )) {
132134 FailureCollector collector = context .getFailureCollector ();
133- validateSchema (collector , driverClass , dbSinkConfig .getTableName (), outputSchema );
135+ validateSchema (collector , driverClass , dbSinkConfig .getTableName (),
136+ outputSchema , dbSinkConfig .getDBSchemaName ());
134137 collector .getOrThrowException ();
135138 } else {
136139 outputSchema = inferSchema (driverClass );
@@ -148,8 +151,9 @@ public void prepareRun(BatchSinkContext context) {
148151 configAccessor .setInitQueries (dbSinkConfig .getInitQueries ());
149152 configAccessor .getConfiguration ().set (DBConfiguration .DRIVER_CLASS_PROPERTY , driverClass .getName ());
150153 configAccessor .getConfiguration ().set (DBConfiguration .URL_PROPERTY , connectionString );
151- configAccessor .getConfiguration ().set (DBConfiguration .OUTPUT_TABLE_NAME_PROPERTY ,
152- dbSinkConfig .getEscapedTableName ());
154+ String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
155+ : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
156+ configAccessor .getConfiguration ().set (DBConfiguration .OUTPUT_TABLE_NAME_PROPERTY , fullyQualifiedTableName );
153157 configAccessor .getConfiguration ().set (DBConfiguration .OUTPUT_FIELD_NAMES_PROPERTY , dbColumns );
154158 if (dbSinkConfig .getUser () != null ) {
155159 configAccessor .getConfiguration ().set (DBConfiguration .USERNAME_PROPERTY , dbSinkConfig .getUser ());
@@ -201,6 +205,8 @@ public void initialize(BatchRuntimeContext context) throws Exception {
201205
202206 private Schema inferSchema (Class <? extends Driver > driverClass ) {
203207 List <Schema .Field > inferredFields = new ArrayList <>();
208+ String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
209+ : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
204210 try {
205211 DBUtils .ensureJDBCDriverIsAvailable (driverClass , dbSinkConfig .getConnectionString (),
206212 dbSinkConfig .getJdbcPluginName ());
@@ -211,7 +217,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
211217 executeInitQueries (connection , dbSinkConfig .getInitQueries ());
212218
213219 try (Statement statement = connection .createStatement ();
214- ResultSet rs = statement .executeQuery ("SELECT * FROM " + dbSinkConfig . getEscapedTableName ()
220+ ResultSet rs = statement .executeQuery ("SELECT * FROM " + fullyQualifiedTableName
215221 + " WHERE 1 = 0" )) {
216222 inferredFields .addAll (getSchemaReader ().getSchemaFields (rs ));
217223 }
@@ -249,6 +255,8 @@ public void destroy() {
249255 private void setResultSetMetadata () throws Exception {
250256 List <ColumnType > columnTypes = new ArrayList <>(columns .size ());
251257 String connectionString = dbSinkConfig .getConnectionString ();
258+ String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
259+ : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
252260
253261 driverCleanup = DBUtils
254262 .ensureJDBCDriverIsAvailable (driverClass , connectionString , dbSinkConfig .getJdbcPluginName ());
@@ -261,8 +269,7 @@ private void setResultSetMetadata() throws Exception {
261269 // Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
262270 // that can be used to construct DBRecord objects to sink to the database table.
263271 ResultSet rs = statement .executeQuery (String .format ("SELECT %s FROM %s WHERE 1 = 0" ,
264- dbColumns ,
265- dbSinkConfig .getEscapedTableName ()))
272+ dbColumns , fullyQualifiedTableName ))
266273 ) {
267274 ResultSetMetaData resultSetMetadata = rs .getMetaData ();
268275 columnTypes .addAll (getMatchedColumnTypeList (resultSetMetadata , columns ));
@@ -296,45 +303,47 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
296303 }
297304
298305 private void validateSchema (FailureCollector collector , Class <? extends Driver > jdbcDriverClass , String tableName ,
299- Schema inputSchema ) {
306+ Schema inputSchema , String dbSchemaName ) {
300307 String connectionString = dbSinkConfig .getConnectionString ();
301-
308+ String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
309+ : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
302310 try {
303311 DBUtils .ensureJDBCDriverIsAvailable (jdbcDriverClass , connectionString , dbSinkConfig .getJdbcPluginName ());
304312 } catch (IllegalAccessException | InstantiationException | SQLException e ) {
305313 collector .addFailure (String .format ("Unable to load or register JDBC driver '%s' while checking for " +
306314 "the existence of the database table '%s'." ,
307- jdbcDriverClass , tableName ), null ).withStacktrace (e .getStackTrace ());
315+ jdbcDriverClass , fullyQualifiedTableName ),
316+ null ).withStacktrace (e .getStackTrace ());
308317 throw collector .getOrThrowException ();
309318 }
310319
311320 Properties connectionProperties = new Properties ();
312321 connectionProperties .putAll (dbSinkConfig .getConnectionArguments ());
313322 try (Connection connection = DriverManager .getConnection (connectionString , connectionProperties )) {
314323 executeInitQueries (connection , dbSinkConfig .getInitQueries ());
315- try (ResultSet tables = connection .getMetaData ().getTables (null , null , tableName , null )) {
324+ try (ResultSet tables = connection .getMetaData ().getTables (null , dbSchemaName , tableName , null )) {
316325 if (!tables .next ()) {
317326 collector .addFailure (
318327 String .format ("Table '%s' does not exist." , tableName ),
319- String .format ("Ensure table '%s' is set correctly and that the connection string '%s' points " +
320- " to a valid database." , tableName , connectionString ))
328+ String .format ("Ensure table '%s' is set correctly and that the connection string '%s' " +
329+ "points to a valid database." , fullyQualifiedTableName , connectionString ))
321330 .withConfigProperty (DBSinkConfig .TABLE_NAME );
322331 return ;
323332 }
324333 }
325334 setColumnsInfo (inputSchema .getFields ());
326335 try (PreparedStatement pStmt = connection .prepareStatement (String .format ("SELECT %s FROM %s WHERE 1 = 0" ,
327336 dbColumns ,
328- dbSinkConfig . getEscapedTableName () ));
337+ fullyQualifiedTableName ));
329338 ResultSet rs = pStmt .executeQuery ()) {
330339 getFieldsValidator ().validateFields (inputSchema , rs , collector );
331340 }
332341 } catch (SQLException e ) {
333342 LOG .error ("Exception while trying to validate schema of database table {} for connection {}." ,
334- tableName , connectionString , e );
343+ fullyQualifiedTableName , connectionString , e );
335344 collector .addFailure (
336345 String .format ("Exception while trying to validate schema of database table '%s' for connection '%s' with %s" ,
337- tableName , connectionString , e .getMessage ()),
346+ fullyQualifiedTableName , connectionString , e .getMessage ()),
338347 null ).withStacktrace (e .getStackTrace ());
339348 }
340349 }
@@ -365,17 +374,28 @@ private void executeInitQueries(Connection connection, List<String> initQueries)
365374 */
366375 public abstract static class DBSinkConfig extends DBConfig implements DatabaseSinkConfig {
367376 public static final String TABLE_NAME = "tableName" ;
377+ public static final String DB_SCHEMA_NAME = "dbSchemaName" ;
368378 public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel" ;
369379
370380 @ Name (TABLE_NAME )
371381 @ Description ("Name of the database table to write to." )
372382 @ Macro
373383 public String tableName ;
374384
385+ @ Name (DB_SCHEMA_NAME )
386+ @ Description ("Name of the database schema of table." )
387+ @ Macro
388+ @ Nullable
389+ public String dbSchemaName ;
390+
375391 public String getTableName () {
376392 return tableName ;
377393 }
378394
395+ public String getDBSchemaName () {
396+ return dbSchemaName ;
397+ }
398+
379399 /**
380400 * Adds escape characters (back quotes, double quotes, etc.) to the table name for
381401 * databases with case-sensitive identifiers.
0 commit comments