Skip to content

Commit 99511b4

Browse files
authored
Merge pull request #347 from data-integrations/mysql_sanity
Added datatypes sanity test for MySQL Plugin
2 parents a5f9fea + e9eebd1 commit 99511b4

File tree

16 files changed

+321
-74
lines changed

16 files changed

+321
-74
lines changed

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,8 @@ protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, i
331331
writeInt(stmt, fieldIndex, sqlIndex, fieldValue);
332332
break;
333333
case LONG:
334-
stmt.setLong(sqlIndex, (Long) fieldValue);
334+
long fieldValueLong = ((Number) fieldValue).longValue();
335+
stmt.setLong(sqlIndex, fieldValueLong);
335336
break;
336337
case FLOAT:
337338
// both real and float are set with the same method on prepared statement

database-commons/src/main/java/io/cdap/plugin/db/sink/CommonFieldsValidator.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
7373
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
7474

7575
int sqlType = metadata.getColumnType(index);
76+
boolean isSigned = metadata.isSigned(index);
77+
int precision = metadata.getPrecision(index);
7678

77-
return isFieldCompatible(fieldType, fieldLogicalType, sqlType);
79+
return isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
7880
}
7981

8082

@@ -84,9 +86,14 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
8486
* @param fieldType field type.
8587
* @param fieldLogicalType filed logical type.
8688
* @param sqlType code of sql type.
89+
* @param precision
8790
* @return 'true' if field is compatible to be written, 'false' otherwise.
8891
*/
89-
public boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType) {
92+
public boolean isFieldCompatible(Schema.Type fieldType,
93+
Schema.LogicalType fieldLogicalType,
94+
int sqlType,
95+
int precision,
96+
boolean isSigned) {
9097
// Handle logical types first
9198
if (fieldLogicalType != null) {
9299
switch (fieldLogicalType) {
@@ -100,7 +107,8 @@ public boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType field
100107
return sqlType == Types.TIMESTAMP;
101108
case DECIMAL:
102109
return sqlType == Types.NUMERIC
103-
|| sqlType == Types.DECIMAL;
110+
|| sqlType == Types.DECIMAL
111+
|| (sqlType == Types.BIGINT && !isSigned && precision >= 19);
104112
}
105113
}
106114

@@ -115,7 +123,8 @@ public boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType field
115123
|| sqlType == Types.SMALLINT
116124
|| sqlType == Types.TINYINT;
117125
case LONG:
118-
return sqlType == Types.BIGINT;
126+
return sqlType == Types.BIGINT
127+
|| (!isSigned && sqlType == Types.INTEGER);
119128
case FLOAT:
120129
return sqlType == Types.REAL
121130
|| sqlType == Types.FLOAT;

database-commons/src/main/java/io/cdap/plugin/db/sink/FieldsValidator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public interface FieldsValidator {
5252
* @param fieldType field type.
5353
* @param fieldLogicalType filed logical type.
5454
* @param sqlType code of sql type.
55+
* @param precision
5556
* @return 'true' if field is compatible to be written, 'false' otherwise.
5657
*/
57-
boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType);
58+
boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType, int precision,
59+
boolean isSigned);
5860
}

