Skip to content

Commit ababf5c

Browse files
committed
Merge remote-tracking branch 'origin/v1.8.0_dev' into 1.8.0_dev_partitioned_join
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents d8965bc + d1748d2 commit ababf5c

File tree

56 files changed

+3109
-66
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+3109
-66
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727

2828
# 已支持
2929
* 源表:kafka 0.9、0.10、0.11、1.x版本
30-
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse
31-
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse
30+
* 维表:mysql, SQLServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2
31+
* 结果表:mysql, SQLServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2
3232

3333
# 后续开发计划
3434
* 维表快照
@@ -186,6 +186,8 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
186186
* [kudu 结果表插件](docs/kuduSink.md)
187187
* [postgresql 结果表插件](docs/postgresqlSink.md)
188188
* [clickhouse 结果表插件](docs/clickhouseSink.md)
189+
* [impala 结果表插件](docs/impalaSink.md)
190+
* [db2 结果表插件](docs/db2Sink.md)
189191

190192
### 2.3 维表插件
191193
* [hbase 维表插件](docs/hbaseSide.md)
@@ -197,6 +199,8 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
197199
* [kudu 维表插件](docs/kuduSide.md)
198200
* [postgresql 维表插件](docs/postgresqlSide.md)
199201
* [clickhouse 维表插件](docs/clickhouseSide.md)
202+
* [impala 维表插件](docs/impalaSide.md)
203+
* [db2 维表插件](docs/db2Side.md)
200204

201205
## 3 性能指标(新增)
202206

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public static void main(String[] args) throws Exception {
127127
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
128128
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
129129
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
130-
StreamTableEnvironment tableEnv = getStreamTableEnv(confProperties, env);
130+
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
131+
StreamQueryConfig queryConfig = getStreamTableEnvTTL(confProperties, tableEnv);
131132

132133
List<URL> jarURList = Lists.newArrayList();
133134
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -146,7 +147,7 @@ public static void main(String[] args) throws Exception {
146147
//register table schema
147148
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
148149

149-
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache);
150+
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache, queryConfig);
150151

