File tree Expand file tree Collapse file tree 2 files changed +76
-9
lines changed
impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala Expand file tree Collapse file tree 2 files changed +76
-9
lines changed Original file line number Diff line number Diff line change @@ -49,16 +49,80 @@ CREATE TABLE tableName(
4949
5050## 5.样例:
5151```
52+ CREATE TABLE MyTable(
53+ channel VARCHAR,
54+ pt int,
55+ xctime varchar,
56+ name varchar
57+ )WITH(
58+ type ='kafka11',
59+ bootstrapServers ='172.16.8.107:9092',
60+ zookeeperQuorum ='172.16.8.107:2181/kafka',
61+ offsetReset ='latest',
62+ topic ='mqTest03'
63+ );
64+
5265CREATE TABLE MyResult(
53- channel VARCHAR,
54- pv VARCHAR
66+ a STRING,
67+ b STRING
68+ )WITH(
69+ type ='impala',
70+ url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
71+ userName ='root',
72+ password ='pwd',
73+ authMech ='3',
74+ tableName ='tb_result_4',
75+ parallelism ='1',
76+ -- 指定分区
77+ partitionFields = 'pt=1001,name="name1001" ',
78+ batchSize = '1000',
79+ parallelism ='2'
80+ );
81+
82+ CREATE TABLE MyResult1(
83+ a STRING,
84+ b STRING,
85+ pt int,
86+ name STRING
5587 )WITH(
5688 type ='impala',
57- url ='jdbc:impala://localhost:21050/mytest',
58- userName ='dtstack',
59- password ='abc123',
60- authMech = '3',
61- tableName ='pv2',
62- parallelism ='1'
63- )
89+ url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
90+ userName ='root',
91+ password ='Wscabc123..@',
92+ authMech ='3',
93+ tableName ='tb_result_4',
94+ parallelism ='1',
95+ enablePartition ='true',
96+ -- 动态分区
97+ partitionFields = 'pt,name ',
98+ batchSize = '1000',
99+ parallelism ='2'
100+ );
101+
102+
103+ insert
104+ into
105+ MyResult1
106+ select
107+ xctime AS b,
108+ channel AS a,
109+ pt,
110+ name
111+ from
112+ MyTable;
113+
114+
115+
116+ insert
117+ into
118+ MyResult
119+ select
120+ xctime AS b,
121+ channel AS a
122+ from
123+ MyTable;
124+
125+
126+
127+
64128 ```
Original file line number Diff line number Diff line change 2323import com .dtstack .flink .sql .sink .rdb .RdbSink ;
2424import com .dtstack .flink .sql .sink .rdb .format .RetractJDBCOutputFormat ;
2525import com .dtstack .flink .sql .table .TargetTableInfo ;
26+ import org .apache .commons .lang .StringUtils ;
2627import org .apache .flink .streaming .api .functions .sink .OutputFormatSinkFunction ;
2728import org .apache .flink .streaming .api .functions .sink .RichSinkFunction ;
2829
@@ -89,6 +90,8 @@ public void buildInsertSql(String tableName, List<String> fields) {
8990 boolean enablePartition = impalaTableInfo .isEnablePartition ();
9091 if (enablePartition ) {
9192 String partitionFieldsStr = impalaTableInfo .getPartitionFields ();
93+ partitionFieldsStr = !StringUtils .isEmpty (partitionFieldsStr ) ? partitionFieldsStr .replaceAll ("\" " , "'" ) : partitionFieldsStr ;
94+
9295 List <String > partitionFields = Arrays .asList (partitionFieldsStr .split ("," ));
9396 List <String > newFields = new ArrayList <>();
9497 for (String field : fields ) {
You can’t perform that action at this time.
0 commit comments