Skip to content

Commit 2f5803e

Browse files
Fix the ResultSet lookup logic to match on the column name instead of the field index from the Schema. (#356)
Removed the buggy index-based column lookup logic in the Oracle plugin
1 parent ed5ab44 commit 2f5803e

File tree

4 files changed

+163
-28
lines changed

4 files changed

+163
-28
lines changed

database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public void readFields(ResultSet resultSet) throws SQLException {
104104
StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
105105
for (int i = 0; i < schema.getFields().size(); i++) {
106106
Schema.Field field = schema.getFields().get(i);
107-
int columnIndex = i + 1;
107+
// Find the field index in the resultSet having the same name
108+
int columnIndex = resultSet.findColumn(field.getName());
108109
int sqlType = metadata.getColumnType(columnIndex);
109110
int sqlPrecision = metadata.getPrecision(columnIndex);
110111
int sqlScale = metadata.getScale(columnIndex);

database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@
6464
import java.sql.Statement;
6565
import java.util.ArrayList;
6666
import java.util.Collections;
67+
import java.util.HashMap;
6768
import java.util.List;
69+
import java.util.Map;
6870
import java.util.Objects;
6971
import java.util.Optional;
7072
import java.util.Properties;
@@ -276,8 +278,7 @@ private void setResultSetMetadata() throws Exception {
276278
ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0",
277279
dbColumns, fullyQualifiedTableName))
278280
) {
279-
ResultSetMetaData resultSetMetadata = rs.getMetaData();
280-
columnTypes.addAll(getMatchedColumnTypeList(resultSetMetadata, columns));
281+
columnTypes.addAll(getMatchedColumnTypeList(rs, columns));
281282
}
282283
}
283284

