Skip to content

Commit 66bf1ba

Browse files
Added Support for proper handling of Timestamp, TimestampLTZ, TimestampTZ types (#386)
ORACLE Timestamp : CDAP Datetime ORACLE TimestampLTZ : CDAP Datetime ORACLE TimestampTZ : CDAP Timestamp
1 parent 1fa9919 commit 66bf1ba

File tree

10 files changed

+376
-30
lines changed

10 files changed

+376
-30
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#
2+
# Copyright © 2023 Cask Data, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
# use this file except in compliance with the License. You may obtain a copy of
6+
# the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations under
14+
# the License.
15+
#
16+
17+
@Oracle
18+
Feature: Oracle - Verify Oracle source data transfer for all Timestamp types
19+
@ORACLE_SOURCE_DATATYPE_TIMESTAMP @ORACLE_SINK_TEST @Oracle_Required
20+
Scenario: To verify data is getting transferred from Oracle to Oracle successfully
21+
Given Open Datafusion Project to configure pipeline
22+
When Expand Plugin group in the LHS plugins list: "Source"
23+
When Select plugin: "Oracle" from the plugins list as: "Source"
24+
When Expand Plugin group in the LHS plugins list: "Sink"
25+
When Select plugin: "Oracle" from the plugins list as: "Sink"
26+
Then Connect plugins: "Oracle" and "Oracle2" to establish connection
27+
Then Navigate to the properties page of plugin: "Oracle"
28+
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
29+
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
30+
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
31+
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
32+
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
33+
Then Select radio button plugin property: "connectionType" with value: "service"
34+
Then Select radio button plugin property: "role" with value: "sysdba"
35+
Then Enter input plugin property: "referenceName" with value: "sourceRef"
36+
Then Replace input plugin property: "database" with value: "databaseName"
37+
Then Enter textarea plugin property: "importQuery" with value: "selectQuery"
38+
Then Click on the Get Schema button
39+
Then Verify the Output Schema matches the Expected Schema: "outputTimestampDatatypesSchema"
40+
Then Validate "Oracle" plugin properties
41+
Then Close the Plugin Properties page
42+
Then Navigate to the properties page of plugin: "Oracle2"
43+
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
44+
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
45+
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
46+
Then Replace input plugin property: "database" with value: "databaseName"
47+
Then Replace input plugin property: "tableName" with value: "targetTable"
48+
Then Replace input plugin property: "dbSchemaName" with value: "schema"
49+
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
50+
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
51+
Then Enter input plugin property: "referenceName" with value: "targetRef"
52+
Then Select radio button plugin property: "connectionType" with value: "service"
53+
Then Select radio button plugin property: "role" with value: "sysdba"
54+
Then Validate "Oracle2" plugin properties
55+
Then Close the Plugin Properties page
56+
Then Save the pipeline
57+
Then Preview and run the pipeline
58+
Then Verify the preview of pipeline is "success"
59+
Then Click on preview data for Oracle sink
60+
Then Verify preview output schema matches the outputSchema captured in properties
61+
Then Close the preview data
62+
Then Deploy the pipeline
63+
Then Run the Pipeline in Runtime
64+
Then Wait till pipeline is in running state
65+
Then Open and capture logs
66+
Then Verify the pipeline status is "Succeeded"
67+
Then Validate the values of records transferred to target table is equal to the values from source table

oracle-plugin/src/e2e-test/java/io.cdap.plugin/OracleClient.java

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package io.cdap.plugin;
1818

19+
import com.google.common.base.Strings;
1920
import io.cdap.e2e.utils.PluginPropertyUtils;
2021
import io.cdap.plugin.oracle.OracleSourceSchemaReader;
2122
import org.junit.Assert;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2225

2326
import java.sql.Blob;
2427
import java.sql.Clob;
@@ -30,6 +33,7 @@
3033
import java.sql.Statement;
3134
import java.sql.Timestamp;
3235
import java.sql.Types;
36+
import java.time.Instant;
3337
import java.util.Arrays;
3438
import java.util.Date;
3539
import java.util.GregorianCalendar;
@@ -112,30 +116,51 @@ private static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarg
112116
byte[] sourceArrayBlob = blobSource.getBytes(1, (int) blobSource.length());
113117
Blob blobTarget = rsTarget.getBlob(currentColumnCount);
114118
byte[] targetArrayBlob = blobTarget.getBytes(1, (int) blobTarget.length());
115-
Assert.assertTrue(String.format("Different values found for column : %s", columnName),
119+
Assert.assertTrue(String.format("Different BLOB values found for column : %s", columnName),
116120
Arrays.equals(sourceArrayBlob, targetArrayBlob));
117121
break;
118122
case Types.CLOB:
119123
Clob clobSource = rsSource.getClob(currentColumnCount);
120124
String sourceClobString = clobSource.getSubString(1, (int) clobSource.length());
121125
Clob clobTarget = rsTarget.getClob(currentColumnCount);
122126
String targetClobString = clobTarget.getSubString(1, (int) clobTarget.length());
123-
Assert.assertTrue(String.format("Different values found for column : %s", columnName),
124-
sourceClobString.equals(targetClobString));
127+
Assert.assertEquals(String.format("Different CLOB values found for column : %s", columnName),
128+
sourceClobString, targetClobString);
125129
break;
126130
case Types.TIMESTAMP:
127131
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
128132
gc.setGregorianChange(new Date(Long.MIN_VALUE));
129133
Timestamp sourceTS = rsSource.getTimestamp(currentColumnCount, gc);
130134
Timestamp targetTS = rsTarget.getTimestamp(currentColumnCount, gc);
131-
Assert.assertTrue(String.format("Different values found for column : %s", columnName),
132-
sourceTS.equals(targetTS));
135+
Assert.assertEquals(String.format("Different TIMESTAMP values found for column : %s", columnName),
136+
sourceTS, targetTS);
137+
break;
138+
case OracleSourceSchemaReader.TIMESTAMP_TZ:
139+
// The timezone information in the field is lost during pipeline execution hence it is required to
140+
// convert both values into the system timezone and then compare.
141+
GregorianCalendar gregorianCalendar = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
142+
gregorianCalendar.setGregorianChange(new Date(Long.MIN_VALUE));
143+
Timestamp tsSource = rsSource.getTimestamp(currentColumnCount, gregorianCalendar);
144+
Timestamp tsTarget = rsTarget.getTimestamp(currentColumnCount, gregorianCalendar);
145+
if (tsSource == null && tsTarget == null) {
146+
break;
147+
}
148+
Assert.assertNotNull(
149+
String.format("Column : %s is null in source table and is not Null in target table.", columnName),
150+
tsSource);
151+
Assert.assertNotNull(
152+
String.format("Column : %s is null in target table and is not Null in source table.", columnName),
153+
tsTarget);
154+
Instant sourceInstant = tsSource.toInstant();
155+
Instant targetInstant = tsTarget.toInstant();
156+
Assert.assertEquals(String.format("Different TIMESTAMPTZ values found for column : %s", columnName),
157+
sourceInstant, targetInstant);
133158
break;
134159
default:
135160
String sourceString = rsSource.getString(currentColumnCount);
136161
String targetString = rsTarget.getString(currentColumnCount);
137-
Assert.assertTrue(String.format("Different values found for column : %s", columnName),
138-
String.valueOf(sourceString).equals(String.valueOf(targetString)));
162+
Assert.assertEquals(String.format("Different %s values found for column : %s", columnTypeName, columnName),
163+
String.valueOf(sourceString), String.valueOf(targetString));
139164
}
140165
currentColumnCount++;
141166
}
@@ -217,6 +242,34 @@ public static void createTargetLongTable(String targetTable, String schema) thro
217242
}
218243
}
219244

