Skip to content

Commit

Permalink
Added fix for date datatype
Browse files Browse the repository at this point in the history
Added fix for date datatype

Added fix for date datatype
  • Loading branch information
vikasrathee-cs committed Oct 17, 2024
1 parent a83eaac commit a99f062
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 8 deletions.
51 changes: 51 additions & 0 deletions oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,54 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case


@BQ_SOURCE_TEST_DATE @ORACLE_DATE_TABLE
Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is having date and timestamp fields
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Oracle" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Oracle" to establish connection
Then Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "outputDatatypesDateTimeSchema"
Then Validate "BigQuery" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Oracle"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Enter input plugin property: "referenceName" with value: "sourceRef"
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Validate "Oracle" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Oracle sink
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table
48 changes: 41 additions & 7 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.List;
Expand All @@ -44,6 +49,13 @@

public class BQValidation {

private static List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss"),
new SimpleDateFormat("yyyy-MM-dd"));
private static List<DateTimeFormatter> TIMESTAMP_TZ_DATE_FORMATS = Arrays.asList(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"));

/**
* Extracts entire data from source and target tables.
*
Expand Down Expand Up @@ -173,21 +185,43 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonO

case Types.TIMESTAMP:
Timestamp sourceTS = rsSource.getTimestamp(columnName);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss");
String targetT = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date dateParsed = dateFormat.parse(targetT);
Date dateParsed = null;
for (SimpleDateFormat dateTimeFormatter : TIMESTAMP_DATE_FORMATS) {
try {
dateParsed = dateTimeFormatter.parse(targetT);
break;
} catch (ParseException exception) {
// do nothing
}
}
Timestamp targetTs = new java.sql.Timestamp(dateParsed.getTime());
result = String.valueOf(sourceTS).equals(String.valueOf(targetTs));
result = sourceTS.equals(targetTs);
Assert.assertTrue("Different values found for column : %s", result);
break;

case OracleSourceSchemaReader.TIMESTAMP_TZ:
Timestamp sourceTZ = rsSource.getTimestamp(columnName);
SimpleDateFormat dateValue = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
String targetTS = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date date = dateValue.parse(targetTS);
Timestamp targetTZ = new Timestamp(date.getTime());
Assert.assertTrue("Different columns found for Timestamp", sourceTZ.equals(targetTZ));
ZonedDateTime targetDate = null;
for (DateTimeFormatter dateTimeFormatter : TIMESTAMP_TZ_DATE_FORMATS) {
try {
targetDate = ZonedDateTime.parse(targetTS, dateTimeFormatter);
break;
} catch (DateTimeParseException exception) {
// do nothing
}
}
Assert.assertTrue("Different columns found for Timestamp",
sourceTZ.toLocalDateTime().equals(targetDate.toLocalDateTime()));
break;

case OracleSourceSchemaReader.TIMESTAMP_LTZ:
Timestamp sourceLTZ = rsSource.getTimestamp(columnName);
String targetLTZ = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Assert.assertTrue("Different columns found for Timestamp",
sourceLTZ.toLocalDateTime().equals(LocalDateTime.parse(targetLTZ, formatter)));
break;

case OracleSourceSchemaReader.BINARY_FLOAT:
Expand Down
10 changes: 10 additions & 0 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/OracleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,14 @@ public static void deleteTable(String schema, String table)
}
}
}

public static void createTargetDateTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable +
"(ID varchar2(100),DATE_COL DATE,TIMESTAMP_TZ_COL TIMESTAMP WITH TIME ZONE,TIMESTAMP_LTZ_COL " +
"TIMESTAMP WITH LOCAL TIME ZONE,INTERVAL_YM_COL INTERVAL YEAR TO MONTH,DATE_TYPE DATE)";
statement.executeUpdate(createTargetTableQuery);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,34 @@ public static void deleteTempSourceBQTableSmallCase() throws IOException, Interr
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void createTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("CreateBQTableQueryFileDate"),
PluginPropertyUtils.pluginProp("InsertBQDataQueryFileDate"));
}

@After(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void deleteTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
String bqSourceTable = PluginPropertyUtils.pluginProp("bqSourceTable");
BigQueryClient.dropBqQuery(bqSourceTable);
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 2, value = "@ORACLE_DATE_TABLE")
public static void createOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.createTargetDateTable(PluginPropertyUtils.pluginProp("targetTable"),
PluginPropertyUtils.pluginProp("schema"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " created successfully");
}

@After(order = 2, value = "@ORACLE_DATE_TABLE")
public static void dropOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.deleteTable(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("targetTable"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " deleted successfully");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ InsertBQDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
#bq queries file path for Small Case Schema
CreateBQTableQueryFileSmallCase=testdata/BigQuery/BigQueryCreateTableQuerySmallCase.txt
InsertBQDataQueryFileSmallCase=testdata/BigQuery/BigQueryInsertDataQuerySmallCase.txt
#bq queries file path for various Date time type Fields
CreateBQTableQueryFileDate=testdata/BigQuery/CreateBQTableQueryFileDate.txt
InsertBQDataQueryFileDate=testdata/BigQuery/InsertBQDataQueryFileDate.txt

#ORACLE Datatypes
bigQueryColumns=(COL23 FLOAT(4), COL28 TIMESTAMP, COL29 TIMESTAMP(9), COL30 TIMESTAMP WITH TIME ZONE, \
Expand All @@ -125,3 +128,6 @@ outputDatatypesSchema1=[{"key":"COL23","value":"double"},{"key":"COL28","value":
{"key":"COL29","value":"datetime"},{"key":"COL30","value":"timestamp"},{"key":"COL31","value":"string"},\
{"key":"COL32","value":"string"},{"key":"COL33","value":"datetime"},{"key":"COL34","value":"float"},\
{"key":"COL35","value":"double"}]
outputDatatypesDateTimeSchema=[{"key":"ID","value":"string"},{"key":"DATE_COL","value":"datetime"},\
{"key":"TIMESTAMP_TZ_COL","value":"timestamp"},{"key":"TIMESTAMP_LTZ_COL","value":"datetime"},\
{"key":"INTERVAL_YM_COL","value":"string"},{"key":"DATE_TYPE","value":"date"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TABLE `DATASET.TABLE_NAME` (ID STRING NOT NULL,DATE_COL DATETIME, TIMESTAMP_TZ_COL TIMESTAMP, TIMESTAMP_LTZ_COL DATETIME, INTERVAL_YM_COL STRING,DATE_TYPE DATE);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO `DATASET.TABLE_NAME` (id, date_col, timestamp_tz_col, timestamp_ltz_col, interval_ym_col,date_type) VALUES('2', '2024-10-11', '2024-10-11 14:30:00.123456+00:00', '2024-10-11 14:30:00.123456','2-6','2024-10-11');
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String timestampString = Timestamp.valueOf(localDateTime).toString();
Object timestampWithTimeZone = createOracleTimestamp(stmt.getConnection(), timestampString);
stmt.setObject(sqlIndex, timestampWithTimeZone);
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) {
} else {
// Deprecated: Handle the case when the Timestamp is mapped to CDAP Timestamp type
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
}
Expand Down

0 comments on commit a99f062

Please sign in to comment.