Skip to content

Commit 78b3d42

Browse files
author
gituser
committed
Merge branch 'v1.8.0_dev' into 1.8_v3.9.0_beta_1.0
2 parents a3bfb1c + 7c3d531 commit 78b3d42

File tree

19 files changed

+728
-171
lines changed

19 files changed

+728
-171
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121
package com.dtstack.flink.sql;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.config.CalciteConfig;
24+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2425
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2526
import com.dtstack.flink.sql.enums.ClusterMode;
2627
import com.dtstack.flink.sql.enums.ECacheType;
@@ -77,8 +78,6 @@
7778
import java.net.URL;
7879
import java.net.URLClassLoader;
7980
import java.net.URLDecoder;
80-
import java.util.ArrayList;
81-
import java.util.Arrays;
8281
import java.util.List;
8382
import java.util.Map;
8483
import java.util.Properties;
@@ -101,10 +100,6 @@ public class Main {
101100

102101
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
103102

104-
private static Config config = org.apache.calcite.sql.parser.SqlParser
105-
.configBuilder()
106-
.setLex(Lex.MYSQL)
107-
.build();
108103

109104
public static void main(String[] args) throws Exception {
110105

@@ -126,10 +121,6 @@ public static void main(String[] args) throws Exception {
126121
addJarFileList = objMapper.readValue(addJarListStr, List.class);
127122
}
128123

129-
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
130-
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
131-
Thread.currentThread().setContextClassLoader(parentClassloader);
132-
133124
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
134125
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
135126
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -148,16 +139,14 @@ public static void main(String[] args) throws Exception {
148139
Map<String, Table> registerTableCache = Maps.newHashMap();
149140

150141
//register udf
151-
registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
142+
registerUDF(sqlTree, jarURList, tableEnv);
152143
//register table schema
153144
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
154145

155146
sqlTranslation(options,tableEnv,sqlTree,sideTableMap,registerTableCache);
156147

157148
if(env instanceof MyLocalStreamEnvironment) {
158-
List<URL> urlList = new ArrayList<>();
159-
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
160-
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
149+
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
161150
}
162151

163152
env.execute(name);
@@ -180,7 +169,7 @@ private static void sqlTranslation(Options options,StreamTableEnvironment tableE
180169
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
181170
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
182171

183-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql,config).parseStmt();
172+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
184173
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
185174
tmp.setExecSql(tmpSql);
186175
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
@@ -224,19 +213,19 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
224213
}
225214
}
226215

227-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
228-
StreamTableEnvironment tableEnv)
229-
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
230-
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
231-
if (funcList.isEmpty()) {
232-
return;
233-
}
234-
//load jar
235-
URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
216+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
217+
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
236218
//register urf
219+
// udf和tableEnv须由同一个类加载器加载
220+
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
221+
URLClassLoader classLoader = null;
222+
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
237223
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
238-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
239-
tableEnv, classLoader);
224+
//classloader
225+
if (classLoader == null) {
226+
classLoader = FlinkUtil.loadExtraJar(jarURList, (URLClassLoader)levelClassLoader);
227+
}
228+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
240229
}
241230
}
242231

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.classloader;
20+
21+
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.net.URL;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
34+
/**
35+
* company: www.dtstack.com
36+
* author: toutian
37+
* create: 2019/10/14
38+
*/
39+
public class ClassLoaderManager {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
42+
43+
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
44+
45+
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
46+
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
47+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
48+
}
49+
50+
public static <R> R newInstance(List<URL> jarUrls, ClassLoaderSupplier<R> supplier) throws Exception {
51+
ClassLoader classLoader = retrieveClassLoad(jarUrls);
52+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
53+
}
54+
55+
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
56+
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
57+
try {
58+
URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
59+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
60+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
61+
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
62+
return classLoader;
63+
} catch (Throwable e) {
64+
LOG.error("retrieve ClassLoad happens error:{}", e);
65+
throw new RuntimeException("retrieve ClassLoad happens error");
66+
}
67+
});
68+
}
69+
70+
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
71+
jarUrls.sort(Comparator.comparing(URL::toString));
72+
String jarUrlkey = StringUtils.join(jarUrls, "_");
73+
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
74+
try {
75+
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
76+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
77+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
78+
LOG.info("jarUrl:{} create ClassLoad successful...", jarUrlkey);
79+
return classLoader;
80+
} catch (Throwable e) {
81+
LOG.error("retrieve ClassLoad happens error:{}", e);
82+
throw new RuntimeException("retrieve ClassLoad happens error");
83+
}
84+
});
85+
}
86+
87+
public static List<URL> getClassPath() {
88+
List<URL> classPaths = new ArrayList<>();
89+
for (Map.Entry<String, DtClassLoader> entry : pluginClassLoader.entrySet()) {
90+
classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
91+
}
92+
return classPaths;
93+
}
94+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* Represents a supplier of results.
24+
*
25+
* <p>There is no requirement that a new or distinct result be returned each
26+
* time the supplier is invoked.
27+
*
28+
* <p>This is a <a href="package-summary.html">functional interface</a>
29+
* whose functional method is {@link #get()}.
30+
*
31+
* @param <T> the type of results supplied by this supplier
32+
*
33+
* @since 1.8
34+
*/
35+
@FunctionalInterface
36+
public interface ClassLoaderSupplier<T> {
37+
38+
/**
39+
* Gets a result.
40+
*
41+
* @return a result
42+
*/
43+
T get(ClassLoader cl) throws Exception;
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/10/14
26+
*/
27+
public class ClassLoaderSupplierCallBack {
28+
29+
public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
30+
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
31+
Thread.currentThread().setContextClassLoader(toSetClassLoader);
32+
try {
33+
return supplier.get(toSetClassLoader);
34+
} finally {
35+
Thread.currentThread().setContextClassLoader(oldClassLoader);
36+
}
37+
}
38+
39+
40+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.config;
21+
22+
import org.apache.calcite.config.Lex;
23+
import org.apache.calcite.sql.parser.SqlParser;
24+
import org.apache.calcite.sql.parser.SqlParser.Config;
25+
26+
public class CalciteConfig {
27+
28+
public static Config MYSQL_LEX_CONFIG = SqlParser
29+
.configBuilder()
30+
.setLex(Lex.MYSQL)
31+
.build();
32+
33+
34+
35+
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2626

2727
import java.io.Serializable;
28+
import java.util.Map;
2829

2930
/**
3031
* Join信息
@@ -40,6 +41,8 @@ public class JoinInfo implements Serializable {
4041

4142
//左表是否是维表
4243
private boolean leftIsSideTable;
44+
//左表是 转换后的中间表
45+
private boolean leftIsMidTable;
4346

4447
//右表是否是维表
4548
private boolean rightIsSideTable;
@@ -63,6 +66,8 @@ public class JoinInfo implements Serializable {
6366
private SqlNode selectNode;
6467

6568
private JoinType joinType;
69+
// 左边是中间转换表,做表映射关系,给替换属性名称使用
70+
private Map<String, String> leftTabMapping;
6671

6772
public String getSideTableName(){
6873
if(leftIsSideTable){
@@ -87,6 +92,22 @@ public String getNewTableName(){
8792
return leftStr + "_" + rightTableName;
8893
}
8994

95+
public boolean isLeftIsMidTable() {
96+
return leftIsMidTable;
97+
}
98+
99+
public void setLeftIsMidTable(boolean leftIsMidTable) {
100+
this.leftIsMidTable = leftIsMidTable;
101+
}
102+
103+
public Map<String, String> getLeftTabMapping() {
104+
return leftTabMapping;
105+
}
106+
107+
public void setLeftTabMapping(Map<String, String> leftTabMapping) {
108+
this.leftTabMapping = leftTabMapping;
109+
}
110+
90111
public String getNewTableAlias(){
91112
return leftTableAlias + "_" + rightTableAlias;
92113
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@
4141

4242
public class ParserJoinField {
4343

44+
4445
/**
45-
* Need to parse the fields of information and where selectlist
46+
* build row by field
47+
* @param sqlNode select node
48+
* @param scope join left and right table all info
49+
* @param getAll true,get all fields from two tables; false, extract useful field from select node
4650
* @return
4751
*/
4852
public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){

0 commit comments

Comments
 (0)