Skip to content

Commit a93db57

Browse files
Merge pull request #234 from jster1357/jtaras
add schema support for oracle: PLUGIN-1146
2 parents d4ba3cd + dec285f commit a93db57

14 files changed

Lines changed: 109 additions & 17 deletions

File tree

aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
"widget-type": "textbox",
4646
"label": "Table Name",
4747
"name": "tableName"
48+
},
49+
{
50+
"widget-type": "hidden",
51+
"label": "Schema Name",
52+
"name": "dbSchemaName"
4853
}
4954
]
5055
},

aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
"widget-type": "textbox",
4646
"label": "Table Name",
4747
"name": "tableName"
48+
},
49+
{
50+
"widget-type": "hidden",
51+
"label": "Schema Name",
52+
"name": "dbSchemaName"
4853
}
4954
]
5055
},

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@
121121
"widget-attributes": {
122122
"placeholder": "The table to write to"
123123
}
124+
},
125+
{
126+
"widget-type": "hidden",
127+
"label": "Schema Name",
128+
"name": "dbSchemaName"
124129
}
125130
]
126131
},

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@
121121
"widget-attributes": {
122122
"placeholder": "The table to write to"
123123
}
124+
},
125+
{
126+
"widget-type": "hidden",
127+
"label": "Schema Name",
128+
"name": "dbSchemaName"
124129
}
125130
]
126131
},

