Skip to content

Commit d8873f9

Browse files
committed
[fix] impala varchar data error; storeType Null point Exception; values condition quote bug;
1 parent 59e83f8 commit d8873f9

File tree

4 files changed

+38
-9
lines changed

4 files changed

+38
-9
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7676
// cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
7777
private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
7878
//specific type which values need to be quoted
79-
private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp"};
79+
private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp", "varchar"};
8080

8181
private static final Integer DEFAULT_CONN_TIME_OUT = 60;
8282
private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000;
@@ -85,8 +85,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
8585
private static final String KUDU_TYPE = "kudu";
8686
private static final String UPDATE_MODE = "update";
8787
private static final String PARTITION_CONSTANT = "PARTITION";
88-
private static final String STRING_TYPE = "STRING";
89-
private static final String TIMESTAMP_TYPE = "TIMESTAMP";
9088
private static final String DRIVER_NAME = "com.cloudera.impala.jdbc41.Driver";
9189

9290
private static final String VALUES_CONDITION = "${valuesCondition}";
@@ -363,8 +361,8 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
363361
* @return quoted condition
364362
*/
365363
private String valueConditionAddQuotation(String valueCondition) {
366-
final String[] valueConditionCopy = {valueCondition};
367364
String[] temps = valueCondition.split(",");
365+
List<String> replacedItem = new ArrayList<>();
368366
Arrays.stream(temps).forEach(
369367
item -> {
370368
Matcher matcher = TYPE_PATTERN.matcher(item);
@@ -374,13 +372,15 @@ private String valueConditionAddQuotation(String valueCondition) {
374372

375373
if (Arrays.asList(NEED_QUOTE_TYPE).contains(type)) {
376374
if (!"null".equals(value)) {
377-
valueConditionCopy[0] = valueConditionCopy[0].replace(value, "'" + value + "'");
375+
item = item.replace(value, "'" + value + "'");
378376
}
379377
}
380378
}
379+
replacedItem.add(item);
381380
}
382381
);
383-
return "(" + valueConditionCopy[0] + ")";
382+
383+
return "(" + String.join(", ", replacedItem) + ")";
384384
}
385385

386386
@Override
@@ -745,6 +745,8 @@ public ImpalaOutputFormat build() {
745745
checkNotNull(format.password, "password is required!");
746746
}
747747

748+
checkNotNull(format.storeType, "storeType is required!");
749+
748750
return format;
749751
}
750752

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
public class ImpalaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<ImpalaSink> {
4747

4848
private static final String DEFAULT_STORE_TYPE = "kudu";
49-
private static final String DEFAULT_PARTITION_MODE = "dynamic";
5049

5150
protected String[] fieldNames;
5251
TypeInformation<?>[] fieldTypes;

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaSinkParser.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Arrays;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.Objects;
2829

2930
/**
3031
* Date: 2020/10/14
@@ -76,6 +77,8 @@ public class ImpalaSinkParser extends AbstractTableParser {
7677

7778
private static final String KUDU_TYPE = "kudu";
7879

80+
private static final String DEFAULT_STORE_TYPE = "kudu";
81+
7982
private static final String STORE_TYPE_KEY = "storeType";
8083

8184
private static final String KRB_DEFAULT_REALM = "HADOOP.COM";
@@ -123,13 +126,13 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
123126
}
124127

125128
String storeType = MathUtil.getString(props.get(STORE_TYPE_KEY.toLowerCase()));
126-
impalaTableInfo.setStoreType(storeType);
129+
impalaTableInfo.setStoreType(Objects.isNull(storeType) ? DEFAULT_STORE_TYPE : storeType);
127130

128131
String enablePartitionStr = (String) props.get(ENABLE_PARTITION_KEY.toLowerCase());
129132
boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null ? "false" : enablePartitionStr);
130133
impalaTableInfo.setEnablePartition(enablePartition);
131134

132-
if (!storeType.equalsIgnoreCase(KUDU_TYPE) && enablePartition) {
135+
if (!impalaTableInfo.getStoreType().equalsIgnoreCase(KUDU_TYPE) && enablePartition) {
133136
String partitionFields = MathUtil.getString(props.get(PARTITION_FIELDS_KEY.toLowerCase()));
134137
impalaTableInfo.setPartitionFields(partitionFields);
135138
}

localTest/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,31 @@
9999
<version>1.0-SNAPSHOT</version>
100100
</dependency>
101101

102+
103+
<dependency>
104+
<groupId>com.dtstack.flink</groupId>
105+
<artifactId>sql.mysql</artifactId>
106+
<version>1.0-SNAPSHOT</version>
107+
</dependency>
108+
109+
<dependency>
110+
<groupId>com.dtstack.flink</groupId>
111+
<artifactId>sql.side.all.mysql</artifactId>
112+
<version>1.0-SNAPSHOT</version>
113+
</dependency>
114+
115+
<dependency>
116+
<groupId>com.dtstack.flink</groupId>
117+
<artifactId>sql.sink.mysql</artifactId>
118+
<version>1.0-SNAPSHOT</version>
119+
</dependency>
120+
121+
<dependency>
122+
<groupId>com.dtstack.flink</groupId>
123+
<artifactId>sql.side.async.mysql</artifactId>
124+
<version>1.0-SNAPSHOT</version>
125+
</dependency>
126+
102127
<dependency>
103128
<groupId>com.dtstack.flink</groupId>
104129
<artifactId>sql.console</artifactId>

0 commit comments

Comments
 (0)