database-commons/src/test/java/io/cdap/plugin/db/sink/CommonFieldsValidatorTest.java

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,34 +36,62 @@ public class CommonFieldsValidatorTest {
3636

3737
@Test
3838
public void testIsFieldCompatible() {
39-
validateFieldCompatible(Schema.Type.INT, Schema.LogicalType.DATE, Types.DATE, true);
40-
validateFieldCompatible(Schema.Type.INT, Schema.LogicalType.TIME_MILLIS, Types.TIME, true);
41-
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIME_MICROS, Types.TIME, true);
42-
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIMESTAMP_MILLIS, Types.TIMESTAMP, true);
43-
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIMESTAMP_MICROS, Types.TIMESTAMP, true);
44-
validateFieldCompatible(Schema.Type.BYTES, Schema.LogicalType.DECIMAL, Types.NUMERIC, true);
45-
validateFieldCompatible(Schema.Type.BYTES, Schema.LogicalType.DECIMAL, Types.DECIMAL, true);
46-
validateFieldCompatible(Schema.Type.NULL, null, 0, true);
47-
validateFieldCompatible(Schema.Type.BOOLEAN, null, Types.BOOLEAN, true);
48-
validateFieldCompatible(Schema.Type.BOOLEAN, null, Types.BIT, true);
49-
validateFieldCompatible(Schema.Type.INT, null, Types.INTEGER, true);
50-
validateFieldCompatible(Schema.Type.INT, null, Types.SMALLINT, true);
51-
validateFieldCompatible(Schema.Type.INT, null, Types.TINYINT, true);
52-
validateFieldCompatible(Schema.Type.FLOAT, null, Types.REAL, true);
53-
validateFieldCompatible(Schema.Type.FLOAT, null, Types.FLOAT, true);
54-
validateFieldCompatible(Schema.Type.DOUBLE, null, Types.DOUBLE, true);
55-
validateFieldCompatible(Schema.Type.BYTES, null, Types.BINARY, true);
56-
validateFieldCompatible(Schema.Type.BYTES, null, Types.VARBINARY, true);
57-
validateFieldCompatible(Schema.Type.BYTES, null, Types.LONGVARBINARY, true);
58-
validateFieldCompatible(Schema.Type.BYTES, null, Types.BLOB, true);
59-
validateFieldCompatible(Schema.Type.STRING, null, Types.VARCHAR, true);
60-
validateFieldCompatible(Schema.Type.STRING, null, Types.CHAR, true);
61-
validateFieldCompatible(Schema.Type.STRING, null, Types.CLOB, true);
62-
validateFieldCompatible(Schema.Type.STRING, null, Types.LONGNVARCHAR, true);
63-
validateFieldCompatible(Schema.Type.STRING, null, Types.LONGVARCHAR, true);
64-
validateFieldCompatible(Schema.Type.STRING, null, Types.NCHAR, true);
65-
validateFieldCompatible(Schema.Type.STRING, null, Types.NCLOB, true);
66-
validateFieldCompatible(Schema.Type.LONG, null, Types.TIMESTAMP, false);
39+
validateFieldCompatible(Schema.Type.INT, Schema.LogicalType.DATE, Types.DATE, true, 0,
40+
true);
41+
validateFieldCompatible(Schema.Type.INT, Schema.LogicalType.TIME_MILLIS, Types.TIME, true,
42+
0, true);
43+
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIME_MICROS, Types.TIME, true,
44+
0, true);
45+
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIMESTAMP_MILLIS, Types.TIMESTAMP, true,
46+
0, true);
47+
validateFieldCompatible(Schema.Type.LONG, Schema.LogicalType.TIMESTAMP_MICROS, Types.TIMESTAMP, true,
48+
0, true);
49+
validateFieldCompatible(Schema.Type.BYTES, Schema.LogicalType.DECIMAL, Types.NUMERIC, true,
50+
0, true);
51+
validateFieldCompatible(Schema.Type.BYTES, Schema.LogicalType.DECIMAL, Types.DECIMAL, true,
52+
0, true);
53+
validateFieldCompatible(Schema.Type.NULL, null, 0, true, 0,
54+
true);
55+
validateFieldCompatible(Schema.Type.BOOLEAN, null, Types.BOOLEAN, true, 0,
56+
true);
57+
validateFieldCompatible(Schema.Type.BOOLEAN, null, Types.BIT, true, 0,
58+
true);
59+
validateFieldCompatible(Schema.Type.INT, null, Types.INTEGER, true, 0,
60+
true);
61+
validateFieldCompatible(Schema.Type.INT, null, Types.SMALLINT, true, 0,
62+
true);
63+
validateFieldCompatible(Schema.Type.INT, null, Types.TINYINT, true, 0,
64+
true);
65+
validateFieldCompatible(Schema.Type.FLOAT, null, Types.REAL, true, 0,
66+
true);
67+
validateFieldCompatible(Schema.Type.FLOAT, null, Types.FLOAT, true, 0,
68+
true);
69+
validateFieldCompatible(Schema.Type.DOUBLE, null, Types.DOUBLE, true, 0,
70+
true);
71+
validateFieldCompatible(Schema.Type.BYTES, null, Types.BINARY, true, 0,
72+
true);
73+
validateFieldCompatible(Schema.Type.BYTES, null, Types.VARBINARY, true, 0,
74+
true);
75+
validateFieldCompatible(Schema.Type.BYTES, null, Types.LONGVARBINARY, true,
76+
0, true);
77+
validateFieldCompatible(Schema.Type.BYTES, null, Types.BLOB, true, 0,
78+
true);
79+
validateFieldCompatible(Schema.Type.STRING, null, Types.VARCHAR, true, 0,
80+
true);
81+
validateFieldCompatible(Schema.Type.STRING, null, Types.CHAR, true, 0,
82+
true);
83+
validateFieldCompatible(Schema.Type.STRING, null, Types.CLOB, true, 0,
84+
true);
85+
validateFieldCompatible(Schema.Type.STRING, null, Types.LONGNVARCHAR, true,
86+
0, true);
87+
validateFieldCompatible(Schema.Type.STRING, null, Types.LONGVARCHAR, true, 0,
88+
true);
89+
validateFieldCompatible(Schema.Type.STRING, null, Types.NCHAR, true, 0,
90+
true);
91+
validateFieldCompatible(Schema.Type.STRING, null, Types.NCLOB, true, 0,
92+
true);
93+
validateFieldCompatible(Schema.Type.LONG, null, Types.TIMESTAMP, false, 0,
94+
true);
6795
}
6896