@@ -287,22 +288,36 @@ private void setResultSetMetadata() throws Exception {
287288
/**
288289
* Compare columns from schema with columns in table and returns list of matched columns in {@link ColumnType} format.
289290
*
290-
* @param resultSetMetadata result set metadata from table.
291+
* @param resultSet result set from table.
291292
* @param columns list of columns from schema.
292293
* @return list of matched columns.
293294
*/
294-
static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMetadata, List<String> columns)
295+
static List<ColumnType> getMatchedColumnTypeList(ResultSet resultSet, List<String> columns)
295296
throws SQLException {
296297
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
297-
// JDBC driver column indices start with 1
298-
for (int i = 0; i < resultSetMetadata.getColumnCount(); i++) {
299-
String name = resultSetMetadata.getColumnName(i + 1);
300-
String columnTypeName = resultSetMetadata.getColumnTypeName(i + 1);
301-
int type = resultSetMetadata.getColumnType(i + 1);
298+
ResultSetMetaData resultSetMetadata = resultSet.getMetaData();
299+
Map<String, String> resultSetColumnNames = new HashMap<>(resultSetMetadata.getColumnCount());
300+
301+
// Populate the ResultSet field names in lower case vs original names
302+
// JDBC driver column indices start with index 1
303+
for (int i = 1; i <= resultSetMetadata.getColumnCount(); i++) {
304+
resultSetColumnNames.put(resultSetMetadata.getColumnName(i).toLowerCase(), resultSetMetadata.getColumnName(i));
305+
}
306+
307+
// Iterate over all the columns present in the output schema and
308+
// check if the resultSet contains a column with the same name.
309+
for (int i = 0; i < columns.size(); i++) {
302310
String schemaColumnName = columns.get(i);
303-
Preconditions.checkArgument(schemaColumnName.toLowerCase().equals(name.toLowerCase()),
304-
"Missing column '%s' in SQL table", schemaColumnName);
305-
columnTypes.add(new ColumnType(schemaColumnName, columnTypeName, type));
311+
String schemaColName = schemaColumnName.toLowerCase();
312+
Preconditions.checkArgument(resultSetColumnNames.keySet().contains(schemaColName),
313+
"Missing column '%s' in SQL table", schemaColumnName);
314+
315+
// Find the column in the resultSet, as the index in the schema might not match with the resultSet.
316+
int columnIndex = resultSet.findColumn(resultSetColumnNames.get(schemaColName));
317+
String name = resultSetMetadata.getColumnName(columnIndex);
318+
String columnTypeName = resultSetMetadata.getColumnTypeName(columnIndex);
319+
int type = resultSetMetadata.getColumnType(columnIndex);
320+
columnTypes.add(new ColumnType(name, columnTypeName, type));
306321
}
307322
return columnTypes;
308323
}

database-commons/src/test/java/io/cdap/plugin/db/batch/sink/AbstractDBSinkTest.java

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@
1717
package io.cdap.plugin.db.batch.sink;
1818

1919
import com.google.common.collect.ImmutableList;
20+
import com.mockrunner.mock.jdbc.MockResultSet;
2021
import com.mockrunner.mock.jdbc.MockResultSetMetaData;
2122
import io.cdap.plugin.db.ColumnType;
2223
import org.junit.Assert;
2324
import org.junit.Test;
2425

2526
import java.sql.SQLException;
2627
import java.util.ArrayList;
28+
import java.util.HashSet;
2729
import java.util.List;
30+
import java.util.Set;
2831

2932
/**
3033
* Test class for abstract sink.
3134
*/
3235
public class AbstractDBSinkTest {
3336

3437
@Test
35-
public void testGetMatchedColumnTypeList() throws SQLException {
38+
public void testGetMatchedColumnTypeList() throws Exception {
3639
List<String> columns = ImmutableList.of(
3740
"ID",
3841
"NAME",
@@ -45,20 +48,25 @@ public void testGetMatchedColumnTypeList() throws SQLException {
4548
resultSetMetaData.setColumnCount(columns.size());
4649

4750
for (int i = 0; i < columns.size(); i++) {
48-
String name = columns.get(i);
4951
resultSetMetaData.setColumnName(i + 1, columns.get(i));
5052
resultSetMetaData.setColumnTypeName(i + 1, "STRING");
5153
resultSetMetaData.setColumnType(i + 1, i);
52-
expectedColumns.add(new ColumnType(name, "STRING", i));
54+
expectedColumns.add(new ColumnType(columns.get(i), "STRING", i));
5355
}
5456

55-
List<ColumnType> result = AbstractDBSink.getMatchedColumnTypeList(resultSetMetaData, columns);
57+
MockResultSet resultSet = new MockResultSet("data");
58+
Set<String> columnNamesSet = new HashSet<String>();
59+
columnNamesSet.addAll(columns);
60+
resultSet.addColumns(columnNamesSet);
61+
resultSet.setResultSetMetaData(resultSetMetaData);
62+
63+
List<ColumnType> result = AbstractDBSink.getMatchedColumnTypeList(resultSet, columns);
5664

5765
Assert.assertEquals(expectedColumns, result);
5866
}
5967

6068
@Test
61-
public void testGetMismatchColumnTypeList() throws SQLException {
69+
public void testGetMismatchColumnTypeList() throws Exception {
6270
List<String> wrongColumns = ImmutableList.of(
6371
"MY_ID",
6472
"NAME",
@@ -80,12 +88,119 @@ public void testGetMismatchColumnTypeList() throws SQLException {
8088
resultSetMetaData.setColumnType(i + 1, i);
8189
}
8290

91+
MockResultSet resultSet = new MockResultSet("data");
92+
Set<String> columnNamesSet = new HashSet<String>();
93+
columnNamesSet.addAll(columns);
94+
resultSet.addColumns(columnNamesSet);
95+
resultSet.setResultSetMetaData(resultSetMetaData);
96+
8397
try {
84-
AbstractDBSink.getMatchedColumnTypeList(resultSetMetaData, wrongColumns);
98+
AbstractDBSink.getMatchedColumnTypeList(resultSet, wrongColumns);
8599
Assert.fail(String.format("Expected to throw %s", IllegalArgumentException.class.getName()));
86100
} catch (IllegalArgumentException e) {
87101
String errorMessage = "Missing column 'MY_ID' in SQL table";
88102
Assert.assertEquals(errorMessage, e.getMessage());
89103
}
90104
}
105+
106+
@Test
107+
public void testDifferentOrderOfFieldsInResultSet() throws Exception {
108+
List<String> diffOrdCol = ImmutableList.of(
109+
"Name",
110+
"SCORE",
111+
"ID"
112+
);
113+
114+
List<String> columns = ImmutableList.of(
115+
"ID",
116+
"NAME",
117+
"SCORE"
118+
);
119+
120+
List<String> typeName = ImmutableList.of(
121+
"INT",
122+
"STRING",
123+
"DOUBLE"
124+
);
125+
126+
List<Integer> typeValue = ImmutableList.of(
127+
1,
128+
2,
129+
3
130+
);
131+
132+
List<ColumnType> expectedColumns = new ArrayList<>();
133+
MockResultSetMetaData resultSetMetaData = new MockResultSetMetaData();
134+
resultSetMetaData.setColumnCount(columns.size());
135+
136+
for (int i = 0; i < columns.size(); i++) {
137+
resultSetMetaData.setColumnName(i + 1, columns.get(i));
138+
resultSetMetaData.setColumnTypeName(i + 1, typeName.get(i));
139+
resultSetMetaData.setColumnType(i + 1, typeValue.get(i));
140+
expectedColumns.add(new ColumnType(columns.get(i), typeName.get(i), typeValue.get(i)));
141+
}
142+
143+
MockResultSet resultSet = new MockResultSet("data");
144+
Set<String> columnNamesSet = new HashSet<String>();
145+
columnNamesSet.addAll(columns);
146+
resultSet.addColumns(columnNamesSet);
147+
resultSet.setResultSetMetaData(resultSetMetaData);
148+
149+
List<ColumnType> actualColumns = AbstractDBSink.getMatchedColumnTypeList(resultSet, diffOrdCol);
150+
151+
// Assert that all expected fields are present in the actual fields
152+
for (ColumnType exColType : expectedColumns) {
153+
Assert.assertTrue(actualColumns.contains(exColType));
154+
}
155+
}
156+
157+
@Test
158+
public void testSubsetColumnsInResultSet() throws Exception {
159+
List<String> subsetCol = ImmutableList.of(
160+
"SCORE",
161+
"ID"
162+
);
163+
164+
List<String> columns = ImmutableList.of(
165+
"ID",
166+
"NAME",
167+
"SCORE"
168+
);
169+
170+
List<String> typeName = ImmutableList.of(
171+
"INT",
172+
"STRING",
173+
"DOUBLE"
174+
);
175+
176+
List<Integer> typeValue = ImmutableList.of(
177+
1,
178+
2,
179+
3
180+
);
181+
182+
List<ColumnType> expectedColumns = new ArrayList<>();
183+
MockResultSetMetaData resultSetMetaData = new MockResultSetMetaData();
184+
resultSetMetaData.setColumnCount(columns.size());
185+
186+
for (int i = 0; i < columns.size(); i++) {
187+
resultSetMetaData.setColumnName(i + 1, columns.get(i));
188+
resultSetMetaData.setColumnTypeName(i + 1, typeName.get(i));
189+
resultSetMetaData.setColumnType(i + 1, typeValue.get(i));
190+
expectedColumns.add(new ColumnType(columns.get(i), typeName.get(i), typeValue.get(i)));
191+
}
192+
193+
MockResultSet resultSet = new MockResultSet("data");
194+
Set<String> columnNamesSet = new HashSet<String>();
195+
columnNamesSet.addAll(columns);
196+
resultSet.addColumns(columnNamesSet);
197+
resultSet.setResultSetMetaData(resultSetMetaData);
198+
199+
List<ColumnType> actualColumns = AbstractDBSink.getMatchedColumnTypeList(resultSet, subsetCol);
200+
201+
// Assert that all actual fields are present in the expected fields
202+
for (ColumnType acColType : actualColumns) {
203+
Assert.assertTrue(expectedColumns.contains(acColType));
204+
}
205+
}
91206
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,16 +80,22 @@ public void readFields(ResultSet resultSet) throws SQLException {
8080

8181
// All LONG or LONG RAW columns have to be retrieved from the ResultSet prior to all the other columns.
8282
// Otherwise, we will face java.sql.SQLException: Stream has already been closed
83-
for (int i = 0; i < schema.getFields().size(); i++) {
84-
if (isLongOrLongRaw(metadata.getColumnType(i + 1))) {
85-
readField(i, metadata, resultSet, schema, recordBuilder);
83+
for (Schema.Field field : schema.getFields()) {
84+
// Index of a field in the schema may not be same in the ResultSet,
85+
// hence find the field by name in the given resultSet
86+
int columnIndex = resultSet.findColumn(field.getName());
87+
if (isLongOrLongRaw(metadata.getColumnType(columnIndex))) {
88+
readField(columnIndex, metadata, resultSet, field, recordBuilder);
8689
}
8790
}
8891

8992
// Read fields of other types
90-
for (int i = 0; i < schema.getFields().size(); i++) {
91-
if (!isLongOrLongRaw(metadata.getColumnType(i + 1))) {
92-
readField(i, metadata, resultSet, schema, recordBuilder);
93+
for (Schema.Field field : schema.getFields()) {
94+
// Index of a field in the schema may not be same in the ResultSet,
95+
// hence find the field by name in the given resultSet
96+
int columnIndex = resultSet.findColumn(field.getName());
97+
if (!isLongOrLongRaw(metadata.getColumnType(columnIndex))) {
98+
readField(columnIndex, metadata, resultSet, field, recordBuilder);
9399
}
94100
}
95101

@@ -242,10 +248,8 @@ private boolean isLongOrLongRaw(int columnType) {
242248
return columnType == OracleSourceSchemaReader.LONG || columnType == OracleSourceSchemaReader.LONG_RAW;
243249
}
244250

245-
private void readField(int index, ResultSetMetaData metadata, ResultSet resultSet, Schema schema,
251+
private void readField(int columnIndex, ResultSetMetaData metadata, ResultSet resultSet, Schema.Field field,
246252
StructuredRecord.Builder recordBuilder) throws SQLException {
247-
Schema.Field field = schema.getFields().get(index);
248-
int columnIndex = index + 1;
249253
int sqlType = metadata.getColumnType(columnIndex);
250254
int sqlPrecision = metadata.getPrecision(columnIndex);
251255
int sqlScale = metadata.getScale(columnIndex);

0 commit comments

Comments
 (0)