Skip to content

Commit 70143b5

Browse files
added fetch size to connector spec and default fetch size for postgres source
1 parent a88dfc3 commit 70143b5

10 files changed

Lines changed: 28 additions & 2 deletions

File tree

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
9999
path.getSchema(),
100100
path.getTable()));
101101
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.NUM_SPLITS, "1");
102+
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.FETCH_SIZE,
103+
CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.DEFAULT_FETCH_SIZE);
102104
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
103105
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
104106
properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table);

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.cdap.plugin.common.ReferenceNames;
3232
import io.cdap.plugin.common.db.DBConnectorPath;
3333
import io.cdap.plugin.common.db.DBPath;
34-
import io.cdap.plugin.db.ConnectionConfig;
3534
import io.cdap.plugin.db.SchemaReader;
3635
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
3736
import io.cdap.plugin.postgres.PostgresDBRecord;
@@ -112,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
112111
sinkProperties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.DB_SCHEMA_NAME, schema);
113112
}
114113
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
114+
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.FETCH_SIZE,
115+
CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.DEFAULT_FETCH_SIZE);
115116
String table = path.getTable();
116117
if (table == null) {
117118
return;

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ protected Map<String, String> getDBSpecificArguments() {
112112
return Collections.emptyMap();
113113
}
114114

115+
/**
 * Returns the fetch size to use for this source.
 *
 * <p>Delegates to the superclass; when that value is {@code null}, falls back to
 * {@code DEFAULT_FETCH_SIZE} parsed as an int.
 *
 * @return the superclass fetch size if non-null, otherwise the parsed default
 */
@Override
116+
public Integer getFetchSize() {
117+
Integer fetchSize = super.getFetchSize();
118+
return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize;
119+
}
120+
115121
@Override
116122
protected CloudSQLPostgreSQLConnectorConfig getConnection() {
117123
return connection;

database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
4848
public static final String SCHEMA = "schema";
4949
public static final String DATABASE = "database";
5050
public static final String FETCH_SIZE = "fetchSize";
51+
public static final String DEFAULT_FETCH_SIZE = "1000";
5152

5253
@Name(Constants.Reference.REFERENCE_NAME)
5354
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8787
sinkProperties.put(SqlServerSink.SqlServerSinkConfig.DB_SCHEMA_NAME, schema);
8888
}
8989
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
90+
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.FETCH_SIZE,
91+
SqlServerSource.SqlServerSourceConfig.DEFAULT_FETCH_SIZE);
9092
String table = path.getTable();
9193
if (table == null) {
9294
return;

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8080
properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
8181
path.getTable()));
8282
properties.put(MysqlSource.MysqlSourceConfig.NUM_SPLITS, "1");
83+
properties.put(MysqlSource.MysqlSourceConfig.FETCH_SIZE, MysqlSource.MysqlSourceConfig.DEFAULT_FETCH_SIZE);
8384
properties.put(MysqlSource.MysqlSourceConfig.DATABASE, path.getDatabase());
8485
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
8586
properties.put(MysqlSink.MysqlSinkConfig.TABLE_NAME, table);

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8585
sinkProperties.put(OracleSink.OracleSinkConfig.DB_SCHEMA_NAME, schema);
8686
}
8787
sourceProperties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
88+
sourceProperties.put(OracleSource.OracleSourceConfig.FETCH_SIZE,
89+
OracleSource.OracleSourceConfig.DEFAULT_FETCH_SIZE);
8890
String table = path.getTable();
8991
if (table == null) {
9092
return;

postgresql-plugin/docs/Postgres-batchsource.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ back from the query. However, it must match the schema that comes back from the
6060
except it can mark fields as nullable and can contain a subset of the fields.
6161

6262
**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in faster import,
63-
with the tradeoff of higher memory usage.
63+
with the tradeoff of higher memory usage. If not specified, the default value is 1000.
6464

6565
Example
6666
------

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
8686
sinkProperties.put(PostgresSink.PostgresSinkConfig.DB_SCHEMA_NAME, schema);
8787
}
8888
sourceProperties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
89+
sourceProperties.put(PostgresSource.PostgresSourceConfig.FETCH_SIZE,
90+
PostgresSource.PostgresSourceConfig.DEFAULT_FETCH_SIZE);
91+
sourceProperties.put(PostgresConstants.CONNECTION_TIMEOUT,
92+
PostgresSource.PostgresSourceConfig.DEFAULT_CONNECTION_TIMEOUT_SECONDS);
8993
String table = path.getTable();
9094
if (table == null) {
9195
return;

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public static class PostgresSourceConfig extends AbstractDBSpecificSourceConfig
7575

7676
public static final String NAME_USE_CONNECTION = "useConnection";
7777
public static final String NAME_CONNECTION = "connection";
78+
public static final String DEFAULT_CONNECTION_TIMEOUT_SECONDS = "100";
7879

7980
@Name(NAME_USE_CONNECTION)
8081
@Nullable
@@ -106,6 +107,12 @@ public Map<String, String> getDBSpecificArguments() {
106107
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
107108
}
108109

110+
/**
 * Returns the fetch size to use for this source.
 *
 * <p>Delegates to the superclass; when that value is {@code null}, falls back to
 * {@code DEFAULT_FETCH_SIZE} parsed as an int.
 *
 * @return the superclass fetch size if non-null, otherwise the parsed default
 */
@Override
111+
public Integer getFetchSize() {
112+
Integer fetchSize = super.getFetchSize();
113+
return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize;
114+
}
115+
109116
@Override
110117
protected AbstractDBSpecificConnectorConfig getConnection() {
111118
return connection;

0 commit comments

Comments
 (0)