@@ -116,10 +116,12 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
116116 @ Override
117117 public void prepareRun (BatchSinkContext context ) {
118118 String connectionString = dbSinkConfig .getConnectionString ();
119+ String dbSchemaName = dbSinkConfig .getDBSchemaName ();
120+ String tableName = dbSinkConfig .getTableName ();
119121
120122 LOG .debug ("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};" ,
121- dbSinkConfig . getTableName () ,
122- dbSinkConfig . getDBSchemaName () ,
123+ tableName ,
124+ dbSchemaName ,
123125 ConnectionConfig .JDBC_PLUGIN_TYPE ,
124126 dbSinkConfig .getJdbcPluginName (),
125127 connectionString );
@@ -132,8 +134,8 @@ public void prepareRun(BatchSinkContext context) {
132134 try {
133135 if (Objects .nonNull (outputSchema )) {
134136 FailureCollector collector = context .getFailureCollector ();
135- validateSchema (collector , driverClass , dbSinkConfig . getTableName () ,
136- outputSchema , dbSinkConfig . getDBSchemaName () );
137+ validateSchema (collector , driverClass , tableName ,
138+ outputSchema , dbSchemaName );
137139 collector .getOrThrowException ();
138140 } else {
139141 outputSchema = inferSchema (driverClass );
@@ -151,8 +153,8 @@ public void prepareRun(BatchSinkContext context) {
151153 configAccessor .setInitQueries (dbSinkConfig .getInitQueries ());
152154 configAccessor .getConfiguration ().set (DBConfiguration .DRIVER_CLASS_PROPERTY , driverClass .getName ());
153155 configAccessor .getConfiguration ().set (DBConfiguration .URL_PROPERTY , connectionString );
154- String fullyQualifiedTableName = dbSinkConfig . getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
155- : dbSinkConfig . getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
156+ String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig .getEscapedTableName ()
157+ : dbSchemaName + "." + dbSinkConfig .getEscapedTableName ();
156158 configAccessor .getConfiguration ().set (DBConfiguration .OUTPUT_TABLE_NAME_PROPERTY , fullyQualifiedTableName );
157159 configAccessor .getConfiguration ().set (DBConfiguration .OUTPUT_FIELD_NAMES_PROPERTY , dbColumns );
158160 if (dbSinkConfig .getUser () != null ) {
@@ -205,8 +207,9 @@ public void initialize(BatchRuntimeContext context) throws Exception {
205207
206208 private Schema inferSchema (Class <? extends Driver > driverClass ) {
207209 List <Schema .Field > inferredFields = new ArrayList <>();
208- String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
209- : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
210+ String dbSchemaName = dbSinkConfig .getDBSchemaName ();
211+ String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig .getEscapedTableName ()
212+ : dbSchemaName + "." + dbSinkConfig .getEscapedTableName ();
210213 try {
211214 DBUtils .ensureJDBCDriverIsAvailable (driverClass , dbSinkConfig .getConnectionString (),
212215 dbSinkConfig .getJdbcPluginName ());
@@ -255,8 +258,9 @@ public void destroy() {
255258 private void setResultSetMetadata () throws Exception {
256259 List <ColumnType > columnTypes = new ArrayList <>(columns .size ());
257260 String connectionString = dbSinkConfig .getConnectionString ();
258- String fullyQualifiedTableName = dbSinkConfig .getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
259- : dbSinkConfig .getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
261+ String dbSchemaName = dbSinkConfig .getDBSchemaName ();
262+ String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig .getEscapedTableName ()
263+ : dbSchemaName + "." + dbSinkConfig .getEscapedTableName ();
260264
261265 driverCleanup = DBUtils
262266 .ensureJDBCDriverIsAvailable (driverClass , connectionString , dbSinkConfig .getJdbcPluginName ());
@@ -305,8 +309,8 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
305309 private void validateSchema (FailureCollector collector , Class <? extends Driver > jdbcDriverClass , String tableName ,
306310 Schema inputSchema , String dbSchemaName ) {
307311 String connectionString = dbSinkConfig .getConnectionString ();
308- String fullyQualifiedTableName = dbSinkConfig . getDBSchemaName () == null ? dbSinkConfig .getEscapedTableName ()
309- : dbSinkConfig . getDBSchemaName () + "." + dbSinkConfig .getEscapedTableName ();
312+ String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig .getEscapedTableName ()
313+ : dbSchemaName + "." + dbSinkConfig .getEscapedTableName ();
310314 try {
311315 DBUtils .ensureJDBCDriverIsAvailable (jdbcDriverClass , connectionString , dbSinkConfig .getJdbcPluginName ());
312316 } catch (IllegalAccessException | InstantiationException | SQLException e ) {
@@ -386,7 +390,7 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi
386390 @ Description ("Name of the database schema of table." )
387391 @ Macro
388392 @ Nullable
389- public String dbSchemaName ;
393+ private String dbSchemaName ;
390394
391395 public String getTableName () {
392396 return tableName ;
0 commit comments