Skip to content

Commit a938283

Browse files
Enabling support for the precision less numbers in PostgreSQL plugin. (#404)
1 parent 3379616 commit a938283

File tree

6 files changed

+131
-19
lines changed

6 files changed

+131
-19
lines changed

postgresql-plugin/pom.xml

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@
5353
<version>42.2.20</version>
5454
<scope>test</scope>
5555
</dependency>
56-
<dependency>
57-
<groupId>org.mockito</groupId>
58-
<artifactId>mockito-core</artifactId>
59-
</dependency>
6056
<dependency>
6157
<groupId>io.cdap.plugin</groupId>
6258
<artifactId>database-commons</artifactId>
@@ -72,10 +68,6 @@
7268
<groupId>io.cdap.cdap</groupId>
7369
<artifactId>cdap-data-pipeline3_2.12</artifactId>
7470
</dependency>
75-
<dependency>
76-
<groupId>junit</groupId>
77-
<artifactId>junit</artifactId>
78-
</dependency>
7971
<dependency>
8072
<groupId>io.cdap.cdap</groupId>
8173
<artifactId>cdap-api</artifactId>
@@ -87,6 +79,16 @@
8779
<version>RELEASE</version>
8880
<scope>compile</scope>
8981
</dependency>
82+
<dependency>
83+
<groupId>org.mockito</groupId>
84+
<artifactId>mockito-core</artifactId>
85+
<scope>test</scope>
86+
</dependency>
87+
<dependency>
88+
<groupId>junit</groupId>
89+
<artifactId>junit</artifactId>
90+
<scope>test</scope>
91+
</dependency>
9092
</dependencies>
9193
<build>
9294
<plugins>

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
import java.lang.reflect.InvocationTargetException;
2828
import java.lang.reflect.Method;
29+
import java.math.BigDecimal;
30+
import java.math.RoundingMode;
2931
import java.sql.PreparedStatement;
3032
import java.sql.ResultSet;
3133
import java.sql.ResultSetMetaData;
@@ -57,8 +59,9 @@ public PostgresDBRecord() {
5759
@Override
5860
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
5961
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
60-
String columnTypeName = resultSet.getMetaData().getColumnTypeName(columnIndex);
61-
if (isUseSchema(resultSet.getMetaData(), columnIndex)) {
62+
ResultSetMetaData metadata = resultSet.getMetaData();
63+
String columnTypeName = metadata.getColumnTypeName(columnIndex);
64+
if (isUseSchema(metadata, columnIndex)) {
6265
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
6366
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
6467
Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
@@ -80,6 +83,24 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
8083
recordBuilder.set(field.getName(), null);
8184
}
8285
} else {
86+
int columnType = metadata.getColumnType(columnIndex);
87+
if (columnType == Types.NUMERIC) {
88+
Schema nonNullableSchema = field.getSchema().isNullable() ?
89+
field.getSchema().getNonNullable() : field.getSchema();
90+
int precision = metadata.getPrecision(columnIndex);
91+
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
92+
// When output schema is set to String for precision less numbers
93+
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
94+
} else if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
95+
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
96+
if (orgValue != null) {
97+
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
98+
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
99+
recordBuilder.setDecimal(field.getName(), decimalValue);
100+
}
101+
}
102+
return;
103+
}
83104
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
84105
}
85106
}
@@ -98,14 +119,14 @@ private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordB
98119
}
99120

100121
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
101-
switch (metadata.getColumnTypeName(columnIndex)) {
102-
case "bit":
103-
case "timetz":
104-
case "money":
105-
return true;
106-
default:
107-
return PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex));
122+
String columnTypeName = metadata.getColumnTypeName(columnIndex);
123+
// If the column Type Name is present in the String mapped PostgreSQL types then return true.
124+
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
125+
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex))) {
126+
return true;
108127
}
128+
129+
return false;
109130
}
110131

111132
private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException {
@@ -133,9 +154,17 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
133154
stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(),
134155
record.get(fieldName),
135156
stmt.getClass().getClassLoader()));
136-
} else {
137-
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
157+
return;
158+
} else if (columnType.getType() == Types.NUMERIC) {
159+
if (record.get(fieldName) != null) {
160+
if (fieldSchema.getType() == Schema.Type.STRING) {
161+
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName)));
162+
return;
163+
}
164+
}
138165
}
166+
167+
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
139168
}
140169

141170
@Override

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresFieldsValidator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.sql.ResultSetMetaData;
2323
import java.sql.SQLException;
24+
import java.sql.Types;
2425
import java.util.Objects;
2526

2627
/**
@@ -46,6 +47,14 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
4647
}
4748
}
4849

50+
// Since Numeric types without precision and scale are getting converted into CDAP String type at the Source
51+
// plugin, hence making the String type compatible with the Numeric type at the Sink as well.
52+
if (fieldType.equals(Schema.Type.STRING)) {
53+
if (Types.NUMERIC == columnType) {
54+
return true;
55+
}
56+
}
57+
4958
if (colTypeName.equalsIgnoreCase("timestamp")
5059
&& schema.getLogicalType().equals(Schema.LogicalType.DATETIME)) {
5160
return true;

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.common.collect.ImmutableSet;
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.db.CommonSchemaReader;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2224

2325
import java.sql.ResultSetMetaData;
2426
import java.sql.SQLException;
@@ -30,6 +32,8 @@
3032
*/
3133
public class PostgresSchemaReader extends CommonSchemaReader {
3234

35+
private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaReader.class);
36+
3337
public static final Set<Integer> STRING_MAPPED_POSTGRES_TYPES = ImmutableSet.of(
3438
Types.OTHER, Types.ARRAY, Types.SQLXML
3539
);
@@ -58,6 +62,18 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
5862
return Schema.of(Schema.Type.STRING);
5963
}
6064

