Skip to content

Commit 3379616

Browse files
Fixed the timestamp handling to DateTime (#397)
1 parent 79898e2 commit 3379616

File tree

7 files changed

+177
-4
lines changed

7 files changed

+177
-4
lines changed

database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
public final class DBUtils {
6060
private static final Logger LOG = LoggerFactory.getLogger(DBUtils.class);
6161

62-
private static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();
62+
public static final Calendar PURE_GREGORIAN_CALENDAR = createPureGregorianCalender();
6363

6464
// Java by default uses October 15, 1582 as a Gregorian cut over date.
6565
// Any timestamp created with time less than this cut over date is treated as Julian date.

postgresql-plugin/src/e2e-test/resources/pluginParameters.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ datatypesSchema=[{"key":"id","value":"string"},{"key":"col1","value":"string"},{
4242
{"key":"col10","value":"decimal"},{"key":"col11","value":"decimal"},{"key":"col12","value":"float"},\
4343
{"key":"col13","value":"double"},{"key":"col14","value":"string"},{"key":"col15","value":"string"},\
4444
{"key":"col16","value":"string"},{"key":"col17","value":"double"},{"key":"col18","value":"decimal"},\
45-
{"key":"col22","value":"timestamp"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\
45+
{"key":"col22","value":"datetime"},{"key":"col23","value":"timestamp"},{"key":"col24","value":"time"},\
4646
{"key":"col25","value":"string"},{"key":"col26","value":"string"},{"key":"col27","value":"date"},\
4747
{"key":"col28","value":"string"},{"key":"col29","value":"string"},{"key":"col30","value":"string"},\
4848
{"key":"col31","value":"string"},{"key":"col32","value":"string"},{"key":"col33","value":"string"},\

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,19 @@
2222
import io.cdap.plugin.db.DBRecord;
2323
import io.cdap.plugin.db.Operation;
2424
import io.cdap.plugin.db.SchemaReader;
25+
import io.cdap.plugin.util.DBUtils;
2526

2627
import java.lang.reflect.InvocationTargetException;
2728
import java.lang.reflect.Method;
2829
import java.sql.PreparedStatement;
2930
import java.sql.ResultSet;
3031
import java.sql.ResultSetMetaData;
3132
import java.sql.SQLException;
33+
import java.sql.Timestamp;
34+
import java.sql.Types;
35+
import java.time.OffsetDateTime;
36+
import java.time.ZoneId;
37+
import java.time.ZonedDateTime;
3238
import java.util.List;
3339

3440
/**
@@ -51,13 +57,46 @@ public PostgresDBRecord() {
5157
@Override
5258
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
5359
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
60+
String columnTypeName = resultSet.getMetaData().getColumnTypeName(columnIndex);
5461
if (isUseSchema(resultSet.getMetaData(), columnIndex)) {
5562
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
63+
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
64+
Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
65+
if (timestamp != null) {
66+
ZonedDateTime zonedDateTime = OffsetDateTime.of(timestamp.toLocalDateTime(), OffsetDateTime.now().getOffset())
67+
.atZoneSameInstant(ZoneId.of("UTC"));
68+
Schema nonNullableSchema = field.getSchema().isNullable() ?
69+
field.getSchema().getNonNullable() : field.getSchema();
70+
setZonedDateTimeBasedOnOuputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
71+
field.getName(), zonedDateTime);
72+
} else {
73+
recordBuilder.set(field.getName(), null);
74+
}
75+
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
76+
OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
77+
if (timestamp != null) {
78+
recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
79+
} else {
80+
recordBuilder.set(field.getName(), null);
81+
}
5682
} else {
5783
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
5884
}
5985
}
6086

87+
private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordBuilder,
88+
Schema.LogicalType logicalType,
89+
String fieldName,
90+
ZonedDateTime zonedDateTime) {
91+
if (Schema.LogicalType.DATETIME.equals(logicalType)) {
92+
recordBuilder.setDateTime(fieldName, zonedDateTime.toLocalDateTime());
93+
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(logicalType)) {
94+
recordBuilder.setTimestamp(fieldName, zonedDateTime);
95+
}
96+
97+
return;
98+
}
99+
61100
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
62101
switch (metadata.getColumnTypeName(columnIndex)) {
63102
case "bit":

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public class PostgresFieldsValidator extends CommonFieldsValidator {
3030

3131
@Override
3232
public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, int index) throws SQLException {
33-
Schema.Type fieldType = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType()
34-
: field.getSchema().getType();
33+
Schema schema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
34+
Schema.Type fieldType = schema.getType();
3535

3636
String colTypeName = metadata.getColumnTypeName(index);
3737
int columnType = metadata.getColumnType(index);
@@ -46,6 +46,11 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4646
}
4747
}
4848

49+
if (colTypeName.equalsIgnoreCase("timestamp")
50+
&& schema.getLogicalType().equals(Schema.LogicalType.DATETIME)) {
51+
return true;
52+
}
53+
4954
return super.isFieldCompatible(field, metadata, index);
5055
}
5156
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
5858
return Schema.of(Schema.Type.STRING);
5959
}
6060

61+
if (typeName.equalsIgnoreCase("timestamp")) {
62+
return Schema.of(Schema.LogicalType.DATETIME);
63+
}
64+
6165
return super.getSchema(metadata, index);
6266
}
6367

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.api.annotation.MetadataProperty;
2424
import io.cdap.cdap.api.annotation.Name;
2525
import io.cdap.cdap.api.annotation.Plugin;
26+
import io.cdap.cdap.api.data.schema.Schema;
2627
import io.cdap.cdap.etl.api.FailureCollector;
2728
import io.cdap.cdap.etl.api.batch.BatchSource;
2829
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -137,5 +138,20 @@ public void validate(FailureCollector collector) {
137138
ConfigUtil.validateConnection(this, useConnection, connection, collector);
138139
super.validate(collector);
139140
}
141+
142+
@Override
143+
protected void validateField(FailureCollector collector, Schema.Field field,
144+
Schema actualFieldSchema, Schema expectedFieldSchema) {
145+
// This change is needed to make sure that the pipeline upgrade continues to work post upgrade.
146+
// Since the older handling of the Timestamp used to convert to CDAP TIMESTAMP type,
147+
// but since PostgreSQL Timestamp does not have a timezone information, hence it should ideally map to
148+
// CDAP DATETIME type. In that case the output schema would be set to TIMESTAMP,
149+
// and the code internally would try to identify the schema of the field as DATETIME.
150+
if (Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())
151+
&& Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType())) {
152+
return;
153+
}
154+
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);
155+
}
140156
}
141157
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.postgres;
18+
19+
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.cdap.plugin.util.DBUtils;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.mockito.Mockito;
26+
import org.mockito.junit.MockitoJUnitRunner;
27+
28+
import java.sql.ResultSet;
29+
import java.sql.ResultSetMetaData;
30+
import java.sql.SQLException;
31+
import java.sql.Timestamp;
32+
import java.sql.Types;
33+
import java.time.OffsetDateTime;
34+
import java.time.ZoneId;
35+
import java.time.ZoneOffset;
36+
37+
import static org.mockito.ArgumentMatchers.eq;
38+
import static org.mockito.Mockito.when;
39+
40+
/**
41+
* Unit Test class for the PostgresDBRecord
42+
*/
43+
@RunWith(MockitoJUnitRunner.class)
44+
public class PostgresDBRecordUnitTest {
45+
@Test
46+
public void validateTimestampType() throws SQLException {
47+
OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
48+
ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
49+
when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamp");
50+
51+
ResultSet resultSet = Mockito.mock(ResultSet.class);
52+
when(resultSet.getMetaData()).thenReturn(metaData);
53+
when(resultSet.getTimestamp(eq(0), eq(DBUtils.PURE_GREGORIAN_CALENDAR)))
54+
.thenReturn(Timestamp.from(offsetDateTime.toInstant()));
55+
56+
Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.DATETIME));
57+
Schema schema = Schema.recordOf(
58+
"dbRecord",
59+
field1
60+
);
61+
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
62+
63+
PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
64+
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
65+
StructuredRecord record = builder.build();
66+
Assert.assertNotNull(record);
67+
Assert.assertNotNull(record.getDateTime("field1"));
68+
Assert.assertEquals(record.getDateTime("field1").toInstant(ZoneOffset.UTC), offsetDateTime.toInstant());
69+
70+
// Validate backward compatibility
71+
72+
field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
73+
schema = Schema.recordOf(
74+
"dbRecord",
75+
field1
76+
);
77+
builder = StructuredRecord.builder(schema);
78+
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
79+
record = builder.build();
80+
Assert.assertNotNull(record);
81+
Assert.assertNotNull(record.getTimestamp("field1"));
82+
Assert.assertEquals(record.getTimestamp("field1").toInstant(), offsetDateTime.toInstant());
83+
}
84+
85+
@Test
86+
public void validateTimestampTZType() throws SQLException {
87+
OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);
88+
ResultSetMetaData metaData = Mockito.mock(ResultSetMetaData.class);
89+
when(metaData.getColumnTypeName(eq(0))).thenReturn("timestamptz");
90+
91+
ResultSet resultSet = Mockito.mock(ResultSet.class);
92+
when(resultSet.getMetaData()).thenReturn(metaData);
93+
when(resultSet.getObject(eq(0), eq(OffsetDateTime.class))).thenReturn(offsetDateTime);
94+
95+
Schema.Field field1 = Schema.Field.of("field1", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
96+
Schema schema = Schema.recordOf(
97+
"dbRecord",
98+
field1
99+
);
100+
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
101+
102+
PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
103+
dbRecord.handleField(resultSet, builder, field1, 0, Types.TIMESTAMP, 0, 0);
104+
StructuredRecord record = builder.build();
105+
Assert.assertNotNull(record);
106+
Assert.assertNotNull(record.getTimestamp("field1", ZoneId.of("UTC")));
107+
Assert.assertEquals(record.getTimestamp("field1", ZoneId.of("UTC")).toInstant(), offsetDateTime.toInstant());
108+
}
109+
}

0 commit comments

Comments
 (0)