Skip to content

Commit 992c68e

Browse files
dbschema support and connection fixes
1 parent 19cc862 commit 992c68e

10 files changed

Lines changed: 112 additions & 60 deletions

File tree

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.cdap.plugin.common.Constants;
3131
import io.cdap.plugin.common.ReferenceNames;
3232
import io.cdap.plugin.common.db.DBConnectorPath;
33+
import io.cdap.plugin.common.db.DBPath;
3334
import io.cdap.plugin.db.ConnectionConfig;
3435
import io.cdap.plugin.db.SchemaReader;
3536
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
@@ -38,6 +39,7 @@
3839
import org.apache.hadoop.io.LongWritable;
3940
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
4041

42+
import java.io.IOException;
4143
import java.util.HashMap;
4244
import java.util.Map;
4345

@@ -57,6 +59,11 @@ public CloudSQLPostgreSQLConnector(CloudSQLPostgreSQLConnectorConfig config) {
5759
this.config = config;
5860
}
5961

62+
@Override
63+
protected DBConnectorPath getDBConnectorPath(String path) throws IOException {
64+
return new DBPath(path, true);
65+
}
66+
6067
@Override
6168
public boolean supportSchema() {
6269
return true;
@@ -90,22 +97,30 @@ protected String getTableQuery(String database, String schema, String table, int
9097
@Override
9198
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
9299
ConnectorSpec.Builder builder) {
93-
Map<String, String> properties = new HashMap<>();
94-
setConnectionProperties(properties, request);
100+
Map<String, String> sourceProperties = new HashMap<>();
101+
Map<String, String> sinkProperties = new HashMap<>();
102+
setConnectionProperties(sourceProperties, request);
103+
setConnectionProperties(sinkProperties, request);
95104
builder
96-
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
97-
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
105+
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME,
106+
BatchSource.PLUGIN_TYPE, sourceProperties))
107+
.addRelatedPlugin(new PluginSpec(CloudSQLPostgreSQLConstants.PLUGIN_NAME,
108+
BatchSink.PLUGIN_TYPE, sinkProperties));
98109