database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*/
3636
public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implements DatabaseSinkConfig {
3737
public static final String TABLE_NAME = "tableName";
38+
public static final String DB_SCHEMA_NAME = "dbSchemaName";
3839
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
3940

4041
@Name(Constants.Reference.REFERENCE_NAME)
@@ -46,11 +47,22 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
4647
@Macro
4748
private String tableName;
4849

50+
@Name(DB_SCHEMA_NAME)
51+
@Description("Name of the database schema of table.")
52+
@Macro
53+
@Nullable
54+
public String dbSchemaName;
55+
4956
@Override
5057
public String getTableName() {
5158
return tableName;
5259
}
5360

61+
@Override
62+
public String getDBSchemaName() {
63+
return dbSchemaName;
64+
}
65+
5466
/**
5567
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
5668
* databases with case-sensitive identifiers.

database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSinkConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
5050
*/
5151
String getTableName();
5252

53+
/**
54+
* @return the schema name
55+
*/
56+
String getDBSchemaName();
57+
5358
/**
5459
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
5560
* databases with case-sensitive identifiers.

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.Optional;
6969
import java.util.Properties;
7070
import java.util.stream.Collectors;
71+
import javax.annotation.Nullable;
7172

7273
/**
7374
* Sink that can be configured to export data to a database table.
@@ -108,16 +109,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
108109
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
109110
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
110111
if (driverClass != null && dbSinkConfig.canConnect()) {
111-
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema);
112+
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, dbSinkConfig.getDBSchemaName());
112113
}
113114
}
114115

115116
@Override
116117
public void prepareRun(BatchSinkContext context) {
117118
String connectionString = dbSinkConfig.getConnectionString();
118119

119-
LOG.debug("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {};",
120+
LOG.debug("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};",
120121
dbSinkConfig.getTableName(),
122+
dbSinkConfig.getDBSchemaName(),
121123
ConnectionConfig.JDBC_PLUGIN_TYPE,
122124
dbSinkConfig.getJdbcPluginName(),
123125
connectionString);
@@ -130,7 +132,8 @@ public void prepareRun(BatchSinkContext context) {
130132
try {
131133
if (Objects.nonNull(outputSchema)) {
132134
FailureCollector collector = context.getFailureCollector();
133-
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), outputSchema);
135+
validateSchema(collector, driverClass, dbSinkConfig.getTableName(),
136+
outputSchema, dbSinkConfig.getDBSchemaName());
134137
collector.getOrThrowException();
135138
} else {
136139
outputSchema = inferSchema(driverClass);
@@ -148,8 +151,9 @@ public void prepareRun(BatchSinkContext context) {
148151
configAccessor.setInitQueries(dbSinkConfig.getInitQueries());
149152
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
150153
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
151-
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY,
152-
dbSinkConfig.getEscapedTableName());
154+
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
155+
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
156+
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
153157
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
154158
if (dbSinkConfig.getUser() != null) {
155159
configAccessor.getConfiguration().set(DBConfiguration.USERNAME_PROPERTY, dbSinkConfig.getUser());
@@ -201,6 +205,8 @@ public void initialize(BatchRuntimeContext context) throws Exception {
201205

202206
private Schema inferSchema(Class<? extends Driver> driverClass) {
203207
List<Schema.Field> inferredFields = new ArrayList<>();
208+
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
209+
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
204210
try {
205211
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
206212
dbSinkConfig.getJdbcPluginName());
@@ -211,7 +217,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
211217
executeInitQueries(connection, dbSinkConfig.getInitQueries());
212218

213219
try (Statement statement = connection.createStatement();
214-
ResultSet rs = statement.executeQuery("SELECT * FROM " + dbSinkConfig.getEscapedTableName()
220+
ResultSet rs = statement.executeQuery("SELECT * FROM " + fullyQualifiedTableName
215221
+ " WHERE 1 = 0")) {
216222
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
217223
}
@@ -249,6 +255,8 @@ public void destroy() {
249255
private void setResultSetMetadata() throws Exception {
250256
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
251257
String connectionString = dbSinkConfig.getConnectionString();
258+
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
259+
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
252260

253261
driverCleanup = DBUtils
254262
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
@@ -261,8 +269,7 @@ private void setResultSetMetadata() throws Exception {
261269
// Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
262270
// that can be used to construct DBRecord objects to sink to the database table.
263271
ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0",
264-
dbColumns,
265-
dbSinkConfig.getEscapedTableName()))
272+
dbColumns, fullyQualifiedTableName))
266273
) {
267274
ResultSetMetaData resultSetMetadata = rs.getMetaData();
268275
columnTypes.addAll(getMatchedColumnTypeList(resultSetMetadata, columns));
@@ -296,45 +303,47 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
296303
}
297304

298305
private void validateSchema(FailureCollector collector, Class<? extends Driver> jdbcDriverClass, String tableName,
299-
Schema inputSchema) {
306+
Schema inputSchema, String dbSchemaName) {
300307
String connectionString = dbSinkConfig.getConnectionString();
301-
308+
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
309+
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
302310
try {
303311
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
304312
} catch (IllegalAccessException | InstantiationException | SQLException e) {
305313
collector.addFailure(String.format("Unable to load or register JDBC driver '%s' while checking for " +
306314
"the existence of the database table '%s'.",
307-
jdbcDriverClass, tableName), null).withStacktrace(e.getStackTrace());
315+
jdbcDriverClass, fullyQualifiedTableName),
316+
null).withStacktrace(e.getStackTrace());
308317
throw collector.getOrThrowException();
309318
}
310319

311320
Properties connectionProperties = new Properties();
312321
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
313322
try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
314323
executeInitQueries(connection, dbSinkConfig.getInitQueries());
315-
try (ResultSet tables = connection.getMetaData().getTables(null, null, tableName, null)) {
324+
try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) {
316325
if (!tables.next()) {
317326
collector.addFailure(
318327
String.format("Table '%s' does not exist.", tableName),
319-
String.format("Ensure table '%s' is set correctly and that the connection string '%s' points " +
320-
"to a valid database.", tableName, connectionString))
328+
String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
329+
"points to a valid database.", fullyQualifiedTableName, connectionString))
321330
.withConfigProperty(DBSinkConfig.TABLE_NAME);
322331
return;
323332
}
324333
}
325334
setColumnsInfo(inputSchema.getFields());
326335
try (PreparedStatement pStmt = connection.prepareStatement(String.format("SELECT %s FROM %s WHERE 1 = 0",
327336
dbColumns,
328-
dbSinkConfig.getEscapedTableName()));
337+
fullyQualifiedTableName));
329338
ResultSet rs = pStmt.executeQuery()) {
330339
getFieldsValidator().validateFields(inputSchema, rs, collector);
331340
}
332341
} catch (SQLException e) {
333342
LOG.error("Exception while trying to validate schema of database table {} for connection {}.",
334-
tableName, connectionString, e);
343+
fullyQualifiedTableName, connectionString, e);
335344
collector.addFailure(
336345
String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s",
337-
tableName, connectionString, e.getMessage()),
346+
fullyQualifiedTableName, connectionString, e.getMessage()),
338347
null).withStacktrace(e.getStackTrace());
339348
}
340349
}
@@ -365,17 +374,28 @@ private void executeInitQueries(Connection connection, List<String> initQueries)
365374
*/
366375
public abstract static class DBSinkConfig extends DBConfig implements DatabaseSinkConfig {
367376
public static final String TABLE_NAME = "tableName";
377+
public static final String DB_SCHEMA_NAME = "dbSchemaName";
368378
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
369379

370380
@Name(TABLE_NAME)
371381
@Description("Name of the database table to write to.")
372382
@Macro
373383
public String tableName;
374384

385+
@Name(DB_SCHEMA_NAME)
386+
@Description("Name of the database schema of table.")
387+
@Macro
388+
@Nullable
389+
public String dbSchemaName;
390+
375391
public String getTableName() {
376392
return tableName;
377393
}
378394

395+
public String getDBSchemaName() {
396+
return dbSchemaName;
397+
}
398+
379399
/**
380400
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
381401
* databases with case-sensitive identifiers.

db2-plugin/widgets/Db2-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
"widget-type": "textbox",
4949
"label": "Table Name",
5050
"name": "tableName"
51+
},
52+
{
53+
"widget-type": "hidden",
54+
"label": "Schema Name",
55+
"name": "dbSchemaName"
5156
}
5257
]
5358
},

generic-database-plugin/widgets/Database-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
"widget-type": "textbox",
3030
"label": "Table Name",
3131
"name": "tableName"
32+
},
33+
{
34+
"widget-type": "hidden",
35+
"label": "Schema Name",
36+
"name": "dbSchemaName"
3237
}
3338
]
3439
},

mariadb-plugin/widgets/Mariadb-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
"widget-type": "textbox",
4141
"label": "Table Name",
4242
"name": "tableName"
43+
},
44+
{
45+
"widget-type": "hidden",
46+
"label": "Schema Name",
47+
"name": "dbSchemaName"
4348
}
4449
]
4550
},

0 commit comments

Comments
 (0)