Skip to content

Commit ce2b92a

Browse files
author
toutian
committed
Merge branch '1.8_code_opt' into 'v1.8.0_dev'
1.8 代码优化以及增加autoWatermarkInterval参数 See merge request !175
2 parents 63d3ea3 + 8ff6205 commit ce2b92a

File tree

16 files changed

+701
-1314
lines changed

16 files changed

+701
-1314
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@
4848
* Java: JDK8及以上
4949
* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
5050
* 操作系统:理论上不限
51+
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
52+
```
53+
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
54+
security.kerberos.login.use-ticket-cache: true
55+
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/yanxi.keytab
56+
security.kerberos.login.principal: yanxi@DTSTACK.COM
57+
security.kerberos.login.contexts: Client,KafkaClient
58+
zookeeper.sasl.service-name: zookeeper
59+
zookeeper.sasl.login-context-name: Client
60+
61+
```
5162

5263
### 1.3 打包
5364

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 66 additions & 123 deletions
Large diffs are not rendered by default.

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package com.dtstack.flink.sql.classloader;
2020

2121
import com.dtstack.flink.sql.util.PluginUtil;
22+
import com.dtstack.flink.sql.util.ReflectionUtils;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Method;
2629
import java.net.URL;
30+
import java.net.URLClassLoader;
2731
import java.util.ArrayList;
2832
import java.util.Arrays;
2933
import java.util.Comparator;
@@ -91,4 +95,26 @@ public static List<URL> getClassPath() {
9195
}
9296
return classPaths;
9397
}
98+
99+
100+
101+
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader)
102+
throws IllegalAccessException, InvocationTargetException {
103+
for(URL url : jarURLList){
104+
if(url.toString().endsWith(".jar")){
105+
urlClassLoaderAddUrl(classLoader, url);
106+
}
107+
}
108+
return classLoader;
109+
}
110+
111+
private static void urlClassLoaderAddUrl(URLClassLoader classLoader, URL url) throws InvocationTargetException, IllegalAccessException {
112+
Method method = ReflectionUtils.getDeclaredMethod(classLoader, "addURL", URL.class);
113+
114+
if (method == null) {
115+
throw new RuntimeException("can't not find declared method addURL, curr classLoader is " + classLoader.getClass());
116+
}
117+
method.setAccessible(true);
118+
method.invoke(classLoader, url);
119+
}
94120
}

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ public class ConfigConstrant {
5353
public static final String SQL_BUFFER_TIMEOUT_MILLIS = "sql.buffer.timeout.millis";
5454

5555
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
56+
// default 200ms
57+
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";
5658

5759
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
58-
5960
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6061

6162
public static final String STATE_BACKEND_KEY = "state.backend";

core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,14 @@
2424
*/
2525
public enum ClusterMode {
2626

27-
local(0),standalone(1),yarn(2),yarnPer(3);
27+
//run in local
28+
local(0),
29+
//submit job to standalone cluster
30+
standalone(1),
31+
//submit job to flink-session which is already run on yarn
32+
yarn(2),
33+
//submit job to yarn cluster as an application
34+
yarnPer(3);
2835

2936
private int type;
3037

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 334 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.function;
20+
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.table.api.TableEnvironment;
23+
import org.apache.flink.table.api.java.StreamTableEnvironment;
24+
import org.apache.flink.table.functions.ScalarFunction;
25+
import org.apache.flink.table.functions.TableFunction;
26+
import org.apache.flink.table.functions.AggregateFunction;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
32+
/**
33+
* 自定义函数管理类
34+
* Reason:
35+
* Date: 2017/2/21
36+
* Company: www.dtstack.com
37+
* @author xuchao
38+
*/
39+
40+
public class FunctionManager {
41+
private static final Logger logger = LoggerFactory.getLogger(FunctionManager.class);
42+
43+
/**
44+
* TABLE|SCALA|AGGREGATE
45+
* 注册UDF到table env
46+
*/
47+
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader) {
48+
if ("SCALA".equalsIgnoreCase(type)) {
49+
registerScalaUDF(classPath, funcName, tableEnv, classLoader);
50+
} else if ("TABLE".equalsIgnoreCase(type)) {
51+
registerTableUDF(classPath, funcName, tableEnv, classLoader);
52+
} else if ("AGGREGATE".equalsIgnoreCase(type)) {
53+
registerAggregateUDF(classPath, funcName, tableEnv, classLoader);
54+
} else {
55+
throw new RuntimeException("not support of UDF which is not in (TABLE, SCALA, AGGREGATE)");
56+
}
57+
}
58+
59+
/**
60+
* 注册自定义方法到env上
61+
* @param classPath
62+
* @param funcName
63+
* @param tableEnv
64+
*/
65+
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader) {
66+
try {
67+
ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
68+
.asSubclass(ScalarFunction.class).newInstance();
69+
tableEnv.registerFunction(funcName, udfFunc);
70+
logger.info("register scala function:{} success.", funcName);
71+
} catch (Exception e) {
72+
logger.error("", e);
73+
throw new RuntimeException("register UDF exception:", e);
74+
}
75+
}
76+
77+
/**
78+
* 注册自定义TABLEFFUNC方法到env上
79+
*
80+
* @param classPath
81+
* @param funcName
82+
* @param tableEnv
83+
*/
84+
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader) {
85+
try {
86+
checkStreamTableEnv(tableEnv);
87+
TableFunction udtf = Class.forName(classPath, false, classLoader)
88+
.asSubclass(TableFunction.class).newInstance();
89+
90+
((StreamTableEnvironment) tableEnv).registerFunction(funcName, udtf);
91+
logger.info("register table function:{} success.", funcName);
92+
} catch (Exception e) {
93+
logger.error("", e);
94+
throw new RuntimeException("register Table UDF exception:", e);
95+
}
96+
}
97+
98+
private static void checkStreamTableEnv(TableEnvironment tableEnv) {
99+
if (!(tableEnv instanceof StreamTableEnvironment)) {
100+
throw new RuntimeException("no support tableEnvironment class for " + tableEnv.getClass().getName());
101+
}
102+
}
103+
104+
/**
105+
* 注册自定义Aggregate FUNC方法到env上
106+
*
107+
* @param classPath
108+
* @param funcName
109+
* @param tableEnv
110+
*/
111+
public static void registerAggregateUDF(String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader) {
112+
try {
113+
checkStreamTableEnv(tableEnv);
114+
115+
AggregateFunction udaf = Class.forName(classPath, false, classLoader)
116+
.asSubclass(AggregateFunction.class).newInstance();
117+
((StreamTableEnvironment) tableEnv).registerFunction(funcName, udaf);
118+
logger.info("register Aggregate function:{} success.", funcName);
119+
} catch (Exception e) {
120+
logger.error("", e);
121+
throw new RuntimeException("register Aggregate UDF exception:", e);
122+
}
123+
}
124+
125+
126+
public static TypeInformation[] transformTypes(Class[] fieldTypes) {
127+
TypeInformation[] types = new TypeInformation[fieldTypes.length];
128+
for (int i = 0; i < fieldTypes.length; i++) {
129+
types[i] = TypeInformation.of(fieldTypes[i]);
130+
}
131+
132+
return types;
133+
}
134+
}

0 commit comments

Comments
 (0)