Skip to content

Commit c7fff76

Browse files
Merge pull request #59 from data-integrations/feature/fqn-multi-table-source
Emit lineage for the plugin
2 parents 51ad81d + 1529284 commit c7fff76

4 files changed

Lines changed: 181 additions & 2 deletions

File tree

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<cdap.version>6.8.0-SNAPSHOT</cdap.version>
3131
<hadoop.version>2.10.2</hadoop.version>
3232
<hsql.version>2.2.4</hsql.version>
33-
<hydrator.version>2.9.0-SNAPSHOT</hydrator.version>
33+
<hydrator.version>2.11.0-SNAPSHOT</hydrator.version>
3434
<!-- properties for script build step that creates the config files for the artifacts -->
3535
<widgets.dir>widgets</widgets.dir>
3636
<docs.dir>docs</docs.dir>
@@ -39,6 +39,7 @@
3939
<main.basedir>${project.basedir}</main.basedir>
4040
<commons.codec.version>1.6</commons.codec.version>
4141
<avro.version>1.7.7</avro.version>
42+
<opentracingjdbc.version>0.2.15</opentracingjdbc.version>
4243
</properties>
4344

4445
<repositories>
@@ -249,6 +250,11 @@
249250
</exclusion>
250251
</exclusions>
251252
</dependency>
253+
<dependency>
254+
<groupId>io.opentracing.contrib</groupId>
255+
<artifactId>opentracing-jdbc</artifactId>
256+
<version>${opentracingjdbc.version}</version>
257+
</dependency>
252258
</dependencies>
253259

254260
<build>

src/main/java/io/cdap/plugin/MultiTableDBSource.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import io.cdap.cdap.api.data.batch.Input;
2323
import io.cdap.cdap.api.data.batch.InputFormatProvider;
2424
import io.cdap.cdap.api.data.format.StructuredRecord;
25+
import io.cdap.cdap.api.data.schema.Schema;
2526
import io.cdap.cdap.api.dataset.lib.KeyValue;
2627
import io.cdap.cdap.api.plugin.PluginProperties;
2728
import io.cdap.cdap.etl.api.Emitter;
2829
import io.cdap.cdap.etl.api.PipelineConfigurer;
2930
import io.cdap.cdap.etl.api.action.SettableArguments;
3031
import io.cdap.cdap.etl.api.batch.BatchSource;
3132
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
33+
import io.cdap.plugin.common.Asset;
34+
import io.cdap.plugin.common.LineageRecorder;
3235
import io.cdap.plugin.common.SourceInputFormatProvider;
3336
import io.cdap.plugin.format.DBTableInfo;
3437
import io.cdap.plugin.format.MultiSQLStatementInputFormat;
@@ -39,6 +42,7 @@
3942
import io.cdap.plugin.format.error.collector.ErrorCollectingMultiSQLStatementInputFormat;
4043
import io.cdap.plugin.format.error.collector.ErrorCollectingMultiTableDBInputFormat;
4144
import io.cdap.plugin.format.error.emitter.ErrorEmittingInputFormat;
45+
import io.cdap.plugin.util.FQNGenerator;
4246
import org.apache.hadoop.conf.Configuration;
4347
import org.apache.hadoop.io.NullWritable;
4448
import org.slf4j.Logger;
@@ -49,6 +53,7 @@
4953
import java.util.Collection;
5054
import java.util.HashMap;
5155
import java.util.Map;
56+
import java.util.stream.Collectors;
5257