65+
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
66+
if (Types.NUMERIC == columnType) {
67+
int precision = metadata.getPrecision(index);
68+
if (precision == 0) {
69+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
70+
+ "converting into STRING type to avoid any precision loss.",
71+
metadata.getColumnName(index),
72+
metadata.getColumnTypeName(index)));
73+
return Schema.of(Schema.Type.STRING);
74+
}
75+
}
76+
6177
if (typeName.equalsIgnoreCase("timestamp")) {
6278
return Schema.of(Schema.LogicalType.DATETIME);
6379
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ public void validate(FailureCollector collector) {
142142
@Override
143143
protected void validateField(FailureCollector collector, Schema.Field field,
144144
Schema actualFieldSchema, Schema expectedFieldSchema) {
145+
146+
// This change is needed to make sure that the pipeline upgrade continues to work post upgrade.
147+
// Since the older handling of the precision less used to convert to the decimal type,
148+
// and the new version would try to convert to the String type. In that case the output schema would
149+
// contain Decimal(38, 0) (or something similar), and the code internally would try to identify
150+
// the schema of the field(without precision and scale) as String.
151+
if (Schema.LogicalType.DECIMAL.equals(expectedFieldSchema.getLogicalType())
152+
&& actualFieldSchema.getType().equals(Schema.Type.STRING)) {
153+
return;
154+
}
155+
145156
// This change is needed to make sure that the pipeline upgrade continues to work post upgrade.
146157
// Since the older handling of the Timestamp used to convert to CDAP TIMESTAMP type,
147158
// but since PostgreSQL Timestamp does not have a timezone information, hence it should ideally map to

postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresDBRecordUnitTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.mockito.Mockito;
2626
import org.mockito.junit.MockitoJUnitRunner;
2727

28+
import java.math.BigDecimal;
2829
import java.sql.ResultSet;
2930
import java.sql.ResultSetMetaData;
3031
import java.sql.SQLException;
@@ -42,6 +43,50 @@
4243
*/
4344
@RunWith(MockitoJUnitRunner.class)
4445
public class PostgresDBRecordUnitTest {
46+
47+
private static final int DEFAULT_PRECISION = 38;
48+
49+
/**
50+
* Validate the precision less Numbers handling against following use cases.
51+
* 1. Ensure that the numeric type with [p,s] set as [38,4] detect as BigDecimal(38,4) in cdap.
52+
* 2. Ensure that the numeric type without [p,s] detect as String type in cdap.
53+
* @throws Exception
54+
*/
55+
@Test
56+
public void validatePrecisionLessDecimalParsing() throws Exception {
57+
Schema.Field field1 = Schema.Field.of("ID1", Schema.decimalOf(DEFAULT_PRECISION, 4));
58+
Schema.Field field2 = Schema.Field.of("ID2", Schema.of(Schema.Type.STRING));
59+
60+
Schema schema = Schema.recordOf(
61+
"dbRecord",
62+
field1,
63+
field2
64+
);
65+
66+
ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
67+
when(resultSetMetaData.getColumnType(eq(1))).thenReturn(Types.NUMERIC);
68+
when(resultSetMetaData.getPrecision(eq(1))).thenReturn(DEFAULT_PRECISION);
69+
when(resultSetMetaData.getColumnType(eq(2))).thenReturn(Types.NUMERIC);
70+
when(resultSetMetaData.getPrecision(eq(2))).thenReturn(0);
71+
72+
ResultSet resultSet = Mockito.mock(ResultSet.class);
73+
74+
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
75+
when(resultSet.getBigDecimal(eq(1))).thenReturn(BigDecimal.valueOf(123.4568));
76+
when(resultSet.getString(eq(2))).thenReturn("123.4568");
77+
78+
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
79+
PostgresDBRecord dbRecord = new PostgresDBRecord(null, null, null, null);
80+
dbRecord.handleField(resultSet, builder, field1, 1, Types.NUMERIC, DEFAULT_PRECISION, 4);
81+
dbRecord.handleField(resultSet, builder, field2, 2, Types.NUMERIC, 0, -127);
82+
83+
StructuredRecord record = builder.build();
84+
Assert.assertTrue(record.getDecimal("ID1") instanceof BigDecimal);
85+
Assert.assertEquals(record.getDecimal("ID1"), BigDecimal.valueOf(123.4568));
86+
Assert.assertTrue(record.get("ID2") instanceof String);
87+
Assert.assertEquals(record.get("ID2"), "123.4568");
88+
}
89+
4590
@Test
4691
public void validateTimestampType() throws SQLException {
4792
OffsetDateTime offsetDateTime = OffsetDateTime.of(2023, 1, 1, 1, 0, 0, 0, ZoneOffset.UTC);

0 commit comments

Comments
 (0)