151152
if(env instanceof MyLocalStreamEnvironment) {
152153
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -155,7 +156,7 @@ public static void main(String[] args) throws Exception {
155156
env.execute(name);
156157
}
157158

158-
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache) throws Exception {
159+
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
159160
SideSqlExec sideSqlExec = new SideSqlExec();
160161
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
161162
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
@@ -185,9 +186,9 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
185186
}
186187
if(isSide){
187188
//sql-dimensional table contains the dimension table of execution
188-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
189+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
189190
}else{
190-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
191+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
191192
if(LOG.isInfoEnabled()){
192193
LOG.info("exec sql: " + result.getExecSql());
193194
}
@@ -353,13 +354,11 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
353354
* 获取StreamTableEnvironment并设置相关属性
354355
*
355356
* @param confProperties
356-
* @param env
357+
* @param tableEnv
357358
* @return
358359
*/
359-
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
360+
private static StreamQueryConfig getStreamTableEnvTTL(Properties confProperties, StreamTableEnvironment tableEnv) {
360361
confProperties = PropertiesUtils.propertiesTrim(confProperties);
361-
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
362-
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
363-
return tableEnv;
362+
return FlinkUtil.getTableEnvTTL(confProperties, tableEnv);
364363
}
365364
}

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import org.apache.calcite.sql.SqlIdentifier;
2222
import org.apache.calcite.sql.SqlInsert;
2323
import org.apache.calcite.sql.SqlNode;
24-
import org.apache.flink.table.api.Table;
25-
import org.apache.flink.table.api.TableEnvironment;
26-
import org.apache.flink.table.api.TableException;
27-
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.api.*;
2825
import org.apache.flink.table.api.java.StreamTableEnvironment;
2926
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3027
import org.apache.flink.table.plan.logical.LogicalRelNode;
@@ -41,7 +38,7 @@
4138
*/
4239
public class FlinkSQLExec {
4340

44-
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
41+
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, StreamQueryConfig queryConfig) throws Exception {
4542

4643
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
4744
SqlNode insert = planner.parse(stmt);
@@ -78,7 +75,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
7875
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
7976
"TableSink schema: " + String.join(",", fieldNames));
8077
}
81-
82-
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
78+
StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
79+
tableEnv.insertInto(newTable, targetTableName, config);
8380
}
8481
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.google.common.collect.Lists;
5555
import com.google.common.collect.Maps;
5656
import org.apache.flink.streaming.api.datastream.DataStream;
57+
import org.apache.flink.table.api.StreamQueryConfig;
5758
import org.apache.flink.table.api.Table;
5859
import org.apache.flink.table.api.java.StreamTableEnvironment;
5960
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -88,7 +89,7 @@ public class SideSqlExec {
8889
private Map<String, Table> localTableCache = Maps.newHashMap();
8990

9091
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
91-
Map<String, Table> tableCache)
92+
Map<String, Table> tableCache, StreamQueryConfig queryConfig)
9293
throws Exception {
9394

9495
if(localSqlPluginPath == null){
@@ -121,7 +122,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
121122
if(pollSqlNode.getKind() == INSERT){
122123
System.out.println("----------real exec sql-----------" );
123124
System.out.println(pollSqlNode.toString());
124-
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
125+
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString(), queryConfig);
125126
if(LOG.isInfoEnabled()){
126127
LOG.info("exec sql: " + pollSqlNode.toString());
127128
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ public static int getEnvParallelism(Properties properties){
278278
* @param tableEnv
279279
* @return
280280
*/
281-
public static void setTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
281+
public static StreamQueryConfig getTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
282+
StreamQueryConfig qConfig = null;
282283
String ttlMintimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MINTIME);
283284
String ttlMaxtimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MAXTIME);
284285
if (StringUtils.isNotEmpty(ttlMintimeStr) || StringUtils.isNotEmpty(ttlMaxtimeStr)) {
@@ -295,10 +296,11 @@ public static void setTableEnvTTL(Properties properties, StreamTableEnvironment
295296
ttlMaxtime = getTtlTime(Integer.parseInt(ttlMaxtimeStrMatcher.group(1)), ttlMaxtimeStrMatcher.group(2));
296297
}
297298
if (0L != ttlMintime && 0L != ttlMaxtime) {
298-
StreamQueryConfig qConfig = tableEnv.queryConfig();
299+
qConfig = tableEnv.queryConfig();
299300
qConfig.withIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
300301
}
301302
}
303+
return qConfig;
302304
}
303305

304306
/**

db2/db2-side/db2-all-side/pom.xml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.db2</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.side.all.db2</artifactId>
14+
15+
<name>db2-all-side</name>
16+
17+
<packaging>jar</packaging>
18+
19+
<properties>
20+
<sql.side.db2.core.version>1.0-SNAPSHOT</sql.side.db2.core.version>
21+
</properties>
22+
23+
<dependencies>
24+
<dependency>
25+
<groupId>com.dtstack.flink</groupId>
26+
<artifactId>sql.side.db2.core</artifactId>
27+
<version>${sql.side.db2.core.version}</version>
28+
</dependency>
29+
</dependencies>
30+
31+
<build>
32+
<plugins>
33+
<plugin>
34+
<groupId>org.apache.maven.plugins</groupId>
35+
<artifactId>maven-shade-plugin</artifactId>
36+
<version>1.4</version>
37+
<executions>
38+
<execution>
39+
<phase>package</phase>
40+
<goals>
41+
<goal>shade</goal>
42+
</goals>
43+
<configuration>
44+
<artifactSet>
45+
<excludes>
46+
47+
</excludes>
48+
</artifactSet>
49+
<filters>
50+
<filter>
51+
<artifact>*:*</artifact>
52+
<excludes>
53+
<exclude>META-INF/*.SF</exclude>
54+
<exclude>META-INF/*.DSA</exclude>
55+
<exclude>META-INF/*.RSA</exclude>
56+
</excludes>
57+
</filter>
58+
</filters>
59+
</configuration>
60+
</execution>
61+
</executions>
62+
</plugin>
63+
64+
<plugin>
65+
<artifactId>maven-antrun-plugin</artifactId>
66+
<version>1.2</version>
67+
<executions>
68+
<execution>
69+
<id>copy-resources</id>
70+
<!-- here the phase you need -->
71+
<phase>package</phase>
72+
<goals>
73+
<goal>run</goal>
74+
</goals>
75+
<configuration>
76+
<tasks>
77+
<copy todir="${basedir}/../../../plugins/db2allside">
78+
<fileset dir="target/">
79+
<include name="${project.artifactId}-${project.version}.jar"/>
80+
</fileset>
81+
</copy>
82+
83+
<move file="${basedir}/../../../plugins/db2allside/${project.artifactId}-${project.version}.jar"
84+
tofile="${basedir}/../../../plugins/db2allside/${project.name}-${git.branch}.jar"/>
85+
</tasks>
86+
</configuration>
87+
</execution>
88+
</executions>
89+
</plugin>
90+
</plugins>
91+
</build>
92+
93+
94+
</project>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.db2;
20+
21+
import com.dtstack.flink.sql.side.FieldInfo;
22+
import com.dtstack.flink.sql.side.JoinInfo;
23+
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import com.google.common.collect.Maps;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/**
37+
* Date: 2019/11/20
38+
* Company: www.dtstack.com
39+
*
40+
* @author xiuzhu
41+
*/
42+
43+
public class Db2AllReqRow extends RdbAllReqRow {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(Db2AllReqRow.class);
46+
47+
private static final String DB2_DRIVER = "com.ibm.db2.jcc.DB2Driver";
48+
49+
public Db2AllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
50+
super(new Db2AllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
51+
}
52+
53+
@Override
54+
public Connection getConn(String dbURL, String userName, String password) {
55+
try {
56+
Class.forName(DB2_DRIVER);
57+
Connection conn = DriverManager.getConnection(dbURL, userName, password);
58+
return conn;
59+
} catch (Exception e) {
60+
LOG.error("", e);
61+
throw new RuntimeException("", e);
62+
}
63+
}
64+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.db2;
20+
21+
import com.dtstack.flink.sql.side.FieldInfo;
22+
import com.dtstack.flink.sql.side.JoinInfo;
23+
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
26+
27+
import java.util.List;
28+
29+
/**
30+
* Reason:
31+
* Date: 2019/11/20
32+
* Company: www.dtstack.com
33+
*
34+
* @author xiuzhu
35+
*/
36+
37+
public class Db2AllSideInfo extends RdbAllSideInfo {
38+
public Db2AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
39+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
40+
}
41+
}

0 commit comments

Comments
 (0)