245+
public static void createTimestampSourceTable(String sourceTable, String schema) throws SQLException,
246+
ClassNotFoundException {
247+
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
248+
String timestampColumns = PluginPropertyUtils.pluginProp("timestampColumns");
249+
String createSourceTableQuery = "CREATE TABLE " + schema + "." + sourceTable + " " + timestampColumns;
250+
statement.executeUpdate(createSourceTableQuery);
251+
252+
int rowCount = 1;
253+
while (!Strings.isNullOrEmpty(PluginPropertyUtils.pluginProp("timestampValue" + rowCount))) {
254+
// Insert dummy data.
255+
String timestampValue = PluginPropertyUtils.pluginProp("timestampValue" + rowCount);
256+
String timestampColumnsList = PluginPropertyUtils.pluginProp("timestampColumnsList");
257+
statement.executeUpdate("INSERT INTO " + schema + "." + sourceTable + " " + timestampColumnsList + " " +
258+
timestampValue);
259+
rowCount++;
260+
}
261+
}
262+
}
263+
264+
public static void createTimestampTargetTable(String targetTable, String schema) throws SQLException,
265+
ClassNotFoundException {
266+
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
267+
String timestampColumns = PluginPropertyUtils.pluginProp("timestampColumns");
268+
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable + " " + timestampColumns;
269+
statement.executeUpdate(createTargetTableQuery);
270+
}
271+
}
272+
220273
public static void createSourceLongRawTable(String sourceTable, String schema) throws SQLException,
221274
ClassNotFoundException {
222275
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {

oracle-plugin/src/e2e-test/java/io.cdap.plugin/common.stepsdesign/TestSetupHooks.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ public static void createLongVarcharTables() throws SQLException, ClassNotFoundE
8181
PluginPropertyUtils.pluginProp("schema"));
8282
}
8383