6997
@Test
@@ -186,11 +214,12 @@ public void testValidateFieldsWithNullable() throws Exception {
186214
}
187215

188216
public void validateFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType,
189-
boolean isCompatible) {
217+
boolean isCompatible, int precision, boolean isSigned) {
190218
String errorMessage = String.format("Expected type '%s' is %s with sql type '%d'",
191219
fieldType,
192220
isCompatible ? "compatible" : "not compatible",
193221
sqlType);
194-
Assert.assertEquals(errorMessage, isCompatible, VALIDATOR.isFieldCompatible(fieldType, fieldLogicalType, sqlType));
222+
Assert.assertEquals(errorMessage, isCompatible, VALIDATOR.isFieldCompatible(fieldType, fieldLogicalType, sqlType,
223+
precision, isSigned));
195224
}
196225
}

db2-plugin/src/main/java/io/cdap/plugin/db2/DB2FieldsValidator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
3636

3737
int sqlType = metadata.getColumnType(index);
3838
String colTypeName = metadata.getColumnTypeName(index);
39+
boolean isSigned = metadata.isSigned(index);
40+
int precision = metadata.getPrecision(index);
3941

4042
// Handle logical types first
4143
if (fieldLogicalType != null) {
42-
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType);
44+
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
4345
}
4446

4547
switch (fieldType) {
@@ -49,7 +51,7 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4951
|| DB2SchemaReader.DB2_DECFLOAT.equals(colTypeName)
5052
|| super.isFieldCompatible(field, metadata, index);
5153
default:
52-
return super.isFieldCompatible(fieldType, null, sqlType);
54+
return super.isFieldCompatible(fieldType, null, sqlType, precision, isSigned);
5355
}
5456
}
5557
}

memsql-plugin/src/main/java/io/cdap/plugin/memsql/sink/MemsqlFieldsValidator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@
2727
public class MemsqlFieldsValidator extends CommonFieldsValidator {
2828

2929
@Override
30-
public boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType) {
30+
public boolean isFieldCompatible(Schema.Type fieldType, Schema.LogicalType fieldLogicalType, int sqlType,
31+
int precision, boolean isSigned) {
3132
// In MemqSQL bool stores as tinyint
3233
if (fieldType == Schema.Type.BOOLEAN && sqlType == Types.TINYINT) {
3334
return true;
3435
}
3536

36-
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType);
37+
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
3738
}
3839
}

mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
3434
Schema.LogicalType fieldLogicalType = fieldSchema.getLogicalType();
3535

3636
int sqlType = metadata.getColumnType(index);
37+
boolean isSigned = metadata.isSigned(index);
38+
int precision = metadata.getPrecision(index);
3739

3840
// Handle logical types first
3941
if (fieldLogicalType != null) {
40-
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType);
42+
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
4143
}
4244

4345
switch (fieldType) {
@@ -53,7 +55,7 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
5355
|| sqlType == SqlServerSinkSchemaReader.SQL_VARIANT
5456
|| super.isFieldCompatible(field, metadata, index);
5557
default:
56-
return super.isFieldCompatible(fieldType, null, sqlType);
58+
return super.isFieldCompatible(fieldType, null, sqlType, precision, isSigned);
5759
}
5860
}
5961
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Copyright © 2022 Cask Data, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
# use this file except in compliance with the License. You may obtain a copy of
6+
# the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations under
14+
# the License.
15+
#
16+
17+
@Mysql
18+
Feature: Mysql - Verify Mysql source data transfer for different datatypes
19+
@MYSQL_SOURCE_DATATYPES_TEST @MYSQL_SINK_TEST @Mysql_Required
20+
Scenario: To verify data is getting transferred from Mysql to Mysql successfully
21+
Given Open Datafusion Project to configure pipeline
22+
When Expand Plugin group in the LHS plugins list: "Source"
23+
When Select plugin: "MySQL" from the plugins list as: "Source"
24+
When Expand Plugin group in the LHS plugins list: "Sink"
25+
When Select plugin: "MySQL" from the plugins list as: "Sink"
26+
Then Connect plugins: "MySQL" and "MySQL2" to establish connection
27+
Then Navigate to the properties page of plugin: "MySQL"
28+
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
29+
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
30+
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
31+
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
32+
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
33+
Then Enter input plugin property: "referenceName" with value: "sourceRef"
34+
Then Replace input plugin property: "database" with value: "databaseName"
35+
Then Enter textarea plugin property: "importQuery" with value: "selectQuery"
36+
Then Click on the Get Schema button
37+
Then Verify the Output Schema matches the Expected Schema: "datatypesSchema"
38+
Then Validate "MySQL" plugin properties
39+
Then Close the Plugin Properties page
40+
Then Navigate to the properties page of plugin: "MySQL2"
41+
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
42+
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
43+
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
44+
Then Replace input plugin property: "database" with value: "databaseName"
45+
Then Replace input plugin property: "tableName" with value: "targetTable"
46+
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
47+
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
48+
Then Enter input plugin property: "referenceName" with value: "targetRef"
49+
Then Validate "MySQL2" plugin properties
50+
Then Close the Plugin Properties page
51+
Then Save the pipeline
52+
Then Preview and run the pipeline
53+
Then Verify the preview of pipeline is "success"
54+
Then Click on preview data for MySQL sink
55+
Then Close the preview data
56+
Then Deploy the pipeline
57+
Then Run the Pipeline in Runtime
58+
Then Wait till pipeline is in running state
59+
Then Open and capture logs
60+
Then Verify the pipeline status is "Succeeded"
61+
Then Validate the values of records transferred to target table is equal to the values from source table

mysql-plugin/src/e2e-test/features/mysql/MySql.feature

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,4 @@ Feature: Mysql - Verify Mysql source data transfer
5959
Then Wait till pipeline is in running state
6060
Then Open and capture logs
6161
Then Verify the pipeline status is "Succeeded"
62-
Then Get count of no of records transferred to target MySQL Table
63-
Then Validate records transferred to target table is equal to number of records from source table
62+
Then Validate the values of records transferred to target table is equal to the values from source table

0 commit comments

Comments
 (0)