Skip to content

Commit 03f40d0

Browse files
authored
Merge pull request #412 from data-integrations/sql_server_datatypes
Fixed smalldatetime and datetimeoffset datatypes handling
2 parents b57f20f + c0969bf commit 03f40d0

File tree

8 files changed

+101
-69
lines changed

8 files changed

+101
-69
lines changed

mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin;
1818

19+
import io.cdap.cdap.api.data.schema.Schema;
1920
import io.cdap.e2e.utils.PluginPropertyUtils;
2021
import org.junit.Assert;
2122

@@ -29,6 +30,7 @@
2930
import java.sql.Statement;
3031
import java.sql.Timestamp;
3132
import java.sql.Types;
33+
import java.time.Instant;
3234
import java.util.Arrays;
3335
import java.util.Date;
3436
import java.util.GregorianCalendar;
@@ -227,7 +229,7 @@ private static boolean compareResultSetData(ResultSet rsSource, ResultSet rsTarg
227229
String columnTypeName = mdSource.getColumnTypeName(currentColumnCount);
228230
int columnType = mdSource.getColumnType(currentColumnCount);
229231
String columnName = mdSource.getColumnName(currentColumnCount);
230-
if (columnType == Types.TIMESTAMP) {
232+
if (columnType == Types.TIMESTAMP || columnTypeName.equalsIgnoreCase("datetimeoffset")) {
231233
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
232234
gc.setGregorianChange(new Date(Long.MIN_VALUE));
233235
Timestamp sourceTS = rsSource.getTimestamp(currentColumnCount, gc);

mssql-plugin/src/e2e-test/resources/pluginParameters.properties

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
driverName=sqlserver
2-
databaseName=temp1
2+
databaseName=cdftest
33
sourceRef=source
44
targetRef=target
55
host=MSSQL_HOST
@@ -40,8 +40,9 @@ uniqueIdentifierColumnsList=(ID, COL1)
4040
uniqueIdentifierValues=VALUES ('User1', '6F9619FF-8B86-D011-B42D-00C04FC964FF')
4141
outputDatatypesSchema3=[{"key":"ID","value":"string"},{"key":"COL1","value":"string"}]
4242

43-
dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0))
44-
dateTimeColumnsList=(ID, COL1, COL2)
45-
dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000')
43+
dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0), COL3 SMALLDATETIME, COL4 DATETIMEOFFSET)
44+
dateTimeColumnsList=(ID, COL1, COL2, COL3, COL4)
45+
dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', \
46+
'2025-12-10 12:32:10.000 +01:00')
4647
outputDatatypesSchema4=[{"key":"ID","value":"string"},{"key":"COL1","value":"datetime"},\
47-
{"key":"COL2","value":"datetime"}]
48+
{"key":"COL2","value":"datetime"},{"key":"COL3","value":"datetime"}, {"key":"COL4","value":"timestamp"}]

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,17 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4141
// Handles datetime datatypes
4242
// Case when Timestamp maps to datetime
4343
// Case when Datetime2 maps to datetime
44-
// Case when DatetimeOffset maps to datetime
4544
if ((sqlType == Types.TIMESTAMP || sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE)
46-
&& fieldLogicalType.equals(Schema.LogicalType.DATETIME)) {
45+
&& fieldLogicalType.equals(Schema.LogicalType.DATETIME)) {
4746
return true;
4847
}
48+
49+
// Case when datetimeoffset maps to timestamp
50+
if (sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE &&
51+
fieldLogicalType.equals(Schema.LogicalType.TIMESTAMP_MICROS)) {
52+
return true;
53+
}
54+
4955
// Handle logical types first
5056
if (fieldLogicalType != null) {
5157
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
@@ -57,7 +63,8 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
5763
|| sqlType == SqlServerSinkSchemaReader.GEOMETRY_TYPE
5864
|| super.isFieldCompatible(field, metadata, index);
5965
case STRING:
60-
return sqlType == SqlServerSinkSchemaReader.DATETIME_OFFSET_TYPE
66+
return
67+
sqlType == SqlServerSinkSchemaReader.DATETIME_OFFSET_TYPE
6168
// Value of GEOMETRY and GEOGRAPHY type can be set as Well Known Text string such as "POINT(3 40 5 6)"
6269
|| sqlType == SqlServerSinkSchemaReader.GEOGRAPHY_TYPE
6370
|| sqlType == SqlServerSinkSchemaReader.GEOMETRY_TYPE

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,4 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
149149
limit, sessionID, limit, strata);
150150
}
151151

152-
@Override
153-
protected Schema getSchema(int sqlType, String typeName, int scale, int precision, String columnName,
154-
boolean isSigned, boolean handleAsDecimal) throws SQLException {
155-
if (SqlServerSourceSchemaReader.shouldConvertToDatetime(typeName)) {
156-
return Schema.of(Schema.LogicalType.DATETIME);
157-
}
158-
159-
if (SqlServerSourceSchemaReader.GEOMETRY_TYPE == sqlType || SqlServerSourceSchemaReader.GEOGRAPHY_TYPE == sqlType) {
160-
return Schema.of(Schema.Type.BYTES);
161-
}
162-
return super.getSchema(sqlType, typeName, scale, precision, columnName, isSigned, handleAsDecimal);
163-
}
164152
}

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSinkDBRecord.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import java.sql.PreparedStatement;
2525
import java.sql.SQLException;
26+
import java.sql.Timestamp;
2627
import java.sql.Types;
28+
import java.time.ZonedDateTime;
2729
import java.util.List;
2830