84+
@Before(order = 2, value = "@ORACLE_SOURCE_DATATYPE_TIMESTAMP")
85+
public static void createTimestampDatatypeTables() throws SQLException, ClassNotFoundException {
86+
OracleClient.createTimestampSourceTable(PluginPropertyUtils.pluginProp("sourceTable"),
87+
PluginPropertyUtils.pluginProp("schema"));
88+
OracleClient.createTimestampTargetTable(PluginPropertyUtils.pluginProp("targetTable"),
89+
PluginPropertyUtils.pluginProp("schema"));
90+
}
91+
8492
@After(order = 1, value = "@ORACLE_SINK_TEST")
8593
public static void dropTables() throws SQLException, ClassNotFoundException {
8694
OracleClient.deleteTables(PluginPropertyUtils.pluginProp("schema"),

oracle-plugin/src/e2e-test/java/io.cdap.plugin/oracle/stepsdesign/Oracle.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import org.junit.Assert;
2424
import stepsdesign.BeforeActions;
2525

26-
import java.sql.Connection;
2726
import java.sql.SQLException;
28-
import java.sql.Statement;
2927

3028
/**
3129
* Oracle Plugin related step design.

oracle-plugin/src/e2e-test/resources/pluginParameters.properties

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ outputDatatypesSchema=[{"key":"ID","value":"string"},{"key":"COL1","value":"stri
3434
{"key":"COL18","value":"decimal"},{"key":"COL19","value":"decimal"},{"key":"COL20","value":"decimal"},\
3535
{"key":"COL21","value":"decimal"},{"key":"COL22","value":"double"},{"key":"COL23","value":"double"},\
3636
{"key":"COL24","value":"decimal"},{"key":"COL25","value":"double"},{"key":"COL26","value":"double"},\
37-
{"key":"COL27","value":"decimal"},{"key":"COL28","value":"timestamp"},{"key":"COL29","value":"timestamp"},\
38-
{"key":"COL30","value":"string"},{"key":"COL31","value":"string"},{"key":"COL32","value":"string"},\
39-
{"key":"COL33","value":"timestamp"},{"key":"COL34","value":"float"},{"key":"COL35","value":"double"}]
37+
{"key":"COL27","value":"decimal"},{"key":"COL28","value":"datetime"},{"key":"COL29","value":"datetime"},\
38+
{"key":"COL30","value":"timestamp"},{"key":"COL31","value":"string"},{"key":"COL32","value":"string"},\
39+
{"key":"COL33","value":"datetime"},{"key":"COL34","value":"float"},{"key":"COL35","value":"double"}]
4040

4141
longColumns=(ID VARCHAR2(100) PRIMARY KEY, COL1 LONG, COL2 RAW(2), COL3 BLOB, COL4 CLOB, COL5 NCLOB, COL6 BFILE)
4242
longColumnsList=(ID,COL1,COL2,COL3,COL4,COL5,COL6)
@@ -55,3 +55,21 @@ longVarcharColumns=(ID VARCHAR2(100) PRIMARY KEY, COL1 LONG VARCHAR)
5555
longVarcharColumnsList=(ID,COL1)
5656
longVarcharValues=VALUES ('User1','48692054686572652120486F772061726520796F75206665656C696E67AbCdEF646179203F')
5757
outputDatatypesSchema4=[{"key":"ID","value":"string"},{"key":"COL1","value":"string"}]
58+
59+
timestampColumns=(ID VARCHAR2(100) PRIMARY KEY, COL1 TIMESTAMP, COL2 TIMESTAMP WITH TIME ZONE,\
60+
COL3 TIMESTAMP WITH LOCAL TIME ZONE)
61+
timestampColumnsList=(ID,COL1,COL2,COL3)
62+
timestampValue1=VALUES ('1',TIMESTAMP '2023-01-01 02:00:00.000000',\
63+
TIMESTAMP '2023-01-01 02:00:00.000000 +05:30',TIMESTAMP '2001-12-31 13:37:00.000000')
64+
timestampValue2=VALUES ('2',TIMESTAMP '2023-01-01 02:00:00.000000',NULL,NULL)
65+
timestampValue3=VALUES ('3',TIMESTAMP '0001-01-01 09:00:00.000000',\
66+
TIMESTAMP '0001-01-01 01:00:00.000000 -08:00',TIMESTAMP '0001-01-01 09:00:00.000000')
67+
timestampValue4=VALUES ('4',TIMESTAMP '0001-01-01 01:00:00.000000',\
68+
TIMESTAMP '0001-01-02 01:00:00.000000 -08:00',TIMESTAMP '2022-12-31 13:37:00.000000')
69+
timestampValue5=VALUES ('5',TIMESTAMP '2023-01-01 01:00:00.000000',\
70+
TIMESTAMP '2022-12-31 14:00:00.000000 -08:00',TIMESTAMP '2022-12-31 19:30:00.000000')
71+
timestampValue6=VALUES ('6',NULL,TIMESTAMP '2022-12-31 14:00:00.000000 -08:00',NULL)
72+
timestampValue7=VALUES ('7',NULL,TIMESTAMP '2022-12-31 14:00:00.000000 +05:30',NULL)
73+
timestampValue8=VALUES ('8',TIMESTAMP '0001-01-01 01:00:00.000000',NULL,NULL)
74+
outputTimestampDatatypesSchema=[{"key":"ID","value":"string"},{"key":"COL1","value":"datetime"},\
75+
{"key":"COL2","value":"timestamp"},{"key":"COL3","value":"datetime"}]

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleFieldsValidator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4747
isSigned);
4848
}
4949
if (fieldLogicalType == Schema.LogicalType.TIMESTAMP_MICROS) {
50-
return sqlType == OracleSinkSchemaReader.TIMESTAMP_LTZ ||
51-
super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
50+
return sqlType == OracleSinkSchemaReader.TIMESTAMP_LTZ
51+
|| sqlType == OracleSinkSchemaReader.TIMESTAMP_TZ
52+
|| super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
53+
} else if (fieldLogicalType == Schema.LogicalType.DATETIME) {
54+
return sqlType == OracleSinkSchemaReader.TIMESTAMP_LTZ
55+
|| sqlType == Types.TIMESTAMP;
5256
} else if (fieldLogicalType != null) {
5357
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
5458
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,21 @@ protected void validateField(FailureCollector collector,
167167
&& actualFieldSchema.getType().equals(Schema.Type.STRING)) {
168168
return;
169169
}
170+
171+
// For handling TimestampTZ types allow if the expected schema is STRING and
172+
// actual schema is set to TIMESTAMP type to ensure backward compatibility.
173+
if (Schema.LogicalType.TIMESTAMP_MICROS.equals(actualFieldSchema.getLogicalType())
174+
&& Schema.Type.STRING.equals(expectedFieldSchema.getType())) {
175+
return;
176+
}
177+
178+
// For handling TimestampLTZ and Timestamp types allow if the expected schema is TIMESTAMP and
179+
// actual schema is set to DATETIME type to ensure backward compatibility.
180+
if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType())
181+
&& Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) {
182+
return;
183+
}
184+
170185
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
171186
}
172187
}

0 commit comments

Comments
 (0)