Commit 11749b3

gituser committed: Merge branch 'v1.8.0_dev' into 1.8_v3.9.0_beta_1.0
2 parents (78b3d42 + a0a8e7a)

File tree: 185 files changed (+7389 −449 lines)


README.md

Lines changed: 37 additions & 13 deletions
@@ -8,27 +8,33 @@
 > > * Exported input and output performance metrics to Prometheus

 ## New features:
-1. The kafka source table supports NOT NULL syntax and time conversion from string types.
-2. RDB side tables reconnect to the DB periodically to keep the connection from dropping; rdbsink checks the connection before writing.
-3. Async side tables support non-equi joins, e.g. <>, <, >.
+* 1. The kafka source table supports NOT NULL syntax and time conversion from string types.
+* 2. RDB side tables reconnect to the DB periodically to keep the connection from dropping; rdbsink checks the connection before writing.
+* 3. Async side tables support non-equi joins, e.g. <>, <, >.
+* 4. Added kafka array parsing.
+* 5. Added support for kafka 1.0 and above.
+* 6. Added postgresql, kudu, and clickhouse side-table and sink support.
+* 7. Plugins can be loaded as dependencies; see the pluginLoadMode parameter.
+* 8. CEP processing support.
+* 9. UDAF support.
+* 10. Predicate pushdown support.
+* 11. State TTL support.

 ## Bug fixes:
-1. Fixed failures parsing ORDER BY and UNION in SQL.
-2. Fixed an exception causing submission failures in yarnPer mode.
+* 1. Fixed failures parsing ORDER BY and UNION in SQL.
+* 2. Fixed an exception causing submission failures in yarnPer mode.
+* 3. Assorted bug fixes.

 # Supported
-* Source tables: kafka 0.9, 1.x
-* Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket
-* Sinks: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console
+* Source tables: kafka 0.9, 0.10, 0.11, 1.x
+* Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse
+* Sinks: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse

 # Roadmap
-* Add CEP support to SQL
 * Side-table snapshots
-* SQL optimization (predicate pushdown, etc.)
 * kafka avro format
 * topN

-
 ## 1 Quick start
 ### 1.1 Run modes


@@ -40,7 +46,7 @@
 ### 1.2 Execution environment

 * Java: JDK8 or above
-* Flink cluster: 1.4, 1.5 (standalone mode does not require a Flink cluster)
+* Flink cluster: 1.4, 1.5, 1.8 (standalone mode does not require a Flink cluster)
 * OS: theoretically unrestricted

 ### 1.3 Packaging
@@ -104,6 +110,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * Required: yes (pass an empty JSON object if there are no parameters)
 * Default: none
 * Optional parameters:
+* sql.ttl.min: minimum idle-state retention time, an integer greater than 0 such as 1d or 1h (d/D: days, h/H: hours, m/M: minutes, s/S: seconds)
+* sql.ttl.max: maximum idle-state retention time, e.g. 2d or 2h (same units); requires sql.ttl.min to be set as well and must exceed it by at least 5 minutes
+* state.backend: the job's state backend, one of MEMORY, FILESYSTEM, ROCKSDB; defaults to the flink-conf setting.
+* state.checkpoints.dir: file-system storage path for the FILESYSTEM and ROCKSDB state backends, e.g. hdfs://ns1/dtInsight/flink180/checkpoints.
+* state.backend.incremental: whether the ROCKSDB state backend takes incremental checkpoints; defaults to true.
 * sql.env.parallelism: default parallelism
 * sql.max.env.parallelism: maximum parallelism
 * time.characteristic: one of [ProcessingTime|IngestionTime|EventTime]
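
The sql.ttl.* keys correspond to Flink's idle-state retention, and the state.* keys to its state-backend configuration. A minimal sketch of the equivalent Flink 1.8 API calls — an illustration of what the framework plausibly does with these values, not this project's actual wiring; the path and durations mirror the examples above:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ConfPropSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // state.backend: ROCKSDB, state.checkpoints.dir: hdfs://ns1/..., state.backend.incremental: true
        env.setStateBackend(
                new RocksDBStateBackend("hdfs://ns1/dtInsight/flink180/checkpoints", true));

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // sql.ttl.min: 1h, sql.ttl.max: 2h — the max must exceed the min by at least 5 minutes
        StreamQueryConfig queryConfig = tableEnv.queryConfig();
        queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2));
    }
}
```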
@@ -150,6 +161,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * Required: no
 * Default: false

+* **pluginLoadMode**
+* Description: how plugin packages are loaded in per_job mode. classpath: load plugin packages from each machine; shipfile: ship the required plugins from the submitting node to HDFS, so plugins need not be installed on every machine
+* Required: no
+* Default: classpath
+
 * **yarnSessionConf**
 * Description: runtime parameters for yarn session mode ([reference](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html)); currently only yid can be specified
 * Required: no
@@ -163,16 +179,24 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [elasticsearch sink plugin](docs/elasticsearchSink.md)
 * [hbase sink plugin](docs/hbaseSink.md)
 * [mysql sink plugin](docs/mysqlSink.md)
+* [oracle sink plugin](docs/oracleSink.md)
 * [mongo sink plugin](docs/mongoSink.md)
 * [redis sink plugin](docs/redisSink.md)
 * [cassandra sink plugin](docs/cassandraSink.md)
+* [kudu sink plugin](docs/kuduSink.md)
+* [postgresql sink plugin](docs/postgresqlSink.md)
+* [clickhouse sink plugin](docs/clickhouseSink.md)

 ### 2.3 Side-table plugins
 * [hbase side-table plugin](docs/hbaseSide.md)
 * [mysql side-table plugin](docs/mysqlSide.md)
+* [oracle side-table plugin](docs/oracleSide.md)
 * [mongo side-table plugin](docs/mongoSide.md)
 * [redis side-table plugin](docs/redisSide.md)
 * [cassandra side-table plugin](docs/cassandraSide.md)
+* [kudu side-table plugin](docs/kuduSide.md)
+* [postgresql side-table plugin](docs/postgresqlSide.md)
+* [clickhouse side-table plugin](docs/clickhouseSide.md)

 ## 3 Performance metrics (new)

@@ -203,7 +227,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack

 ```
-CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun
+CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun;

 CREATE TABLE MyTable(
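
The new `aggregate` keyword in the grammar above registers a UDAF. As a rough illustration of what such a class could look like — the class below is hypothetical, not part of this commit, and assumes Flink's `org.apache.flink.table.functions.AggregateFunction` contract of `createAccumulator`/`accumulate`/`getValue`:

```java
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical UDAF: a running sum over Long values. It could then be
// registered as: CREATE aggregate FUNCTION mySum WITH com.dtstack.LongSumUdaf;
public class LongSumUdaf extends AggregateFunction<Long, LongSumUdaf.SumAcc> {

    // mutable accumulator holding the intermediate state
    public static class SumAcc {
        public long sum = 0L;
    }

    @Override
    public SumAcc createAccumulator() {
        return new SumAcc();
    }

    // invoked by the runtime once per input row
    public void accumulate(SumAcc acc, Long value) {
        if (value != null) {
            acc.sum += value;
        }
    }

    @Override
    public Long getValue(SumAcc acc) {
        return acc.sum;
    }
}
```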

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 7 additions & 3 deletions
@@ -36,8 +36,8 @@
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -134,7 +134,11 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
             Object equalObj = value.getField(conValIndex);
             if (equalObj == null) {
-                out.collect(null);
+                if (sideInfo.getJoinType() == JoinType.LEFT) {
+                    Row data = fillData(value, null);
+                    out.collect(data);
+                }
+                return;
             }

             inputParams.add(equalObj);
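
The hunk above changes what happens when a join key is null: instead of collecting a null row, a LEFT join now emits the probe row padded with null side-table fields, and every other join type drops the record. A standalone sketch of that pattern with simplified types — the helper below is illustrative, not the project's actual fillData:

```java
import org.apache.calcite.sql.JoinType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class NullJoinKeySketch {
    // On a null join key: LEFT joins emit the probe row padded with nulls
    // for the side-table columns; other join types emit nothing.
    static void onNullJoinKey(Row probeRow, JoinType joinType, int sideFieldCount, Collector<Row> out) {
        if (joinType == JoinType.LEFT) {
            Row padded = new Row(probeRow.getArity() + sideFieldCount);
            for (int i = 0; i < probeRow.getArity(); i++) {
                padded.setField(i, probeRow.getField(i)); // copy probe-side fields
            }
            out.collect(padded); // side-table fields remain null
        }
        // non-LEFT joins: drop the record instead of emitting a null row
    }
}
```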

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 1 addition & 3 deletions
@@ -24,12 +24,10 @@
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;

 import java.util.List;

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 3 additions & 2 deletions
@@ -45,7 +45,7 @@
 import io.vertx.core.json.JsonArray;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
@@ -170,7 +170,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
             Object equalObj = input.getField(conValIndex);
             if (equalObj == null) {
-                resultFuture.complete(null);
+                dealMissKey(input, resultFuture);
+                return;
             }
             inputParams.add(equalObj);
             stringBuffer.append(sideInfo.getEqualFieldList().get(i))

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;

 import java.util.List;

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideTableInfo.java

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 package com.dtstack.flink.sql.side.cassandra.table;

 import com.dtstack.flink.sql.side.SideTableInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;

 /**
  * Reason:

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 package com.dtstack.flink.sql.sink.cassandra.table;

 import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;

 /**
  * Reason:
Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sql.side.clickhouse</artifactId>
+        <groupId>com.dtstack.flink</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sql.side.all.clickhouse</artifactId>
+    <name>clickhouse-all-side</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <sql.side.clickhouse.core.version>1.0-SNAPSHOT</sql.side.clickhouse.core.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.dtstack.flink</groupId>
+            <artifactId>sql.side.clickhouse.core</artifactId>
+            <version>${sql.side.clickhouse.core.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.2</version>
+                <executions>
+                    <execution>
+                        <id>copy-resources</id>
+                        <!-- here the phase you need -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${basedir}/../../../plugins/clickhouseallside">
+                                    <fileset dir="target/">
+                                        <include name="${project.artifactId}-${project.version}.jar"/>
+                                    </fileset>
+                                </copy>
+
+                                <move file="${basedir}/../../../plugins/clickhouseallside/${project.artifactId}-${project.version}.jar"
+                                      tofile="${basedir}/../../../plugins/clickhouseallside/${project.name}-${git.branch}.jar"/>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.side.clickhouse;
+
+import com.dtstack.flink.sql.side.FieldInfo;
+import com.dtstack.flink.sql.side.JoinInfo;
+import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
+import com.dtstack.flink.sql.util.DtStringUtil;
+import com.dtstack.flink.sql.util.JDBCUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+
+public class ClickhouseAllReqRow extends RdbAllReqRow {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClickhouseAllReqRow.class);
+
+    private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
+
+    public ClickhouseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+        super(new ClickhouseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
+    }
+
+    @Override
+    public Connection getConn(String dbURL, String userName, String passWord) {
+        try {
+            Connection connection;
+            JDBCUtils.forName(CLICKHOUSE_DRIVER, getClass().getClassLoader());
+            // ClickHouseProperties contains all properties
+            if (userName == null) {
+                connection = DriverManager.getConnection(dbURL);
+            } else {
+                connection = DriverManager.getConnection(dbURL, userName, passWord);
+            }
+            return connection;
+        } catch (Exception e) {
+            LOG.error("", e);
+            throw new RuntimeException("", e);
+        }
+    }
+
+}
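
A quick way to sanity-check the connection logic above outside a job — a hypothetical smoke test where the URL, database, and credentials are placeholders (the ClickHouse JDBC driver accepts jdbc:clickhouse://host:port/database URLs, 8123 being ClickHouse's default HTTP port):

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class ClickhouseConnSmokeTest {
    public static void main(String[] args) throws Exception {
        // Mirror getConn above: load the driver class, then connect with or
        // without credentials.
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://127.0.0.1:8123/default", "default", "");
        System.out.println("connection valid: " + conn.isValid(5)); // expect true when the server is up
        conn.close();
    }
}
```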
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.side.clickhouse;
+
+import com.dtstack.flink.sql.side.FieldInfo;
+import com.dtstack.flink.sql.side.JoinInfo;
+import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import java.util.List;
+
+
+public class ClickhouseAllSideInfo extends RdbAllSideInfo {
+    public ClickhouseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+        super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
+    }
+}