2931
/**
@@ -53,12 +55,23 @@ protected void writeNullToDB(PreparedStatement stmt, int fieldIndex) throws SQLE
5355
@Override
5456
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
5557
String fieldName, int fieldIndex) throws SQLException {
56-
Object fieldValue = (fieldName != null) ? record.get(fieldName) : null;
5758
int sqlType = columnTypes.get(fieldIndex).getType();
59+
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
5860
int sqlIndex = fieldIndex + 1;
61+
if (fieldLogicalType == Schema.LogicalType.TIMESTAMP_MICROS) {
62+
ZonedDateTime timestamp = record.getTimestamp(fieldName);
63+
if (timestamp != null) {
64+
Timestamp localTimestamp = Timestamp.valueOf(timestamp.toLocalDateTime());
65+
stmt.setTimestamp(sqlIndex, localTimestamp);
66+
} else {
67+
stmt.setNull(sqlIndex, sqlType);
68+
}
69+
return;
70+
}
5971
switch (sqlType) {
6072
case SqlServerSourceSchemaReader.GEOGRAPHY_TYPE:
6173
case SqlServerSourceSchemaReader.GEOMETRY_TYPE:
74+
Object fieldValue = (fieldName != null) ? record.get(fieldName) : null;
6275
if (fieldValue instanceof String) {
6376
// Handle setting GEOGRAPHY and GEOMETRY values from Well Known Text.
6477
// For example, "POINT(3 40 5 6)"

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSource.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,20 @@ protected void validateField(FailureCollector collector, Schema.Field field, Sch
203203
// map datetimeoffset to timestamp which is invalid. In such case runtime will still fail even validation passes.
204204
// But we don't have the original source type information here and don't want to do big refactoring here
205205
if (actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME &&
206-
expectedFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS ||
207-
actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME &&
208-
expectedFieldSchema.getType() == Schema.Type.STRING) {
206+
expectedFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) {
207+
// SmallDateTime case where we map it to CDAP DateTime now as opposed to CDAP Timestamp earlier, as
208+
// SmallDateTime does not contain any TimeZone information
209+
return;
210+
}
211+
if ((actualFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME ||
212+
actualFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) &&
213+
expectedFieldSchema.getType() == Schema.Type.STRING) {
214+
// Case when user manually sets the type to string
215+
return;
216+
}
217+
if (actualFieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS &&
218+
expectedFieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
219+
// DateTimeOffset case where we map it to CDAP Timestamp as opposed to CDAP DateTime earlier
209220
return;
210221
}
211222
super.validateField(collector, field, actualFieldSchema, expectedFieldSchema);

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSourceDBRecord.java

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
import java.sql.SQLException;
2929
import java.sql.Timestamp;
3030
import java.sql.Types;
31+
import java.time.OffsetDateTime;
32+
import java.time.ZoneId;
33+
import java.time.ZoneOffset;
34+
import java.util.Date;
35+
import java.util.GregorianCalendar;
3136
import java.util.List;
37+
import java.util.TimeZone;
3238

3339
/**
3440
* SQL Server Source implementation {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} and {@link
@@ -52,21 +58,35 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
5258
fieldSchema = fieldSchema.getNonNullable();
5359
}
5460

55-
if (SqlServerSourceSchemaReader.shouldConvertToDatetime(resultSet.getMetaData(), columnIndex) &&
56-
fieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
57-
try {
58-
Method getLocalDateTime = resultSet.getClass().getMethod("getDateTime", int.class);
59-
Timestamp value = (Timestamp) getLocalDateTime.invoke(resultSet, columnIndex);
60-
recordBuilder.setDateTime(field.getName(), value == null ? null : value.toLocalDateTime());
61-
return;
62-
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
63-
throw new RuntimeException(String.format("Fail to convert column %s of type %s to datetime. Error: %s.",
64-
resultSet.getMetaData().getColumnName(columnIndex),
65-
resultSet.getMetaData().getColumnTypeName(columnIndex),
66-
e.getMessage()), e);
67-
}
68-
}
6961
switch (sqlType) {
62+
case Types.TIMESTAMP:
63+
// SmallDateTime, DateTime, DateTime2 usecase
64+
GregorianCalendar gc = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
65+
gc.setGregorianChange(new Date(Long.MIN_VALUE));
66+
Timestamp timestampSmalldatetime = resultSet.getTimestamp(columnIndex, gc);
67+
if (timestampSmalldatetime == null) {
68+
recordBuilder.set(field.getName(), null);
69+
} else if (fieldSchema.getLogicalType() == Schema.LogicalType.DATETIME) {
70+
// SmallDateTime, Datetime, datetime2 to CDAP Datetime type conversion
71+
setDateTime(resultSet, recordBuilder, field, columnIndex);
72+
} else {
73+
// Deprecated use case of supporting SmallDateTime to CDAP Timestamp conversion
74+
recordBuilder.setTimestamp(field.getName(), timestampSmalldatetime.toInstant()
75+
.atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC)));
76+
}
77+
break;
78+
case SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE:
79+
OffsetDateTime timestampOffset = resultSet.getObject(columnIndex, OffsetDateTime.class);
80+
if (timestampOffset == null) {
81+
recordBuilder.set(field.getName(), null);
82+
} else if (fieldSchema.getLogicalType() == Schema.LogicalType.TIMESTAMP_MICROS) {
83+
// DateTimeOffset to CDAP Timestamp type conversion
84+
recordBuilder.setTimestamp(field.getName(), timestampOffset.atZoneSameInstant(ZoneId.of("UTC")));
85+
} else {
86+
// Deprecated use case of supporting DateTimeOffset to CDAP DateTime conversion.
87+
setDateTime(resultSet, recordBuilder, field, columnIndex);
88+
}
89+
break;
7090
case Types.TIME:
7191
// Handle reading SQL Server 'TIME' data type to avoid accuracy loss.
7292
// 'TIME' data type has the accuracy of 100 nanoseconds(1 millisecond in Informatica)
@@ -75,14 +95,25 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
7595
recordBuilder.setTime(field.getName(),
7696
timestamp == null ? null : timestamp.toLocalDateTime().toLocalTime());
7797
break;
78-
case SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE:
79-
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
80-
break;
8198
default:
8299
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
83100
}
84101
}
85102

103+
public void setDateTime(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
104+
int columnIndex) throws SQLException {
105+
try {
106+
Method getLocalDateTime = resultSet.getClass().getMethod("getDateTime", int.class);
107+
Timestamp value = (Timestamp) getLocalDateTime.invoke(resultSet, columnIndex);
108+
recordBuilder.setDateTime(field.getName(), value == null ? null : value.toLocalDateTime());
109+
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
110+
throw new RuntimeException(String.format("Fail to convert column %s of type %s to datetime. Error: %s.",
111+
resultSet.getMetaData().getColumnName(columnIndex),
112+
resultSet.getMetaData().getColumnTypeName(columnIndex),
113+
e.getMessage()), e);
114+
}
115+
}
116+
86117
@Override
87118
protected SchemaReader getSchemaReader() {
88119
return new SqlServerSourceSchemaReader();

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerSourceSchemaReader.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.sql.ResultSetMetaData;
2323
import java.sql.SQLException;
24+
import java.sql.Types;
2425

2526
/**
2627
* SQL Server Source schema reader.
@@ -49,41 +50,19 @@ public SqlServerSourceSchemaReader(String sessionID) {
4950
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5051
int columnSqlType = metadata.getColumnType(index);
5152

52-
if (shouldConvertToDatetime(metadata, index)) {
53+
if (DATETIME_OFFSET_TYPE == columnSqlType) {
54+
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
55+
}
56+
if (columnSqlType == Types.TIMESTAMP) {
57+
// SmallDateTime, DateTime and DateTime2 will be mapped to Types.DATETIME
5358
return Schema.of(Schema.LogicalType.DATETIME);
54-
5559
}
5660
if (GEOMETRY_TYPE == columnSqlType || GEOGRAPHY_TYPE == columnSqlType) {
5761
return Schema.of(Schema.Type.BYTES);
5862
}
5963
return super.getSchema(metadata, index);
6064
}
6165

62-
/**
63-
* Whether the corresponding column should be converted to CDAP Datetime Logical Type
64-
* @param metadata result set metadata
65-
* @param index index of the column
66-
* @return whether the corresponding column should be converted to CDAP Datetime Logical Type
67-
* @throws SQLException
68-
*/
69-
public static boolean shouldConvertToDatetime(ResultSetMetaData metadata, int index) throws SQLException {
70-
// datetimeoffset will have type DATETIME_OFFSET_TYPE
71-
// datetime and datetime2 will have type Types.TIMESTAMP
72-
// cannot decide based on sql type
73-
String columnTypeName = metadata.getColumnTypeName(index);
74-
return shouldConvertToDatetime(columnTypeName);
75-
}
76-
77-
/**
78-
* Whether the corresponding data type should be converted to CDAP Datetime Logical Type
79-
* SQL Server data type datetime, datetime2 and datetimeoffset will be converted to CDAP Datetime Logical Type
80-
* @param typeName the data type name
81-
* @return Whether the corresponding data type should be converted to CDAP Datetime Logical Type
82-
*/
83-
public static boolean shouldConvertToDatetime(String typeName) {
84-
return typeName.startsWith(DATETIME_TYPE_PREFIX);
85-
}
86-
8766
@Override
8867
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
8968
if (sessionID == null) {

0 commit comments

Comments
 (0)