5358
/**
5459
* Batch source to read from multiple tables in a database using JDBC.
@@ -158,14 +163,29 @@ public void setContextForMultiTableDBInput(BatchSourceContext context,
158163

159164
SettableArguments arguments = context.getArguments();
160165
for (DBTableInfo tableInfo : tables) {
166+
Schema schema = tableInfo.getSchema();
161167
arguments.set(DynamicMultiFilesetSink.TABLE_PREFIX + tableInfo.getDbTableName().getTable(),
162-
tableInfo.getSchema().toString());
168+
schema.toString());
169+
emitLineage(context, tableInfo, schema);
163170
}
164171

165172
context.setInput(Input.of(conf.getReferenceName(),
166173
new SourceInputFormatProvider(ErrorCollectingMultiTableDBInputFormat.class, hConf)));
167174
}
168175

176+
private void emitLineage(BatchSourceContext context, DBTableInfo tableInfo, Schema schema) {
177+
Asset asset = Asset.builder(conf.getReferenceName())
178+
.setFqn(FQNGenerator.constructFQN(conf.getConnectionString(), tableInfo.getDbTableName().getTable()))
179+
.setMarker(tableInfo.getDbTableName().getTable()).build();
180+
LineageRecorder lineageRecorder = new LineageRecorder(context, asset);
181+
lineageRecorder.createExternalDataset(schema);
182+
if (schema != null && schema.getFields() != null) {
183+
String operationName = "Read_from_" + tableInfo.getDbTableName().getTable();
184+
lineageRecorder.recordRead(operationName, "Read from database plugin",
185+
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
186+
}
187+
}
188+
169189
public void setContextForMultiSQLStatementInput(BatchSourceContext context,
170190
Configuration hConf, Class<? extends Driver> driverClass) {
171191
MultiSQLStatementInputFormat.setInput(hConf, conf, driverClass);
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.util;
18+
19+
import io.opentracing.contrib.jdbc.ConnectionInfo;
20+
import io.opentracing.contrib.jdbc.parser.URLParser;
21+
22+
/**
23+
* Generate FQN from DB URL Connection.
24+
* TODO: CDAP-20456 Reuse the class from hydrator/database-plugins module
25+
*/
26+
public final class FQNGenerator {
27+
28+
private static final String POSTGRESQL_TAG = "postgresql";
29+
private static final String POSTGRESQL_DEFAULT_SCHEMA = "public";
30+
31+
private FQNGenerator() { }
32+
33+
public static String constructFQN(String jdbcUrl, String tableName) {
34+
// dbtype, host, port, db from the connection string
35+
// table is the reference name
36+
ConnectionInfo connectionInfo = URLParser.parse(jdbcUrl);
37+
// DB type as set by library after extraction
38+
if (POSTGRESQL_TAG.equals(connectionInfo.getDbType())) {
39+
// FQN for Postgresql
40+
return String.format("%s://%s/%s.%s.%s", connectionInfo.getDbType(), connectionInfo.getDbPeer(),
41+
connectionInfo.getDbInstance(), getPostgresqlSchema(jdbcUrl), tableName);
42+
} else {
43+
// FQN for MySQL, Oracle, SQLServer
44+
return String.format("%s://%s/%s.%s", connectionInfo.getDbType(), connectionInfo.getDbPeer(),
45+
connectionInfo.getDbInstance(), tableName);
46+
}
47+
}
48+
49+
private static String getPostgresqlSchema(String url) {
50+
/**
51+
* Extract schema for PostgresSQL URL strings which can be of the following formats
52+
* jdbc:postgresql://{host}:{port}/{db}?currentSchema={schema}
53+
* jdbc:postgresql://{host}:{port}/{db}?searchpath={schema}
54+
*/
55+
String dbSchema;
56+
int offset = 0;
57+
int startIndex = url.indexOf("connectionSchema=");
58+
offset = 17;
59+
if (startIndex == -1) {
60+
startIndex = url.indexOf("searchpath=");
61+
offset = 11;
62+
}
63+
64+
int endIndex = url.indexOf("&", startIndex);
65+
if (endIndex == -1) {
66+
endIndex = url.length();
67+
}
68+
if (startIndex != -1 && endIndex != -1 && startIndex <= endIndex) {
69+
dbSchema = url.substring(startIndex + offset, endIndex);
70+
} else {
71+
dbSchema = POSTGRESQL_DEFAULT_SCHEMA;
72+
}
73+
return dbSchema;
74+
}
75+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.util;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
/**
23+
* Test class for FQNGenerator.
24+
*/
25+
public class FQNGeneratorTest {
26+
27+
@Test
28+
public void testMySQLFQN() {
29+
// Testcases consist of Connection URL, Table Name, Expected FQN String
30+
String[][] testCases = {{"jdbc:mysql://localhost:1111/db", "table1", "mysql://localhost:1111/db.table1"},
31+
{"jdbc:mysql://34.35.36.37/db?useSSL=false", "table2",
32+
"mysql://34.35.36.37:3306/db.table2"}};
33+
for (int i = 0; i < testCases.length; i++) {
34+
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
35+
Assert.assertEquals(testCases[i][2], fqn);
36+
}
37+
}
38+
39+
@Test
40+
public void testSQLServerFQN() {
41+
// Testcases consist of Connection URL, Table Name, Expected FQN String
42+
String[][] testCases = {{"jdbc:sqlserver://;serverName=127.0.0.1;databaseName=DB",
43+
"table1", "sqlserver://127.0.0.1:1433/DB.table1"},
44+
{"jdbc:sqlserver://localhost:1111;databaseName=DB;encrypt=true;user=user;password=pwd;",
45+
"table2", "sqlserver://localhost:1111/DB.table2"}};
46+
for (int i = 0; i < testCases.length; i++) {
47+
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
48+
Assert.assertEquals(testCases[i][2], fqn);
49+
}
50+
}
51+
52+
@Test
53+
public void testOracleFQN() {
54+
// Testcases consist of Connection URL, Table Name, Expected FQN String
55+
String[][] testCases = {{"jdbc:oracle:thin:@localhost:db", "table1", "oracle://localhost:1521/db.table1"},
56+
{"jdbc:oracle:thin:@test.server:1111/db",
57+
"table2", "oracle://test.server:1111/db.table2"}};
58+
for (int i = 0; i < testCases.length; i++) {
59+
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
60+
Assert.assertEquals(testCases[i][2], fqn);
61+
}
62+
}
63+
64+
@Test
65+
public void testPostgresqlFQN() {
66+
// Testcases consist of Connection URL, Table Name, Expected FQN String
67+
String[][] testCases = {{"jdbc:postgresql://34.35.36.37/test?user=user&password=secret&ssl=true",
68+
"table1", "postgresql://34.35.36.37:5432/test.public.table1"},
69+
{"jdbc:postgresql://localhost/test?connectionSchema=schema",
70+
"table2", "postgresql://localhost:5432/test.schema.table2"},
71+
{"jdbc:postgresql://localhost/test?searchpath=schema",
72+
"table3", "postgresql://localhost:5432/test.schema.table3"}};
73+
for (int i = 0; i < testCases.length; i++) {
74+
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
75+
Assert.assertEquals(testCases[i][2], fqn);
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)