Skip to content

Commit 985c904

Browse files
Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dtstack/dt-center-flinkStreamSQL into v1.8.0_dev_field_name_ignore_case
2 parents 32d4255 + a926ddd commit 985c904

File tree

30 files changed

+815
-1366
lines changed

30 files changed

+815
-1366
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@
4848
* Java: JDK8及以上
4949
* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
5050
* 操作系统:理论上不限
51+
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
52+
```
53+
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
54+
security.kerberos.login.use-ticket-cache: true
55+
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/yanxi.keytab
56+
security.kerberos.login.principal: yanxi@DTSTACK.COM
57+
security.kerberos.login.contexts: Client,KafkaClient
58+
zookeeper.sasl.service-name: zookeeper
59+
zookeeper.sasl.login-context-name: Client
60+
61+
```
5162

5263
### 1.3 打包
5364

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
268268
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
269269
Thread.sleep(5 * 1000);
270270
} catch (InterruptedException e1) {
271-
e1.printStackTrace();
271+
LOG.error("", e1);
272272
}
273273
}
274274

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 66 additions & 123 deletions
Large diffs are not rendered by default.

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package com.dtstack.flink.sql.classloader;
2020

2121
import com.dtstack.flink.sql.util.PluginUtil;
22+
import com.dtstack.flink.sql.util.ReflectionUtils;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Method;
2629
import java.net.URL;
30+
import java.net.URLClassLoader;
2731
import java.util.ArrayList;
2832
import java.util.Arrays;
2933
import java.util.Comparator;
@@ -91,4 +95,26 @@ public static List<URL> getClassPath() {
9195
}
9296
return classPaths;
9397
}
98+
99+
100+
101+
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader)
102+
throws IllegalAccessException, InvocationTargetException {
103+
for(URL url : jarURLList){
104+
if(url.toString().endsWith(".jar")){
105+
urlClassLoaderAddUrl(classLoader, url);
106+
}
107+
}
108+
return classLoader;
109+
}
110+
111+
private static void urlClassLoaderAddUrl(URLClassLoader classLoader, URL url) throws InvocationTargetException, IllegalAccessException {
112+
Method method = ReflectionUtils.getDeclaredMethod(classLoader, "addURL", URL.class);
113+
114+
if (method == null) {
115+
throw new RuntimeException("can't not find declared method addURL, curr classLoader is " + classLoader.getClass());
116+
}
117+
method.setAccessible(true);
118+
method.invoke(classLoader, url);
119+
}
94120
}

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ public class ConfigConstrant {
5353
public static final String SQL_BUFFER_TIMEOUT_MILLIS = "sql.buffer.timeout.millis";
5454

5555
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
56+
// default 200ms
57+
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";
5658

5759
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
58-
5960
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6061

6162
public static final String STATE_BACKEND_KEY = "state.backend";

core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,14 @@
2424
*/
2525
public enum ClusterMode {
2626

27-
local(0),standalone(1),yarn(2),yarnPer(3);
27+
//run in local
28+
local(0),
29+
//submit job to standalone cluster
30+
standalone(1),
31+
//submit job to flink-session which is already run on yarn
32+
yarn(2),
33+
//submit job to yarn cluster as an application
34+
yarnPer(3);
2835

2936
private int type;
3037

0 commit comments

Comments
 (0)