Skip to content

Commit 9c006c6

Browse files
Merge pull request #444 from data-integrations/fix/Cloudsql_PostgresSQL
Changes in Postgres_Precision_Fix - Sync up with Releases
2 parents 67bf43b + 09a5ace commit 9c006c6

File tree

5 files changed

+40
-38
lines changed

5 files changed

+40
-38
lines changed

cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
150150
| double precision | double | |
151151
| integer | int | |
152152
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
153+
| numeric(with 0 precision) | string | |
153154
| real | float | |
154155
| smallint | int | |
155156
| text | string | |

cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
174174
| double precision | double | |
175175
| integer | int | |
176176
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
177+
| numeric(with 0 precision) | string | |
177178
| real | float | |
178179
| smallint | int | |
179180
| smallserial | int | |

postgresql-plugin/docs/Postgres-batchsink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
7979
| double precision | double | |
8080
| integer | int | |
8181
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
82+
| numeric(with 0 precision) | string | |
8283
| real | float | |
8384
| smallint | int | |
8485
| text | string | |

postgresql-plugin/docs/Postgres-batchsource.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
110110
| double precision | double | |
111111
| integer | int | |
112112
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
113+
| numeric(with 0 precision) | string | |
113114
| real | float | |
114115
| smallint | int | |
115116
| smallserial | int | |

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -63,46 +63,50 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
6363
String columnTypeName = metadata.getColumnTypeName(columnIndex);
6464
if (isUseSchema(metadata, columnIndex)) {
6565
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
66-
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
66+
return;
67+
}
68+
if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
6769
Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
6870
if (timestamp != null) {
6971
ZonedDateTime zonedDateTime = OffsetDateTime.of(timestamp.toLocalDateTime(), OffsetDateTime.now().getOffset())
70-
.atZoneSameInstant(ZoneId.of("UTC"));
72+
.atZoneSameInstant(ZoneId.of("UTC"));
7173
Schema nonNullableSchema = field.getSchema().isNullable() ?
72-
field.getSchema().getNonNullable() : field.getSchema();
74+
field.getSchema().getNonNullable() : field.getSchema();
7375
setZonedDateTimeBasedOnOuputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
74-
field.getName(), zonedDateTime);
76+
field.getName(), zonedDateTime);
7577
} else {
7678
recordBuilder.set(field.getName(), null);
7779
}
78-
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
80+
return;
81+
}
82+
if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
7983
OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
8084
if (timestamp != null) {
8185
recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
8286
} else {
8387
recordBuilder.set(field.getName(), null);
8488
}
85-
} else {
86-
int columnType = metadata.getColumnType(columnIndex);
87-
if (columnType == Types.NUMERIC) {
88-
Schema nonNullableSchema = field.getSchema().isNullable() ?
89-
field.getSchema().getNonNullable() : field.getSchema();
90-
int precision = metadata.getPrecision(columnIndex);
91-
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
92-
// When output schema is set to String for precision less numbers
93-
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
94-
} else if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
95-
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
96-
if (orgValue != null) {
97-
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
98-
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
99-
recordBuilder.setDecimal(field.getName(), decimalValue);
100-
}
101-
}
89+
return;
90+
}
91+
int columnType = metadata.getColumnType(columnIndex);
92+
if (columnType == Types.NUMERIC) {
93+
Schema nonNullableSchema = field.getSchema().isNullable() ?
94+
field.getSchema().getNonNullable() : field.getSchema();
95+
int precision = metadata.getPrecision(columnIndex);
96+
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
97+
// When output schema is set to String for precision less numbers
98+
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
99+
return;
100+
}
101+
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
102+
if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType()) && orgValue != null) {
103+
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
104+
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
105+
recordBuilder.setDecimal(field.getName(), decimalValue);
102106
return;
103107
}
104-
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
105108
}
109+
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
106110
}
107111

108112
private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordBuilder,
@@ -121,12 +125,8 @@ private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordB
121125
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
122126
String columnTypeName = metadata.getColumnTypeName(columnIndex);
123127
// If the column Type Name is present in the String mapped PostgreSQL types then return true.
124-
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
125-
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex))) {
126-
return true;
127-
}
128-
129-
return false;
128+
return (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
129+
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex)));
130130
}
131131

132132
private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException {
@@ -152,16 +152,14 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
152152
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnType.getTypeName()) ||
153153
PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(columnType.getType())) {
154154
stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(),
155-
record.get(fieldName),
156-
stmt.getClass().getClassLoader()));
155+
record.get(fieldName),
156+
stmt.getClass().getClassLoader()));
157+
return;
158+
}
159+
if (columnType.getType() == Types.NUMERIC && record.get(fieldName) != null &&
160+
fieldSchema.getType() == Schema.Type.STRING) {
161+
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName)));
157162
return;
158-
} else if (columnType.getType() == Types.NUMERIC) {
159-
if (record.get(fieldName) != null) {
160-
if (fieldSchema.getType() == Schema.Type.STRING) {
161-
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName)));
162-
return;
163-
}
164-
}
165163
}
166164

167165
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);

0 commit comments

Comments
 (0)