Commit fe77ffd

Merge branch 'v1.8.0_dev' of http://git.dtstack.cn/dtstack/dt-center-flinkStreamSQL into v1.8.0_dev_feature_impala
2 parents: 82e449a + 212a891

56 files changed (+1016, -218 lines)


README.md

Lines changed: 8 additions & 0 deletions
@@ -18,6 +18,7 @@
 * 8. Supports CEP processing
 * 9. Supports UDAF
 * 10. Supports predicate pushdown
+* 11. Supports state TTL

 ## Bug fixes:
 * 1. Fixed failures to parse ORDER BY and UNION syntax in SQL.
@@ -109,6 +110,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * Required: yes (pass an empty JSON if there are no parameters)
 * Default value: none
 * Optional parameters:
+ * sql.ttl.min: minimum idle-state time-to-live, an integer greater than 0 plus a unit, e.g. 1d, 1h (d\D: days, h\H: hours, m\M: minutes, s\S: seconds)
+ * sql.ttl.max: maximum idle-state time-to-live, an integer greater than 0 plus a unit, e.g. 2d, 2h (d\D: days, h\H: hours, m\M: minutes, s\S: seconds); requires sql.ttl.min to be set as well, and must exceed it by at least 5 minutes
+ * state.backend: state backend for the job, one of MEMORY, FILESYSTEM, ROCKSDB; defaults to the flink-conf configuration.
+ * state.checkpoints.dir: file-system storage path for the FILESYSTEM and ROCKSDB state backends, e.g. hdfs://ns1/dtInsight/flink180/checkpoints.
+ * state.backend.incremental: whether the ROCKSDB state backend enables incremental checkpoints; defaults to true.
 * sql.env.parallelism: default parallelism
 * sql.max.env.parallelism: maximum parallelism
 * time.characteristic: one of [ProcessingTime|IngestionTime|EventTime]
@@ -173,6 +179,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [elasticsearch result table plugin](docs/elasticsearchSink.md)
 * [hbase result table plugin](docs/hbaseSink.md)
 * [mysql result table plugin](docs/mysqlSink.md)
+* [oracle result table plugin](docs/oracleSink.md)
 * [mongo result table plugin](docs/mongoSink.md)
 * [redis result table plugin](docs/redisSink.md)
 * [cassandra result table plugin](docs/cassandraSink.md)
@@ -184,6 +191,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 ### 2.3 Side (dimension) table plugins
 * [hbase side table plugin](docs/hbaseSide.md)
 * [mysql side table plugin](docs/mysqlSide.md)
+* [oracle side table plugin](docs/oracleSide.md)
 * [mongo side table plugin](docs/mongoSide.md)
 * [redis side table plugin](docs/redisSide.md)
 * [cassandra side table plugin](docs/cassandraSide.md)
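
Taken together, the five new confProp options documented above can be exercised with a small sketch; the class name and the concrete values below are illustrative, not part of this commit:

```java
import java.util.Properties;

// Hypothetical sketch: the equivalent of passing
// -confProp '{"sql.ttl.min":"1h","sql.ttl.max":"2h","state.backend":"ROCKSDB",...}'
public class ConfPropSketch {
    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("sql.ttl.min", "1h");  // keep idle state for at least 1 hour
        conf.setProperty("sql.ttl.max", "2h");  // evict after at most 2 hours (must exceed min by >= 5 minutes)
        conf.setProperty("state.backend", "ROCKSDB");
        conf.setProperty("state.checkpoints.dir", "hdfs://ns1/dtInsight/flink180/checkpoints");
        conf.setProperty("state.backend.incremental", "true");
        System.out.println(conf);
    }
}
```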

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 5 additions & 1 deletion
@@ -134,7 +134,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
             Object equalObj = value.getField(conValIndex);
             if (equalObj == null) {
-                out.collect(null);
+                if(sideInfo.getJoinType() == JoinType.LEFT){
+                    Row data = fillData(value, null);
+                    out.collect(data);
+                }
+                return;
             }

             inputParams.add(equalObj);
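
The change replaces out.collect(null) on a null join key: for a LEFT join the stream row is padded with nulls via fillData, otherwise the record is dropped. A standalone sketch of that rule, assuming the side table contributes sideFieldCount columns (names here are illustrative, not the project's SideInfo/fillData API):

```java
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class MissKeySketch {
    // Emit the stream-side row padded with nulls for a LEFT join; drop it otherwise.
    static void onNullJoinKey(Row input, boolean isLeftJoin, int sideFieldCount, Collector<Row> out) {
        if (isLeftJoin) {
            Row padded = new Row(input.getArity() + sideFieldCount);
            for (int i = 0; i < input.getArity(); i++) {
                padded.setField(i, input.getField(i)); // copy stream fields; side-table fields stay null
            }
            out.collect(padded);
        }
        // INNER join: emit nothing, the record is filtered out.
    }
}
```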

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
@@ -170,7 +170,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
             Object equalObj = input.getField(conValIndex);
             if (equalObj == null) {
-                resultFuture.complete(null);
+                dealMissKey(input, resultFuture);
+                return;
             }
             inputParams.add(equalObj);
             stringBuffer.append(sideInfo.getEqualFieldList().get(i))
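
dealMissKey lives in the shared async base class and is not shown in this commit; presumably it applies the same rule as the all-cache fix above, completing the future instead of passing null. A hypothetical sketch of that assumed behavior:

```java
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

public class AsyncMissKeySketch {
    // Assumed behavior: never complete(null); emit the null-padded row for a
    // LEFT join, or an empty collection to drop the record.
    static void dealMissKey(ResultFuture<Row> resultFuture, boolean isLeftJoin, Row nullPaddedRow) {
        if (isLeftJoin) {
            resultFuture.complete(Collections.singleton(nullPaddedRow));
        } else {
            resultFuture.complete(Collections.emptyList());
        }
    }
}
```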

core/pom.xml

Lines changed: 62 additions & 21 deletions
@@ -18,6 +18,8 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.package.name>core</project.package.name>
     <calcite.server.version>1.16.0</calcite.server.version>
+    <jackson.version>2.7.9</jackson.version>
+    <guava.version>19.0</guava.version>
 </properties>

 <dependencies>
@@ -52,12 +54,6 @@
         <version>${flink.version}</version>
     </dependency>

-    <!--<dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-table_2.11</artifactId>
-        <version>1.7.2</version>
-    </dependency>-->
-
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner_2.11</artifactId>
@@ -75,6 +71,24 @@
         <artifactId>calcite-server</artifactId>
         <!-- When updating the Calcite version, make sure to update the dependency exclusions -->
         <version>${calcite.server.version}</version>
+        <exclusions>
+            <exclusion>
+                <artifactId>jackson-databind</artifactId>
+                <groupId>com.fasterxml.jackson.core</groupId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+
+    <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>${guava.version}</version>
     </dependency>

     <dependency>
@@ -94,6 +108,14 @@
         <artifactId>flink-yarn_2.11</artifactId>
         <version>${flink.version}</version>
     </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+
 </dependencies>

 <build>
@@ -108,21 +130,40 @@
 <plugins>
     <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <configuration>
-            <classesDirectory>target/classes/</classesDirectory>
-            <archive>
-                <manifest>
-                    <!-- entry point of the main class -->
-                    <mainClass>com.dtstack.flink.sql.Main</mainClass>
-                    <!-- do not record timestamped versions in MANIFEST.MF when packaging -->
-                    <useUniqueVersions>false</useUniqueVersions>
-                </manifest>
-                <manifestEntries>
-                    <Class-Path>.</Class-Path>
-                </manifestEntries>
-            </archive>
-        </configuration>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.1.0</version>
+        <executions>
+            <execution>
+                <phase>package</phase>
+                <goals>
+                    <goal>shade</goal>
+                </goals>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                    <transformers>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                            <mainClass>com.dtstack.flink.sql.Main</mainClass>
+                        </transformer>
+                    </transformers>
+                    <artifactSet>
+                        <includes>
+                            <include>com.fasterxml.jackson.*</include>
+                            <include>com.google.guava</include>
+                        </includes>
+                    </artifactSet>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+            </execution>
+        </executions>
     </plugin>

     <plugin>
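
The packaging switch from maven-jar-plugin to maven-shade-plugin keeps the same entry point but now bundles the pinned Jackson and Guava artifacts into the core jar (per the artifactSet includes) and strips stale signature files. A quick sanity check that the manifest survives the shade; the jar path below is an assumption, not from this commit:

```java
import java.util.jar.JarFile;

public class ManifestCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical path: the shaded jar lands under core/target after `mvn package`.
        try (JarFile jar = new JarFile("core/target/core.jar")) {
            // Should print com.dtstack.flink.sql.Main, set by the ManifestResourceTransformer.
            System.out.println(jar.getManifest().getMainAttributes().getValue("Main-Class"));
        }
    }
}
```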

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 17 additions & 4 deletions
@@ -66,6 +66,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.TableSink;
@@ -127,6 +128,7 @@ public static void main(String[] args) throws Exception {
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
         StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
+        StreamQueryConfig queryConfig = getStreamTableEnvTTL(confProperties, tableEnv);

         List<URL> jarURList = Lists.newArrayList();
         SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -145,7 +147,7 @@ public static void main(String[] args) throws Exception {
         //register table schema
         registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);

-        sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache);
+        sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, queryConfig);

         if(env instanceof MyLocalStreamEnvironment) {
             ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -154,7 +156,7 @@ public static void main(String[] args) throws Exception {
         env.execute(name);
     }

-    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
+    private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
         SideSqlExec sideSqlExec = new SideSqlExec();
         sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
         for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
@@ -184,9 +186,9 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
             }
             if(isSide){
                 //sql-dimensional table contains the dimension table of execution
-                sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
+                sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
             }else{
-                FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+                FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
                 if(LOG.isInfoEnabled()){
                     LOG.info("exec sql: " + result.getExecSql());
                 }
@@ -348,4 +350,15 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
         return env;
     }

+    /**
+     * Build the StreamQueryConfig (state TTL) from the job properties.
+     *
+     * @param confProperties
+     * @param tableEnv
+     * @return
+     */
+    private static StreamQueryConfig getStreamTableEnvTTL(Properties confProperties, StreamTableEnvironment tableEnv) {
+        confProperties = PropertiesUtils.propertiesTrim(confProperties);
+        return FlinkUtil.getTableEnvTTL(confProperties, tableEnv);
+    }
 }
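
FlinkUtil.getTableEnvTTL itself is outside this diff; a plausible sketch of what it does with the sql.ttl.* values, using Flink's idle-state retention API (the parseTtl helper and its unit handling are assumptions, not the project's code):

```java
import java.util.Properties;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TtlSketch {
    static StreamQueryConfig getTableEnvTTL(Properties conf, StreamTableEnvironment tableEnv) {
        StreamQueryConfig queryConfig = tableEnv.queryConfig();
        String min = conf.getProperty("sql.ttl.min");
        String max = conf.getProperty("sql.ttl.max");
        if (min != null && max != null) {
            // Flink requires maxTime to exceed minTime by at least 5 minutes,
            // matching the constraint documented in the README above.
            queryConfig.withIdleStateRetentionTime(parseTtl(min), parseTtl(max));
        }
        return queryConfig;
    }

    // Assumed parser for values like "1h", "2d", "30m", "45s".
    static Time parseTtl(String value) {
        long n = Long.parseLong(value.substring(0, value.length() - 1));
        switch (Character.toUpperCase(value.charAt(value.length() - 1))) {
            case 'D': return Time.days(n);
            case 'H': return Time.hours(n);
            case 'M': return Time.minutes(n);
            case 'S': return Time.seconds(n);
            default:  throw new IllegalArgumentException("bad ttl: " + value);
        }
    }
}
```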

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 9 additions & 4 deletions
@@ -43,10 +43,6 @@ public class ConfigConstrant {

     public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";

-
-
-    public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flinkCheckpointDataURI";
-
     public static final String SQL_ENV_PARALLELISM = "sql.env.parallelism";

     public static final String SQL_MAX_ENV_PARALLELISM = "sql.max.env.parallelism";
@@ -57,6 +53,15 @@ public class ConfigConstrant {

     public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";

+    public static final String SQL_TTL_MINTIME = "sql.ttl.min";
+
+    public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
+
+    public static final String STATE_BACKEND_KEY = "state.backend";
+    public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
+    public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
+
+
     // restart policy
     public static final int failureRate = 3;

core/src/main/java/com/dtstack/flink/sql/enums/EStateBackend.java

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.enums;
+
+/**
+ * Flink state backend type
+ * Date: 2019/11/15
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public enum EStateBackend {
+    MEMORY,
+    ROCKSDB,
+    FILESYSTEM;
+
+    public static EStateBackend convertFromString(String type) {
+        if(type == null) {
+            throw new RuntimeException("null StateBackend!");
+        }
+        return valueOf(type.toUpperCase());
+    }
+}
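
The enum only names a backend; wiring it to Flink happens elsewhere (presumably when getStreamExeEnv builds the environment). A plausible sketch combining it with state.checkpoints.dir and state.backend.incremental, using Flink's actual backend classes:

```java
import java.io.IOException;
import com.dtstack.flink.sql.enums.EStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

public class StateBackendSketch {
    // Map the configured backend name onto a concrete Flink StateBackend.
    static StateBackend create(String type, String checkpointsDir, boolean incremental) throws IOException {
        switch (EStateBackend.convertFromString(type)) {
            case MEMORY:     return new MemoryStateBackend();
            case FILESYSTEM: return new FsStateBackend(checkpointsDir);
            case ROCKSDB:    return new RocksDBStateBackend(checkpointsDir, incremental); // incremental checkpoints
            default:         throw new IllegalArgumentException(type);
        }
    }
}
```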

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 4 additions & 7 deletions
@@ -21,10 +21,7 @@
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.plan.logical.LogicalRelNode;
@@ -41,7 +38,7 @@
  */
 public class FlinkSQLExec {

-    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, StreamQueryConfig queryConfig) throws Exception {

         FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
         SqlNode insert = planner.parse(stmt);
@@ -78,7 +75,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
                 "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
                 "TableSink schema: " + String.join(",", fieldNames));
         }
-
-        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
+        StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
+        tableEnv.insertInto(newTable, targetTableName, config);
     }
 }
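
The null fallback keeps older call sites working: passing null still uses the environment's default queryConfig. A usage sketch of the new signature (the table names and the INSERT statement are placeholders; both tables must be registered first):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import com.dtstack.flink.sql.exec.FlinkSQLExec;

public class SqlUpdateUsage {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
        StreamQueryConfig queryConfig = tableEnv.queryConfig();
        queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)); // state TTL for this query
        FlinkSQLExec.sqlUpdate(tableEnv, "INSERT INTO MyResult SELECT id, name FROM MySource", queryConfig);
        env.execute("ttl-demo");
    }
}
```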
