Skip to content

Commit 33047f6

Browse files
author
dapeng
committed
解决中文乱吗问题
1 parent e69f484 commit 33047f6

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaDialect.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
2222
import com.google.common.collect.Lists;
2323
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2425

2526
import java.util.Arrays;
2627
import java.util.List;
@@ -38,6 +39,12 @@ public class ImpalaDialect implements JDBCDialect {
3839

3940
private static final String IMPALA_PARTITION_KEYWORD = "partition";
4041

42+
private TypeInformation[] fieldTypes;
43+
44+
public ImpalaDialect(TypeInformation[] fieldTypes){
45+
this.fieldTypes = fieldTypes;
46+
}
47+
4148
@Override
4249
public boolean canHandle(String url) {
4350
return url.startsWith("jdbc:impala:");
@@ -70,8 +77,13 @@ public String getInsertIntoStatement(String schema, String tableName, String[] f
7077
.map(this::quoteIdentifier)
7178
.collect(Collectors.joining(", "));
7279

73-
String placeholders = Arrays.stream(fieldNames)
74-
.map(f -> "?")
80+
String placeholders = Arrays.stream(fieldTypes)
81+
.map(f -> {
82+
if(String.class.getName().equals(f.getTypeClass().getName())){
83+
return "cast( ? as string)";
84+
}
85+
return "?";
86+
})
7587
.collect(Collectors.joining(", "));
7688

7789
String partitionFieldStr = partitionFieldsList.stream()

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ public class ImpalaSink extends AbstractRdbSink implements IStreamSinkGener<Abst
4141
private ImpalaTableInfo impalaTableInfo;
4242

4343
public ImpalaSink() {
44-
super(new ImpalaDialect());
44+
super(null);
4545
}
4646

4747
@Override
4848
public JDBCUpsertOutputFormat getOutputFormat() {
4949
JDBCOptions jdbcOptions = JDBCOptions.builder()
5050
.setDbUrl(getImpalaJdbcUrl())
51-
.setDialect(jdbcDialect)
51+
.setDialect(new ImpalaDialect(getFieldTypes()))
5252
.setUsername(userName)
5353
.setPassword(password)
5454
.setTableName(tableName)

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaSinkParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5454
impalaTableInfo.setBufferSize(MathUtil.getString(props.get(ImpalaTableInfo.BUFFER_SIZE_KEY.toLowerCase())));
5555
impalaTableInfo.setFlushIntervalMs(MathUtil.getString(props.get(ImpalaTableInfo.FLUSH_INTERVALMS_KEY.toLowerCase())));
5656
impalaTableInfo.setSchema(MathUtil.getString(props.get(ImpalaTableInfo.SCHEMA_KEY.toLowerCase())));
57+
impalaTableInfo.setUpdateMode(MathUtil.getString(props.get(ImpalaTableInfo.UPDATE_KEY.toLowerCase())));
5758

5859
Integer authMech = MathUtil.getIntegerVal(props.get(ImpalaTableInfo.AUTHMECH_KEY.toLowerCase()));
5960
authMech = authMech == null ? 0 : authMech;

0 commit comments

Comments
 (0)