33import com .google .cloud .bigquery .TableResult ;
44import com .google .gson .Gson ;
55import com .google .gson .JsonObject ;
6+ import com .google .type .Decimal ;
67import io .cdap .e2e .utils .BigQueryClient ;
78import io .cdap .e2e .utils .PluginPropertyUtils ;
89import io .cdap .plugin .CloudMySqlClient ;
10+ import io .cdap .plugin .common .stepsdesign .TestSetupHooks ;
911import org .junit .Assert ;
10-
12+ import org . junit . Test ;
1113import java .io .IOException ;
1214import java .sql .*;
15+ import java .sql .Date ;
1316import java .text .ParseException ;
1417import java .text .SimpleDateFormat ;
15- import java .util .ArrayList ;
16- import java .util .Base64 ;
17- import java .util .Date ;
18- import java .util .List ;
18+ import java .time .LocalDateTime ;
19+ import java .time .format .DateTimeFormatter ;
20+ import java .util .*;
1921
2022/**
2123 * BQValidation
2224 */
2325
2426public class BQValidation {
25- public static void main (String [] args ) {
26- // validateBQAndDBRecordValues(String schema, String sourceTable, String targetTable)
27+ public static void main (String [] args ) throws SQLException , ParseException , IOException , ClassNotFoundException , InterruptedException {
28+ //TestSetupHooks.createTables();
29+ // TestSetupHooks.createTempSourceBQTable();
30+ // validateBQAndDBRecordValues("SourceTable_qGMpakGOTZ", "mySqlToBQTable");
31+ // validateDBAndBQRecordValues("SourceTable_qGMpakGOTZ", "mySqlToBQTable");
32+ // validateDBAndBQRecordValues("SourceTable_qGMpakGOTZ", "mySqlToBQTable");
33+ validateDBAndBQRecordValues ("E2E_SOURCE_4b406" ,"TargetTable_TRFSpppjsZ" );
2734 }
2835
2936 /**
@@ -33,7 +40,7 @@ public static void main(String[] args) {
3340 * @return true if the values in source and target side are equal
3441 */
3542
36- public static boolean validateBQAndDBRecordValues (String schema , String sourceTable , String targetTable )
43+ public static boolean validateBQAndDBRecordValues (String sourceTable , String targetTable )
3744 throws SQLException , ClassNotFoundException , ParseException , IOException , InterruptedException {
3845 List <JsonObject > jsonResponse = new ArrayList <>();
3946 List <Object > bigQueryRows = new ArrayList <>();
@@ -42,17 +49,44 @@ public static boolean validateBQAndDBRecordValues(String schema, String sourceTa
4249 JsonObject json = new Gson ().fromJson (String .valueOf (rows ), JsonObject .class );
4350 jsonResponse .add (json );
4451 }
45- String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable ;
52+ String getSourceQuery = "SELECT * FROM " + sourceTable ;
4653 try (Connection connect = CloudMySqlClient .getCloudMysqlConnection ()) {
4754 connect .setHoldability (ResultSet .HOLD_CURSORS_OVER_COMMIT );
4855 Statement statement1 = connect .createStatement (ResultSet .TYPE_SCROLL_SENSITIVE , ResultSet .CONCUR_UPDATABLE ,
4956 ResultSet .HOLD_CURSORS_OVER_COMMIT );
50-
5157 ResultSet rsSource = statement1 .executeQuery (getSourceQuery );
5258 return compareResultSetData (rsSource , jsonResponse );
5359 }
5460 }
5561
62+ public static boolean validateDBAndBQRecordValues (String sourceTable , String targetTable )
63+ throws SQLException , ClassNotFoundException , ParseException , IOException , InterruptedException {
64+ List <JsonObject > jsonResponse = new ArrayList <>();
65+ List <Object > bigQueryRows = new ArrayList <>();
66+ getBigQueryTableData (sourceTable , bigQueryRows );
67+ for (Object rows : bigQueryRows ) {
68+ JsonObject json = new Gson ().fromJson (String .valueOf (rows ), JsonObject .class );
69+ jsonResponse .add (json );
70+ }
71+ String getTargetQuery = "SELECT * FROM " + targetTable ;
72+ // String getSourceTimeZoneQuery = "SELECT current_setting('TIMEZONE') AS timezone";
73+ try (Connection connect = CloudMySqlClient .getCloudMysqlConnection ()) {
74+ connect .setHoldability (ResultSet .HOLD_CURSORS_OVER_COMMIT );
75+ Statement statement1 = connect .createStatement (ResultSet .TYPE_SCROLL_SENSITIVE , ResultSet .CONCUR_UPDATABLE ,
76+ ResultSet .HOLD_CURSORS_OVER_COMMIT );
77+ // Statement timeZoneStatement = connect.createStatement();
78+ ResultSet rsTarget = statement1 .executeQuery (getTargetQuery );
79+ // ResultSet rsTimeZone = timeZoneStatement.executeQuery(getSourceTimeZoneQuery);
80+ // String targettz = getTimeZoneIdFromSource(rsTimeZone);
81+ return compareResultSetData (rsTarget , jsonResponse );
82+ }
83+ }
84+
85+
86+
87+
88+
89+
5690 /**
5791 * Retrieves the data from a specified BigQuery table and populates it into the provided list of objects.
5892 *
@@ -68,7 +102,7 @@ private static void getBigQueryTableData(String table, List<Object> bigQueryRows
68102 String dataset = PluginPropertyUtils .pluginProp ("dataset" );
69103 String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t" ;
70104 TableResult result = BigQueryClient .getQueryResult (selectQuery );
71- result .iterateAll ().forEach (value -> bigQueryRows .add (value .get (0 ).getValue ()));
105+ result .iterateAll ().forEach (value -> bigQueryRows .add (value .get (0 ).getValue ()));
72106 }
73107
74108 /**
@@ -105,17 +139,68 @@ public static boolean compareResultSetData(ResultSet rsSource, List<JsonObject>
105139 //Variable 'jsonObjectIdx' to track the index of the current JsonObject in the bigQueryData list,
106140 int jsonObjectIdx = 0 ;
107141 while (rsSource .next ()) {
108- int currentColumnCount = 1 ;
142+ int currentColumnCount = 2 ;
109143 while (currentColumnCount <= columnCountSource ) {
110144 String columnTypeName = mdSource .getColumnTypeName (currentColumnCount );
111145 int columnType = mdSource .getColumnType (currentColumnCount );
112146 String columnName = mdSource .getColumnName (currentColumnCount );
113147 // Perform different comparisons based on column type
114148 switch (columnType ) {
115149 // Since we skip BFILE in Oracle Sink, we are not comparing the BFILE source and sink values
150+ case Types .BIT :
151+ Boolean sourceBit = rsSource .getBoolean (currentColumnCount );
152+ Boolean targetBit = Boolean .parseBoolean (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
153+ Assert .assertTrue ("Different values found for column : %s" ,
154+ String .valueOf (sourceBit ).equals (String .valueOf (targetBit )));
155+ break ;
156+
157+ case Types .SMALLINT :
158+ case Types .INTEGER :
159+ case Types .TINYINT :
160+ Integer sourceTinyInt = rsSource .getInt (currentColumnCount );
161+ Integer targetTinyInt = Integer .parseInt (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
162+ Assert .assertTrue ("Different values found for column : %s" ,
163+ String .valueOf (sourceTinyInt ).equals (String .valueOf (targetTinyInt )));
164+ break ;
165+
166+ case Types .REAL :
167+ Float sourceFloat = rsSource .getFloat (currentColumnCount );
168+ Float targetFloat = Float .parseFloat (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
169+ Assert .assertTrue ("Different values found for column : %s" ,
170+ String .valueOf (sourceFloat ).equals (String .valueOf (targetFloat )));
171+ break ;
172+
173+ case Types .DOUBLE :
174+ Double sourceDouble = rsSource .getDouble (currentColumnCount );
175+ Double targetDouble = Double .parseDouble (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
176+ Assert .assertTrue ("Different values found for column : %s" ,
177+ String .valueOf (sourceDouble ).equals (String .valueOf (targetDouble )));
178+ break ;
179+
180+ case Types .DATE :
181+ Date sourceDate = rsSource .getDate (currentColumnCount );
182+ Date targetDate = java .sql .Date .valueOf (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
183+ Assert .assertTrue ("Different values found for column : %s" ,
184+ String .valueOf (sourceDate ).equals (String .valueOf (targetDate )));
185+ break ;
186+
187+ case Types .TIME :
188+ Time sourceTime = rsSource .getTime (currentColumnCount );
189+ Time targetTime = Time .valueOf (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ());
190+ Assert .assertTrue ("Different values found for column : %s" ,
191+ String .valueOf (sourceTime ).equals (String .valueOf (targetTime )));
192+ break ;
193+
194+ case Types .DECIMAL :
195+ org .apache .spark .sql .types .Decimal sourceDecimal = org .apache .spark .sql .types .Decimal .fromDecimal (rsSource .getBigDecimal (currentColumnCount ));
196+ org .apache .spark .sql .types .Decimal targetDecimal = org .apache .spark .sql .types .Decimal .fromDecimal (bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsBigDecimal ());
197+ Assert .assertEquals ("Different values found for column : %s" , sourceDecimal , targetDecimal );
198+ break ;
199+
116200 case Types .BLOB :
117201 case Types .VARBINARY :
118202 case Types .LONGVARBINARY :
203+ case Types .BINARY :
119204 String sourceB64String = new String (Base64 .getEncoder ().encode (rsSource .getBytes (currentColumnCount )));
120205 String targetB64String = bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ();
121206 Assert .assertEquals ("Different values found for column : %s" ,
@@ -130,18 +215,20 @@ public static boolean compareResultSetData(ResultSet rsSource, List<JsonObject>
130215 break ;
131216
132217 case Types .TIMESTAMP :
133- Timestamp sourceTS = rsSource .getTimestamp (columnName );
134- SimpleDateFormat dateFormat = new SimpleDateFormat ( "yyyy-MM-dd'T'hh:mm:ss'Z'" );
135- Date parsedDate = dateFormat .parse (bigQueryData . get ( jsonObjectIdx ). get ( columnName ). getAsString () );
136- Timestamp targetTs = new Timestamp ( parsedDate . getTime () );
137- Assert . assertEquals ( "Different values found for column : %s" , String . valueOf ( sourceTS ).
138- equals ( String . valueOf ( targetTs )) );
218+ String sourceTS = String . valueOf ( rsSource .getTimestamp (currentColumnCount ) );
219+ String targetTS = bigQueryData . get ( jsonObjectIdx ). get ( columnName ). getAsString ( );
220+ LocalDateTime timestamp = LocalDateTime .parse (targetTS , DateTimeFormatter . ISO_DATE_TIME );
221+ DateTimeFormatter formatter = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss.S" );
222+ String formattedTimestamp = timestamp . format ( formatter );
223+ Assert . assertEquals ( sourceTS , formattedTimestamp );
139224 break ;
225+
140226 default :
141227 String sourceString = rsSource .getString (currentColumnCount );
142228 String targetString = bigQueryData .get (jsonObjectIdx ).get (columnName ).getAsString ();
143229 Assert .assertEquals (String .format ("Different %s values found for column : %s" , columnTypeName , columnName ),
144230 String .valueOf (sourceString ), String .valueOf (targetString ));
231+ break ;
145232 }
146233 currentColumnCount ++;
147234 }
@@ -151,4 +238,33 @@ public static boolean compareResultSetData(ResultSet rsSource, List<JsonObject>
151238 rsSource .next ());
152239 return true ;
153240 }
241+
242+
243+ private static void getTimestampValidation (ResultSet rsSource , String sourceTimeZone , String columnTypeName , String
244+ columnName , int currentColumnCount , JsonObject bigQueryDataObj ) throws
245+ SQLException , ParseException {
246+ TimeZone sourceTz ;
247+ if (columnTypeName .equals ("timestamp" )) {
248+ sourceTz = TimeZone .getTimeZone ("UTC" ); //Assume UTC since plugin assumes UTC
249+ } else if (columnTypeName .equals ("timestamptz" )) {
250+ sourceTz = TimeZone .getTimeZone (sourceTimeZone ); //Get TZ From Postgres
251+ } else
252+ return ;
253+ GregorianCalendar gc = new GregorianCalendar (sourceTz );
254+ gc .setGregorianChange (new Date (Long .MIN_VALUE ));
255+ Timestamp sourceTS = rsSource .getTimestamp (currentColumnCount , gc );
256+ TimeZone targetTz = TimeZone .getTimeZone (
257+ "UTC" ); //BQ Always Stores in UTC, if timezone is available it converts to UTC, if it is not available it assumes it to be UTC
258+ SimpleDateFormat dateFormat = new SimpleDateFormat ("yyyy-MM-dd'T'HH:mm:ss'Z'" );
259+ dateFormat .setTimeZone (targetTz );
260+ Date parsedDate = (Date ) dateFormat .parse (bigQueryDataObj .get (columnName ).getAsString ());
261+ Timestamp targetTs = new Timestamp (parsedDate .getTime ());
262+ Assert .assertEquals (sourceTS , targetTs );
263+ }
264+ private static String getTimeZoneIdFromSource (ResultSet rsTimezone ) throws SQLException {
265+ if (rsTimezone .next ()) {
266+ return rsTimezone .getString ("timezone" );
267+ }
268+ return "UTC" ; //If query fails return UTC
269+ }
154270}
0 commit comments