110+
sinkProperties.put(ConnectionConfig.DATABASE, config.getDatabase());
111+
String schema = path.getSchema();
112+
if (schema != null) {
113+
sinkProperties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.DB_SCHEMA_NAME, schema);
114+
}
115+
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
99116
String table = path.getTable();
100117
if (table == null) {
101118
return;
102119
}
103-
104-
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.IMPORT_QUERY,
105-
getTableQuery(path.getDatabase(), path.getSchema(), path.getTable()));
106-
properties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
107-
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
108-
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
109-
properties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.TABLE_NAME, table);
120+
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.IMPORT_QUERY,
121+
getTableQuery(path.getDatabase(), schema, table));
122+
sinkProperties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.TABLE_NAME, table);
123+
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
124+
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
110125
}
111126
}

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@
123123
}
124124
},
125125
{
126-
"widget-type": "hidden",
126+
"widget-type": "textbox",
127127
"label": "Schema Name",
128128
"name": "dbSchemaName"
129129
}

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@
214214
"type": "property",
215215
"name": "password"
216216
},
217+
{
218+
"type": "property",
219+
"name": "database"
220+
},
217221
{
218222
"type": "property",
219223
"name": "connectionArguments"

database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
5151
@Description("Name of the database schema of table.")
5252
@Macro
5353
@Nullable
54-
public String dbSchemaName;
54+
private String dbSchemaName;
5555

5656
@Override
5757
public String getTableName() {

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,12 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
116116
@Override
117117
public void prepareRun(BatchSinkContext context) {
118118
String connectionString = dbSinkConfig.getConnectionString();
119+
String dbSchemaName = dbSinkConfig.getDBSchemaName();
120+
String tableName = dbSinkConfig.getTableName();
119121

120122
LOG.debug("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};",
121-
dbSinkConfig.getTableName(),
122-
dbSinkConfig.getDBSchemaName(),
123+
tableName,
124+
dbSchemaName,
123125
ConnectionConfig.JDBC_PLUGIN_TYPE,
124126
dbSinkConfig.getJdbcPluginName(),
125127
connectionString);
@@ -132,8 +134,8 @@ public void prepareRun(BatchSinkContext context) {
132134
try {
133135
if (Objects.nonNull(outputSchema)) {
134136
FailureCollector collector = context.getFailureCollector();
135-
validateSchema(collector, driverClass, dbSinkConfig.getTableName(),
136-
outputSchema, dbSinkConfig.getDBSchemaName());
137+
validateSchema(collector, driverClass, tableName,
138+
outputSchema, dbSchemaName);
137139
collector.getOrThrowException();
138140
} else {
139141
outputSchema = inferSchema(driverClass);
@@ -151,8 +153,8 @@ public void prepareRun(BatchSinkContext context) {
151153
configAccessor.setInitQueries(dbSinkConfig.getInitQueries());
152154
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
153155
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
154-
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
155-
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
156+
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
157+
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
156158
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
157159
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
158160
if (dbSinkConfig.getUser() != null) {
@@ -205,8 +207,9 @@ public void initialize(BatchRuntimeContext context) throws Exception {
205207

206208
private Schema inferSchema(Class<? extends Driver> driverClass) {
207209
List<Schema.Field> inferredFields = new ArrayList<>();
208-
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
209-
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
210+
String dbSchemaName = dbSinkConfig.getDBSchemaName();
211+
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
212+
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
210213
try {
211214
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
212215
dbSinkConfig.getJdbcPluginName());
@@ -255,8 +258,9 @@ public void destroy() {
255258
private void setResultSetMetadata() throws Exception {
256259
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
257260
String connectionString = dbSinkConfig.getConnectionString();
258-
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
259-
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
261+
String dbSchemaName = dbSinkConfig.getDBSchemaName();
262+
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
263+
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
260264

261265
driverCleanup = DBUtils
262266
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
@@ -305,8 +309,8 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
305309
private void validateSchema(FailureCollector collector, Class<? extends Driver> jdbcDriverClass, String tableName,
306310
Schema inputSchema, String dbSchemaName) {
307311
String connectionString = dbSinkConfig.getConnectionString();
308-
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
309-
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
312+
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
313+
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
310314
try {
311315
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
312316
} catch (IllegalAccessException | InstantiationException | SQLException e) {
@@ -386,7 +390,7 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi
386390
@Description("Name of the database schema of table.")
387391
@Macro
388392
@Nullable
389-
public String dbSchemaName;
393+
private String dbSchemaName;
390394

391395
public String getTableName() {
392396
return tableName;

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.cdap.plugin.common.Constants;
3232
import io.cdap.plugin.common.ReferenceNames;
3333
import io.cdap.plugin.common.db.DBConnectorPath;
34+
import io.cdap.plugin.db.ConnectionConfig;
3435
import io.cdap.plugin.db.SchemaReader;
3536
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
3637
import org.apache.hadoop.io.LongWritable;
@@ -68,23 +69,33 @@ protected Class<? extends DBWritable> getDBRecordType() {
6869

6970
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
7071
ConnectorSpec.Builder builder) {
71-
Map<String, String> properties = new HashMap<>();
72-
setConnectionProperties(properties, request);
72+
Map<String, String> sourceProperties = new HashMap<>();
73+
Map<String, String> sinkProperties = new HashMap<>();
74+
setConnectionProperties(sourceProperties, request);
75+
setConnectionProperties(sinkProperties, request);
7376
builder
74-
.addRelatedPlugin(new PluginSpec(SqlServerConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
75-
.addRelatedPlugin(new PluginSpec(SqlServerConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
77+
.addRelatedPlugin(new PluginSpec(SqlServerConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, sourceProperties))
78+
.addRelatedPlugin(new PluginSpec(SqlServerConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, sinkProperties));
79+
80+
String database = path.getDatabase();
81+
if (database != null) {
82+
sinkProperties.put(ConnectionConfig.DATABASE, database);
83+
sourceProperties.put(ConnectionConfig.DATABASE, database);
84+
}
85+
String schema = path.getSchema();
86+
if (schema != null) {
87+
sinkProperties.put(SqlServerSink.SqlServerSinkConfig.DB_SCHEMA_NAME, schema);
88+
}
89+
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
7690
String table = path.getTable();
7791
if (table == null) {
7892
return;
7993
}
80-
81-
properties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
82-
path.getSchema(),
83-
path.getTable()));
84-
properties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
85-
properties.put(SqlServerSource.SqlServerSourceConfig.DATABASE, path.getDatabase());
86-
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
87-
properties.put(SqlServerSink.SqlServerSinkConfig.TABLE_NAME, table);
94+
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY,
95+
getTableQuery(database, schema, table));
96+
sinkProperties.put(SqlServerSink.SqlServerSinkConfig.TABLE_NAME, table);
97+
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
98+
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
8899
}
89100

90101
@Override

mssql-plugin/widgets/SqlServer-batchsink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@
128128
"name": "tableName"
129129
},
130130
{
131-
"widget-type": "hidden",
131+
"widget-type": "textbox",
132132
"label": "Schema Name",
133133
"name": "dbSchemaName"
134134
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,29 @@ protected Class<? extends DBWritable> getDBRecordType() {
7272

7373
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
7474
ConnectorSpec.Builder builder) {
75-
Map<String, String> properties = new HashMap<>();
76-
setConnectionProperties(properties, request);
75+
Map<String, String> sourceProperties = new HashMap<>();
76+
Map<String, String> sinkProperties = new HashMap<>();
77+
setConnectionProperties(sourceProperties, request);
78+
setConnectionProperties(sinkProperties, request);
7779
builder
78-
.addRelatedPlugin(new PluginSpec(OracleConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
79-
.addRelatedPlugin(new PluginSpec(OracleConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
80+
.addRelatedPlugin(new PluginSpec(OracleConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, sourceProperties))
81+
.addRelatedPlugin(new PluginSpec(OracleConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, sinkProperties));
8082

83+
sinkProperties.put(OracleConstants.NAME_DATABASE, config.getDatabase());
84+
String schema = path.getSchema();
85+
if (schema != null) {
86+
sinkProperties.put(OracleSink.OracleSinkConfig.DB_SCHEMA_NAME, schema);
87+
}
88+
sourceProperties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
8189
String table = path.getTable();
8290
if (table == null) {
8391
return;
8492
}
85-
86-
properties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
87-
path.getTable()));
88-
properties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
89-
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
90-
properties.put(OracleSink.OracleSinkConfig.TABLE_NAME, table);
93+
sourceProperties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY,
94+
getTableQuery(path.getDatabase(), schema, table));
95+
sinkProperties.put(OracleSink.OracleSinkConfig.TABLE_NAME, table);
96+
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
97+
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
9198
}
9299

93100
@Override

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,23 +73,29 @@ protected DBConnectorPath getDBConnectorPath(String path) throws IOException {
7373

7474
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
7575
ConnectorSpec.Builder builder) {
76-
Map<String, String> properties = new HashMap<>();
77-
setConnectionProperties(properties, request);
76+
Map<String, String> sourceProperties = new HashMap<>();
77+
Map<String, String> sinkProperties = new HashMap<>();
78+
setConnectionProperties(sourceProperties, request);
79+
setConnectionProperties(sinkProperties, request);
7880
builder
79-
.addRelatedPlugin(new PluginSpec(PostgresConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, properties))
80-
.addRelatedPlugin(new PluginSpec(PostgresConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, properties));
81+
.addRelatedPlugin(new PluginSpec(PostgresConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, sourceProperties))
82+
.addRelatedPlugin(new PluginSpec(PostgresConstants.PLUGIN_NAME, BatchSink.PLUGIN_TYPE, sinkProperties));
8183

84+
sinkProperties.put(PostgresConnectorConfig.NAME_DATABASE, config.getDatabase());
85+
String schema = path.getSchema();
86+
if (schema != null) {
87+
sinkProperties.put(PostgresSink.PostgresSinkConfig.DB_SCHEMA_NAME, schema);
88+
}
89+
sourceProperties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
8290
String table = path.getTable();
8391
if (table == null) {
8492
return;
8593
}
86-
87-
properties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
88-
path.getSchema(), path.getTable()));
89-
properties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
90-
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
91-
properties.put(PostgresSink.PostgresSinkConfig.TABLE_NAME, table);
92-
94+
sourceProperties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY,
95+
getTableQuery(path.getDatabase(), schema, table));
96+
sinkProperties.put(PostgresSink.PostgresSinkConfig.TABLE_NAME, table);
97+
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
98+
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
9399
}
94100

95101
@Override

postgresql-plugin/widgets/Postgres-batchsink.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@
107107
"widget-type": "textbox",
108108
"label": "Table Name",
109109
"name": "tableName"
110+
},
111+
{
112+
"widget-type": "textbox",
113+
"label": "Schema Name",
114+
"name": "dbSchemaName"
110115
}
111116
]
112117
},

0 commit comments

Comments
 (0)