
Commit 6749990

Merge branch 'feat_1.8_addKafkaDecoder' into '1.8_test_3.10.x'
Feat 1.8 add kafka decoder. See merge request dt-insight-engine/flinkStreamSQL!69
2 parents bf1f766 + 44c97a3

File tree

40 files changed (+1154, -123 lines)

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 0 additions & 2 deletions
@@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
         vo.setFileResolverCachingEnabled(false);
         Vertx vertx = Vertx.vertx(vo);
         setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
-        setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
     }
 
 }

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 0 additions & 1 deletion
@@ -38,7 +38,6 @@
 import org.apache.flink.types.Row;
 
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
-import com.dtstack.flink.sql.constrant.ConfigConstrant;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.enums.EPluginLoadMode;

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 20 additions & 14 deletions
@@ -34,6 +34,7 @@
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -43,7 +44,7 @@
 
 /**
  * source data parse to json format
- *
+ * <p>
  * Date: 2019/12/12
  * Company: www.dtstack.com
  *
@@ -53,23 +54,28 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
 
     private final ObjectMapper objectMapper = new ObjectMapper();
 
-    private Map<String, String> rowAndFieldMapping;
-    private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
+    private final Map<String, String> rowAndFieldMapping;
+    private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
 
     private final String[] fieldNames;
     private final TypeInformation<?>[] fieldTypes;
-    private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
+    private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
+    private final String charsetName;
 
-    public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
+    public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
+                                          List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
+                                          String charsetName) {
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
         this.rowAndFieldMapping = rowAndFieldMapping;
         this.fieldExtraInfos = fieldExtraInfos;
+        this.charsetName = charsetName;
     }
 
     @Override
     public Row deserialize(byte[] message) throws IOException {
-        JsonNode root = objectMapper.readTree(message);
+        String decoderStr = new String(message, charsetName);
+        JsonNode root = objectMapper.readTree(decoderStr);
         this.parseTree(root, null);
         Row row = new Row(fieldNames.length);
 
@@ -97,7 +103,7 @@ public Row deserialize(byte[] message) throws IOException {
         }
     }
 
-    private void parseTree(JsonNode jsonNode, String prefix){
+    private void parseTree(JsonNode jsonNode, String prefix) {
         if (jsonNode.isArray()) {
             ArrayNode array = (ArrayNode) jsonNode;
             for (int i = 0; i < array.size(); i++) {
@@ -116,15 +122,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
             return;
         }
         Iterator<String> iterator = jsonNode.fieldNames();
-        while (iterator.hasNext()){
+        while (iterator.hasNext()) {
             String next = iterator.next();
             JsonNode child = jsonNode.get(next);
             String nodeKey = getNodeKey(prefix, next);
 
             nodeAndJsonNodeMapping.put(nodeKey, child);
-            if(child.isArray()){
+            if (child.isArray()) {
                 parseTree(child, nodeKey);
-            }else {
+            } else {
                 parseTree(child, nodeKey);
             }
         }
@@ -135,8 +141,8 @@ private JsonNode getIgnoreCase(String key) {
         return nodeAndJsonNodeMapping.get(nodeMappingKey);
     }
 
-    private String getNodeKey(String prefix, String nodeName){
-        if(Strings.isNullOrEmpty(prefix)){
+    private String getNodeKey(String prefix, String nodeName) {
+        if (Strings.isNullOrEmpty(prefix)) {
             return nodeName;
         }
         return prefix + "." + nodeName;
@@ -160,15 +166,15 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
             } else {
                 return node.asText();
             }
-            } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
             // local zone
             return Time.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
             // local zone
             return Timestamp.valueOf(node.asText());
-            } else {
+        } else {
             // for types that were specified without JSON schema
             // e.g. POJOs
             try {
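
With this change the dt-nest format decodes the raw Kafka bytes with the configured charset before Jackson parses them. A minimal usage sketch follows; the field-mapping layout and the empty extra-info list are assumptions, since the table-registration code that normally builds them is not part of this excerpt:

    RowTypeInfo typeInfo = new RowTypeInfo(
            new TypeInformation<?>[]{Types.INT, Types.STRING},
            new String[]{"id", "name"});
    Map<String, String> rowAndFieldMapping = new HashMap<>();
    rowAndFieldMapping.put("id", "user.id");      // hypothetical nested-field mapping
    rowAndFieldMapping.put("name", "user.name");

    DtNestRowDeserializationSchema schema = new DtNestRowDeserializationSchema(
            typeInfo, rowAndFieldMapping, new ArrayList<>(), StandardCharsets.UTF_8.name());
    Row row = schema.deserialize(
            "{\"user\":{\"id\":1,\"name\":\"dt\"}}".getBytes(StandardCharsets.UTF_8));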

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 13 additions & 0 deletions
@@ -72,6 +72,10 @@ public class Options {
     @OptionRequired(description = "log level")
     private String logLevel = "info";
 
+    @OptionRequired(description = "file add to ship file")
+    private String addShipfile;
+
+
     public String getMode() {
         return mode;
     }
@@ -183,4 +187,13 @@ public String getLogLevel() {
     public void setLogLevel(String logLevel) {
         this.logLevel = logLevel;
     }
+
+    public String getAddShipfile() {
+        return addShipfile;
+    }
+
+    public void setAddShipfile(String addShipfile) {
+        this.addShipfile = addShipfile;
+    }
+
 }
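
A small sketch of the new accessor pair (hypothetical values; the command-line parser that actually populates this bean from -addShipfile is not shown in this excerpt):

    Options options = new Options();
    options.setAddShipfile("/opt/kerberos/krb5.conf");  // normally filled in by the option parser
    String shipFile = options.getAddShipfile();         // later handed to the ship-file list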

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 3 additions & 2 deletions
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 
 package com.dtstack.flink.sql.parser;
 
@@ -153,14 +152,16 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
 
     /**
      * Convert the sqlNodes in the top-level select into AsNodes to resolve field-name conflicts.
+     * Only fields of the form table.xx are rewritten.
      * @param selectList the select fields of the select node
      * @param sqlSelect the top-level selectNode produced by parsing
      */
     private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
         SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
 
         for (int index = 0; index < selectList.size(); index++) {
-            if (selectList.get(index).getKind().equals(SqlKind.AS)) {
+            if (selectList.get(index).getKind().equals(SqlKind.AS)
+                    || ((SqlIdentifier) selectList.get(index)).names.size() == 1) {
                 sqlNodes.add(selectList.get(index));
                 continue;
             }
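
A hedged illustration of the new guard (the SQL below is invented for this note). In a query such as

    SELECT id, t1.name FROM t1 JOIN t2 ON t1.id = t2.id

the bare identifier id is a single-part SqlIdentifier (names.size() == 1), so it now passes through unchanged; only the qualified t1.name is still rewritten into an AS node (t1.name AS name) to resolve field-name conflicts, matching the updated javadoc.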

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 5 additions & 2 deletions
@@ -22,6 +22,7 @@
 
 import com.dtstack.flink.sql.enums.ECacheContentType;
 import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.factory.DTThreadFactory;
 import com.dtstack.flink.sql.metric.MetricConstant;
 import com.dtstack.flink.sql.side.cache.AbstractSideCache;
 import com.dtstack.flink.sql.side.cache.CacheObj;
@@ -68,6 +69,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> impl
     private int timeOutNum = 0;
     protected BaseSideInfo sideInfo;
     protected transient Counter parseErrorRecords;
+    private transient ThreadPoolExecutor cancelExecutor;
 
     public BaseAsyncReqRow(BaseSideInfo sideInfo){
         this.sideInfo = sideInfo;
@@ -82,6 +84,8 @@ public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         initCache();
         initMetric();
+        cancelExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000),
+                new DTThreadFactory("cancel-timer-executor"));
         LOG.info("async dim table config info: {} ", sideInfo.getSideTableInfo().toString());
     }
 
@@ -248,12 +252,11 @@ public void onProcessingTime(long timestamp) throws Exception {
     }
 
     protected void cancelTimerWhenComplete(ResultFuture<CRow> resultFuture, ScheduledFuture<?> timerFuture){
-        ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         if(resultFuture instanceof StreamRecordQueueEntry){
             StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
             streamRecordBufferEntry.onComplete((Object value) -> {
                 timerFuture.cancel(true);
-            },executors);
+            }, cancelExecutor);
         }
     }
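
The old code built a fresh ThreadPoolExecutor inside cancelTimerWhenComplete on every completed lookup, leaking one pool per record; the fix allocates a single shared cancelExecutor in open() and reuses it. A companion cleanup this commit does not show, sketched here only as an assumption, would be to release that pool when the function shuts down:

    @Override
    public void close() throws Exception {
        super.close();
        // Assumption: no other code path in BaseAsyncReqRow shuts cancelExecutor down.
        if (cancelExecutor != null) {
            cancelExecutor.shutdown();
        }
    }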

core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java

Lines changed: 112 additions & 0 deletions

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.util;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility methods for helping with security tasks.
+ * Date: 2019/12/28
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class AuthUtil {
+
+    public static String creatJaasFile(String prefix, String suffix, JAASConfig jaasConfig) throws IOException {
+        File krbConf = new File(System.getProperty("user.dir"));
+        File temp = File.createTempFile(prefix, suffix, krbConf);
+        temp.deleteOnExit();
+        FileUtils.writeStringToFile(temp, jaasConfig.toString());
+        return temp.getAbsolutePath();
+    }
+
+
+    public static class JAASConfig {
+        private String entryName;
+        private String loginModule;
+        private String loginModuleFlag;
+        private Map<String, String> loginModuleOptions;
+
+        public JAASConfig(String entryName, String loginModule, String loginModuleFlag, Map<String, String> loginModuleOptions) {
+            this.entryName = entryName;
+            this.loginModule = loginModule;
+            this.loginModuleFlag = loginModuleFlag;
+            this.loginModuleOptions = loginModuleOptions;
+        }
+
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder stringBuilder = new StringBuilder(entryName).append(" {\n\t")
+                    .append(loginModule).append(" ").append(loginModuleFlag).append("\n\t");
+            String[] keys = loginModuleOptions.keySet().toArray(new String[loginModuleOptions.size()]);
+            for (int i = 0; i < keys.length; i++) {
+                stringBuilder.append(keys[i]).append("=").append(loginModuleOptions.get(keys[i]));
+                if (i != keys.length - 1) {
+                    stringBuilder.append("\n\t");
+                } else {
+                    stringBuilder.append(";\n");
+                }
+
+            }
+            stringBuilder.append("\n").append("};");
+            return stringBuilder.toString();
+        }
+
+        public static class Builder {
+            private String entryName;
+            private String loginModule;
+            private String loginModuleFlag;
+            private Map<String, String> loginModuleOptions;
+
+            public Builder setEntryName(String entryName) {
+                this.entryName = entryName;
+                return this;
+            }
+
+            public Builder setLoginModule(String loginModule) {
+                this.loginModule = loginModule;
+                return this;
+            }
+
+            public Builder setLoginModuleFlag(String loginModuleFlag) {
+                this.loginModuleFlag = loginModuleFlag;
+                return this;
+            }
+
+            public Builder setLoginModuleOptions(Map<String, String> loginModuleOptions) {
+                this.loginModuleOptions = loginModuleOptions;
+                return this;
+            }
+
+            public JAASConfig build() {
+                return new JAASConfig(
+                        entryName, loginModule, loginModuleFlag, loginModuleOptions);
+            }
+        }
+    }
+}
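
This excerpt does not show a call site for AuthUtil, so the following is only a hedged sketch of how the builder could produce a JAAS file for a Kerberos-secured Kafka client; the entry name, keytab path, and principal are illustrative assumptions:

    Map<String, String> options = new HashMap<>();
    options.put("useKeyTab", "true");
    options.put("storeKey", "true");
    options.put("keyTab", "\"/opt/kerberos/kafka.keytab\"");    // hypothetical path
    options.put("principal", "\"kafka/broker1@EXAMPLE.COM\"");  // hypothetical principal

    AuthUtil.JAASConfig jaasConfig = AuthUtil.JAASConfig.builder()
            .setEntryName("KafkaClient")
            .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
            .setLoginModuleFlag("required")
            .setLoginModuleOptions(options)
            .build();

    String jaasPath = AuthUtil.creatJaasFile("kafka-jaas-", ".conf", jaasConfig);
    System.setProperty("java.security.auth.login.config", jaasPath);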

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 0 additions & 3 deletions
@@ -76,9 +76,6 @@ public void open(Configuration parameters) throws Exception {
         vo.setFileResolverCachingEnabled(false);
         Vertx vertx = Vertx.vertx(vo);
         setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
-        setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>(10000), new DTThreadFactory("dbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
-
     }
 
 }

docs/config.md

Lines changed: 5 additions & 0 deletions
@@ -46,6 +46,11 @@ sh submit.sh -key1 val1 -key2 val2
   * Description: path of extension jars, currently mainly jars containing UDF definitions
   * Required: no
   * Default: none
+
+* **addShipfile**
+  * Description: extra files to ship with the job, e.g. the keytab and krb5.conf files required when Kerberos authentication is enabled
+  * Required: no
+  * Default: none
 
 * **confProp**
   * Description: miscellaneous parameter settings