Skip to content

Commit c0969bf

Browse files
committed
fixed smalldatetime and datetimeoffset handling
ensuring backward compatibility addressed comments addressed more comments removed unnecessary code removed shouldconverttodatetime method added getschema in sqlconnector removed getschema added test for datetimeoffset
1 parent 79898e2 commit c0969bf

File tree

8 files changed

+101
-69
lines changed

8 files changed

+101
-69
lines changed

mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin;
1818

19+
import io.cdap.cdap.api.data.schema.Schema;
1920
import io.cdap.e2e.utils.PluginPropertyUtils;
2021
import org.junit.Assert;
2122

@@ -29,6 +30,7 @@
2930
import java.sql.Statement;
3031
import java.sql.Timestamp;
3132
import java.sql.Types;
33+
import java.time.Instant;
3234
import java.util.Arrays;
3335
import java.util.Date;
3436
import java.util.GregorianCalendar;
@@ -227,7 +229,7 @@ private static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarg
227229
String columnTypeName = mdSource.getColumnTypeName(currentColumnCount);
228230
int columnType = mdSource.getColumnType(currentColumnCount);
229231
String columnName = mdSource.getColumnName(currentColumnCount);
230-
if (columnType == Types.TIMESTAMP) {
232+
if (columnType == Types.TIMESTAMP || columnTypeName.equalsIgnoreCase("datetimeoffset")) {
231233
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
232234
gc.setGregorianChange(new Date(Long.MIN_VALUE));
233235
Timestamp sourceTS = rsSource.getTimestamp(currentColumnCount, gc);

mssql-plugin/src/e2e-test/resources/pluginParameters.properties

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
driverName=sqlserver
2-
databaseName=temp1
2+
databaseName=cdftest
33
sourceRef=source
44
targetRef=target
55
host=MSSQL_HOST
@@ -40,8 +40,9 @@ uniqueIdentifierColumnsList=(ID, COL1)
4040
uniqueIdentifierValues=VALUES ('User1', '6F9619FF-8B86-D011-B42D-00C04FC964FF')
4141
outputDatatypesSchema3=[{"key":"ID","value":"string"},{"key":"COL1","value":"string"}]
4242

43-
dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0))
44-
dateTimeColumnsList=(ID, COL1, COL2)
45-
dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000')
43+
dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0), COL3 SMALLDATETIME, COL4 DATETIMEOFFSET)
44+
dateTimeColumnsList=(ID, COL1, COL2, COL3, COL4)
45+
dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', \
46+
'2025-12-10 12:32:10.000 +01:00')
4647
outputDatatypesSchema4=[{"key":"ID","value":"string"},{"key":"COL1","value":"datetime"},\
47-
{"key":"COL2","value":"datetime"}]
48+
{"key":"COL2","value":"datetime"},{"key":"COL3","value":"datetime"}, {"key":"COL4","value":"timestamp"}]

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,17 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4141
// Handles datetime datatypes
4242
// Case when Timestamp maps to datetime
4343
// Case when Datetime2 maps to datetime
44-
// Case when DatetimeOffset maps to datetime
4544
if ((sqlType == Types.TIMESTAMP || sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE)
46-
&& fieldLogicalType.equals(Schema.LogicalType.DATETIME)) {
45+
&& fieldLogicalType.equals(Schema.LogicalType.DATETIME)) {
4746
return true;
4847
}
48+
49+
// Case when datetimeoffset maps to timestamp
50+
if (sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE &&
51+
fieldLogicalType.equals(Schema.LogicalType.TIMESTAMP_MICROS)) {
52+
return true;
53+
}
54+
4955
// Handle logical types first
5056
if (fieldLogicalType != null) {
5157
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
@@ -57,7 +63,8 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
5763
|| sqlType == SqlServerSinkSchemaReader.GEOMETRY_TYPE
5864
|| super.isFieldCompatible(field, metadata, index);
5965
case STRING:
60-
return sqlType == SqlServerSinkSchemaReader.DATETIME_OFFSET_TYPE
66+
return
67+
sqlType == SqlServerSinkSchemaReader.DATETIME_OFFSET_TYPE
6168
// Value of GEOMETRY and GEOGRAPHY type can be set as Well Known Text string such as "POINT(3 40 5 6)"
6269
|| sqlType == SqlServerSinkSchemaReader.GEOGRAPHY_TYPE
6370
|| sqlType == SqlServerSinkSchemaReader.GEOMETRY_TYPE

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,4 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
149149
limit, sessionID, limit, strata);
150150
}
151151

152-
@Override
153-
protected Schema getSchema(int sqlType, String typeName, int scale, int precision, String columnName,
154-
boolean isSigned, boolean handleAsDecimal) throws SQLException {
155-
if (SqlServerSourceSchemaReader.shouldConvertToDatetime(typeName)) {
156-
return Schema.of(Schema.LogicalType.DATETIME);
157-
}
158-
159-
if (SqlServerSourceSchemaReader.GEOMETRY_TYPE == sqlType || SqlServerSourceSchemaReader.GEOGRAPHY_TYPE == sqlType) {
160-
return Schema.of(Schema.Type.BYTES);
161-
}
162-
return super.getSchema(sqlType, typeName, scale, precision, columnName, isSigned, handleAsDecimal);
163-
}
164152
}

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSinkDBRecord.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import java.sql.PreparedStatement;
2525
import java.sql.SQLException;
26+
import java.sql.Timestamp;
2627
import java.sql.Types;
28+
import java.time.ZonedDateTime;
2729
import java.util.List;
2830

2931
/**
@@ -53,12 +55,23 @@ protected void writeNullToDB(PreparedStatement stmt, int fieldIndex) throws SQLE
5355
@Override
5456
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
5557
String fieldName, int fieldIndex) throws SQLException {
56-
Object fieldValue = (fieldName != null) ? record.get(fieldName) : null;
5758
int sqlType = columnTypes.get(fieldIndex).getType();
59+
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
5860
int sqlIndex = fieldIndex + 1;
61+
if (fieldLogicalType == Schema.LogicalType.TIMESTAMP_MICROS) {
62+
ZonedDateTime timestamp = record.getTimestamp(fieldName);
63+
if (timestamp != null) {
64+
Timestamp localTimestamp = Timestamp.valueOf(timestamp.toLocalDateTime());
65+
stmt.setTimestamp(sqlIndex, localTimestamp);
66+
} else {
67+
stmt.setNull(sqlIndex, sqlType);
68+
}
69+
return;
70+
}
5971
switch (sqlType) {
6072
case SqlServerSourceSchemaReader.GEOGRAPHY_TYPE:
6173
case SqlServerSourceSchemaReader.GEOMETRY_TYPE:
74+
Object fieldValue = (fieldName != null) ? record.get(fieldName) : null;
6275
if (fieldValue instanceof String) {
6376
// Handle setting GEOGRAPHY and GEOMETRY values from Well Known Text.
6477
// For example, "POINT(3 40 5 6)"

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,20 @@ protected void validateField(FailureCollector collector, Schema.Field field, Sch
203203
// map datetimeoffset to timestamp which is invalid. In such case runtime will still fail even validation passes.
204204
// But we don't have the original source type information here and don't want to do big refactoring here
205205
if (actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME &&
206-
expectedFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS ||
207-
actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME &&
208-
expectedFieldSchema.getType() == Schema.Type.STRING) {
206+
expectedFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) {
207+
// SmallDateTime case where we map it to CDAP DateTime now as opposed to CDAP Timestamp earlier, as
208+
// SmallDateTime does not contain any TimeZone information
209+
return;
210+
}
211+
if ((actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME ||
212+
actualFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) &&
213+
expectedFieldSchema.getType() == Schema.Type.STRING) {
214+
// Case when user manually sets the type to string
215+
return;
216+
}
217+
if (actualFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS &&
218+
expectedFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
219+
// DateTimeOffset case where we map it to CDAP Timestamp as opposed to CDAP DateTime earlier
209220
return;
210221
}
211222
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSourceDBRecord.java

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
import java.sql.SQLException;
2929
import java.sql.Timestamp;
3030
import java.sql.Types;
31+
import java.time.OffsetDateTime;
32+
import java.time.ZoneId;
33+
import java.time.ZoneOffset;
34+
import java.util.Date;
35+
import java.util.GregorianCalendar;
3136
import java.util.List;
37+
import java.util.TimeZone;
3238

3339
/**
3440
* SQL Server Source implementation {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} and {@link
@@ -52,21 +58,35 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
5258
fieldSchema = fieldSchema.getNonNullable();
5359
}
5460

55-
if (SqlServerSourceSchemaReader.shouldConvertToDatetime(resultSet.getMetaData(), columnIndex) &&
56-
fieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
57-
try {
58-
Method getLocalDateTime = resultSet.getClass().getMethod("getDateTime", int.class);
59-
Timestamp value = (Timestamp) getLocalDateTime.invoke(resultSet, columnIndex);
60-
recordBuilder.setDateTime(field.getName(), value == null ? null : value.toLocalDateTime());
61-
return;
62-
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
63-
throw new RuntimeException(String.format("Fail to convert column %s of type %s to datetime. Error: %s.",
64-
resultSet.getMetaData().getColumnName(columnIndex),
65-
resultSet.getMetaData().getColumnTypeName(columnIndex),
66-
e.getMessage()), e);
67-
}
68-
}
6961
switch (sqlType) {
62+
case Types.TIMESTAMP:
63+
// SmallDateTime, DateTime, DateTime2 usecase
64+
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
65+
gc.setGregorianChange(new Date(Long.MIN_VALUE));
66+
Timestamp timestampSmalldatetime = resultSet.getTimestamp(columnIndex, gc);
67+
if (timestampSmalldatetime == null) {
68+
recordBuilder.set(field.getName(), null);
69+
} else if (fieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
70+
// SmallDateTime, Datetime, datetime2 to CDAP Datetime type conversion
71+
setDateTime(resultSet, recordBuilder, field, columnIndex);
72+
} else {
73+
// Deprecated use case of supporting SmallDateTime to CDAP Timestamp conversion
74+
recordBuilder.setTimestamp(field.getName(), timestampSmalldatetime.toInstant()
75+
.atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC)));
76+
}
77+
break;
78+
case SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE:
79+
OffsetDateTime timestampOffset = resultSet.getObject(columnIndex, OffsetDateTime.class);
80+
if (timestampOffset == null) {
81+
recordBuilder.set(field.getName(), null);
82+
} else if (fieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) {
83+
// DateTimeOffset to CDAP Timestamp type conversion
84+
recordBuilder.setTimestamp(field.getName(), timestampOffset.atZoneSameInstant(ZoneId.of("UTC")));
85+
} else {
86+
// Deprecated use case of supporting DateTimeOffset to CDAP DateTime conversion.
87+
setDateTime(resultSet, recordBuilder, field, columnIndex);
88+
}
89+
break;
7090
case Types.TIME:
7191
// Handle reading SQL Server 'TIME' data type to avoid accuracy loss.
7292
// 'TIME' data type has the accuracy of 100 nanoseconds(1 millisecond in Informatica)
@@ -75,14 +95,25 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
7595
recordBuilder.setTime(field.getName(),
7696
timestamp == null ? null : timestamp.toLocalDateTime().toLocalTime());
7797
break;
78-
case SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE:
79-
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
80-
break;
8198
default:
8299
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
83100
}
84101
}
85102

103+
public void setDateTime(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
104+
int columnIndex) throws SQLException {
105+
try {
106+
Method getLocalDateTime = resultSet.getClass().getMethod("getDateTime", int.class);
107+
Timestamp value = (Timestamp) getLocalDateTime.invoke(resultSet, columnIndex);
108+
recordBuilder.setDateTime(field.getName(), value == null ? null : value.toLocalDateTime());
109+
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
110+
throw new RuntimeException(String.format("Fail to convert column %s of type %s to datetime. Error: %s.",
111+
resultSet.getMetaData().getColumnName(columnIndex),
112+
resultSet.getMetaData().getColumnTypeName(columnIndex),
113+
e.getMessage()), e);
114+
}
115+
}
116+
86117
@Override
87118
protected SchemaReader getSchemaReader() {
88119
return new SqlServerSourceSchemaReader();

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSourceSchemaReader.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.sql.ResultSetMetaData;
2323
import java.sql.SQLException;
24+
import java.sql.Types;
2425

2526
/**
2627
* SQL Server Source schema reader.
@@ -49,41 +50,19 @@ public SqlServerSourceSchemaReader(String sessionID) {
4950
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5051
int columnSqlType = metadata.getColumnType(index);
5152

52-
if (shouldConvertToDatetime(metadata, index)) {
53+
if (DATETIME_OFFSET_TYPE == columnSqlType) {
54+
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
55+
}
56+
if (columnSqlType == Types.TIMESTAMP) {
57+
// SmallDateTime, DateTime and DateTime2 will be mapped to Types.DATETIME
5358
return Schema.of(Schema.LogicalType.DATETIME);
54-
5559
}
5660
if (GEOMETRY_TYPE == columnSqlType || GEOGRAPHY_TYPE == columnSqlType) {
5761
return Schema.of(Schema.Type.BYTES);
5862
}
5963
return super.getSchema(metadata, index);
6064
}
6165

62-
/**
63-
* Whether the corresponding column should be converted to CDAP Datetime Logical Type
64-
* @param metadata result set metadata
65-
* @param index index of the column
66-
* @return whether the corresponding column should be converted to CDAP Datetime Logical Type
67-
* @throws SQLException
68-
*/
69-
public static boolean shouldConvertToDatetime(ResultSetMetaData metadata, int index) throws SQLException {
70-
// datetimeoffset will have type DATETIME_OFFSET_TYPE
71-
// datetime and datetime2 will have type Types.TIMESTAMP
72-
// cannot decide based on sql type
73-
String columnTypeName = metadata.getColumnTypeName(index);
74-
return shouldConvertToDatetime(columnTypeName);
75-
}
76-
77-
/**
78-
* Whether the corresponding data type should be converted to CDAP Datetime Logical Type
79-
* SQL Server data type datetime, datetime2 and datetimeoffset will be converted to CDAP Datetime Logical Type
80-
* @param typeName the data type name
81-
* @return Whether the corresponding data type should be converted to CDAP Datetime Logical Type
82-
*/
83-
public static boolean shouldConvertToDatetime(String typeName) {
84-
return typeName.startsWith(DATETIME_TYPE_PREFIX);
85-
}
86-
8766
@Override
8867
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
8968
if (sessionID == null) {

0 commit comments

Comments
 (0)