Skip to content

Commit 7b0316c

Browse files
Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dt-insight-engine/flinkStreamSQL into hotfix_1.8_3.10.2_21725_megreDev
2 parents 55b22b4 + 00a032a commit 7b0316c

File tree

31 files changed

+3183
-9
lines changed

31 files changed

+3183
-9
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
150150
* savePointPath:任务恢复点的路径(默认无)
151151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
152+
* restore.enable:是否失败重启(默认是true)
153+
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
154+
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
152155
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
153156

154157

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ public class ConfigConstrant {
6363
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
6464
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
6565

66+
public static final String RESTOREENABLE = "restore.enable";
67+
6668

6769
// restart policy
6870
public static final int failureRate = 3;
6971

70-
public static final int failureInterval = 6; //min
72+
public static final String FAILUREINTERVAL = "failure.interval"; //min
7173

72-
public static final int delayInterval = 10; //sec
74+
public static final String DELAYINTERVAL= "delay.interval"; //sec
7375

7476
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
101101
}
102102
});
103103

104-
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
105-
ConfigConstrant.failureRate,
106-
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
107-
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
108-
));
104+
if(isRestore(confProperties).get()){
105+
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
106+
ConfigConstrant.failureRate,
107+
Time.of(getFailureInterval(confProperties).get(), TimeUnit.MINUTES),
108+
Time.of(getDelayInterval(confProperties).get(), TimeUnit.SECONDS)
109+
));
110+
} else {
111+
streamEnv.setRestartStrategy(RestartStrategies.noRestart());
112+
}
109113

110114
// checkpoint config
111115
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
@@ -163,6 +167,20 @@ public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
163167
return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
164168
}
165169

170+
public static Optional<Boolean> isRestore(Properties properties){
171+
String restoreEnable = properties.getProperty(ConfigConstrant.RESTOREENABLE, "true");
172+
return Optional.of(Boolean.valueOf(restoreEnable));
173+
}
174+
175+
public static Optional<Integer> getDelayInterval(Properties properties){
176+
String delayInterval = properties.getProperty(ConfigConstrant.DELAYINTERVAL, "10");
177+
return Optional.of(Integer.valueOf(delayInterval));
178+
}
179+
public static Optional<Integer> getFailureInterval(Properties properties){
180+
String failureInterval = properties.getProperty(ConfigConstrant.FAILUREINTERVAL, "6");
181+
return Optional.of(Integer.valueOf(failureInterval));
182+
}
183+
166184
/**
167185
* #ProcessingTime(默认), IngestionTime, EventTime
168186
* @param properties

core/src/main/java/com/dtstack/flink/sql/exec/ApiResult.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7-
import java.util.UUID;
8-
97
/**
108
* API调用结果返回
119
* Date: 2020/2/24

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import org.apache.calcite.sql.JoinType;
2425
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.table.runtime.types.CRow;
28+
import org.apache.flink.types.Row;
29+
import org.apache.flink.util.Collector;
2730

2831
import java.sql.SQLException;
2932
import java.util.concurrent.Executors;
@@ -64,4 +67,13 @@ public void open(Configuration parameters) throws Exception {
6467
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6568
}
6669

70+
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
71+
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
72+
return;
73+
}
74+
75+
Row row = fillData(value.row(), sideInput);
76+
out.collect(new CRow(row, value.change()));
77+
}
78+
6779
}

docs/elasticsearch6Side

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
2+
## 1.格式:
3+
```
4+
CREATE TABLE tableName(
5+
colName colType,
6+
...
7+
PRIMARY KEY(keyInfo),
8+
PERIOD FOR SYSTEM_TIME
9+
)WITH(
10+
type='elasticsearch6',
11+
address ='ip:port[,ip:port]',
12+
cluster='clusterName',
13+
estype ='esType',
14+
index ='index',
15+
authMesh='true',
16+
userName='dbUserName',
17+
password='dbPwd',
18+
cache ='LRU',
19+
cacheSize ='10000',
20+
cacheTTLMs ='60000',
21+
parallelism ='1',
22+
partitionedJoin='false'
23+
);
24+
```
25+
26+
## 2.支持版本
27+
elasticsearch 6.8.6
28+
29+
## 3.表结构定义
30+
31+
|参数名称|含义|
32+
|----|---|
33+
| tableName | 注册到flink的表名称(维表)|
34+
| colName | 列名称|
35+
| colType | 列类型 [colType支持的类型](colType.md)|
36+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
37+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
38+
39+
## 4.参数
40+
41+
|参数名称|含义|是否必填|默认值|
42+
|----|---|---|----|
43+
|type|表明维表类型[elasticsearch6]|是||
44+
|address | 连接ES Transport地址(tcp地址)|是||
45+
|cluster | ES 集群名称 |是||
46+
|index | 选择的ES上的index名称|否||
47+
|esType | 选择ES上的type名称|否||
48+
|authMesh | 是否进行用户名密码认证 | 否 | false|
49+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
50+
|password | 密码 | 否,authMesh='true'时为必填 ||
51+
| cache | 维表缓存策略(NONE/LRU)|否|NONE|
52+
| partitionedJoin | 是否在维表join之前先根据设定的key做一次keyby操作(可以减少维表的数据缓存量)|否|false|
53+
|parallelism | 并行度设置|否|1|
54+
55+
----------
56+
> 缓存策略
57+
* NONE: 不做内存缓存
58+
* LRU:
59+
* cacheSize: 缓存的条目数量
60+
* cacheTTLMs:缓存的过期时间(ms)
61+
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
62+
* asyncCapacity:异步请求容量,默认1000
63+
* asyncTimeout:异步请求超时时间,默认10000毫秒
64+
65+
## 5.样例
66+
```
67+
create table sideTable(
68+
channel varchar,
69+
xccount int,
70+
PRIMARY KEY(channel),
71+
PERIOD FOR SYSTEM_TIME
72+
)WITH(
73+
type ='elasticsearch6',
74+
address ='172.16.10.47:9500',
75+
cluster='es_47_menghan',
76+
estype ='type1',
77+
index ='xc_es_test',
78+
authMesh='true',
79+
userName='dtstack',
80+
password='abc123',
81+
cache ='LRU',
82+
cacheSize ='10000',
83+
cacheTTLMs ='60000',
84+
cacheMode='unordered',
85+
asyncCapacity='1000',
86+
asyncTimeout='10000',
87+
parallelism ='1',
88+
partitionedJoin='false'
89+
);
90+
91+
92+
```
93+
94+

docs/elasticsearch6Sink.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
bb INT
6+
)WITH(
7+
type ='elasticsearch6',
8+
address ='ip:port[,ip:port]',
9+
cluster='clusterName',
10+
esType ='esType',
11+
index ='index',
12+
id ='num[,num]',
13+
authMesh = 'true',
14+
userName = 'userName',
15+
password = 'password',
16+
parallelism ='1'
17+
)
18+
```
19+
## 2.支持的版本
20+
elasticsearch 6.8.6
21+
22+
## 3.表结构定义
23+
24+
|参数名称|含义|
25+
|----|---|
26+
|tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称|
27+
|colName|列名称|
28+
|colType|列类型 [colType支持的类型](colType.md)|
29+
30+
## 4.参数:
31+
|参数名称|含义|是否必填|默认值|
32+
|----|---|---|----|
33+
|type|表明 输出表类型[elasticsearch6]|||
34+
|address | 连接ES Transport地址(tcp地址)|||
35+
|cluster | ES 集群名称 |||
36+
|index | 选择的ES上的index名称|||
37+
|esType | 选择ES上的type名称|||
38+
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;|||
39+
| |若id为空字符串或索引都超出范围,则随机生成id值)|||
40+
|authMesh | 是否进行用户名密码认证 || false|
41+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
42+
|password | 密码 | 否,authMesh='true'时为必填 ||
43+
|parallelism | 并行度设置||1|
44+
45+
## 5.样例:
46+
```
47+
CREATE TABLE MyResult(
48+
aa INT,
49+
bb INT
50+
)WITH(
51+
type ='elasticsearch6',
52+
address ='172.16.10.47:9500',
53+
cluster='es_47_menghan',
54+
esType ='type1',
55+
index ='xc_es_test',
56+
authMesh = 'true',
57+
userName = 'elastic',
58+
password = 'abc123',
59+
id ='0,1',
60+
parallelism ='1'
61+
)
62+
```
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.elasticsearch6</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.all.elasticsearch6</artifactId>
13+
<name>elasticsearch6-all-side</name>
14+
<packaging>jar</packaging>
15+
16+
<properties>
17+
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18+
<elasticsearch.version>6.8.6</elasticsearch.version>
19+
</properties>
20+
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.elasticsearch.client</groupId>
25+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
26+
<version>${elasticsearch.version}</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>com.dtstack.flink</groupId>
31+
<artifactId>sql.side.elasticsearch6.core</artifactId>
32+
<version>${sql.side.elasticsearch6.core.version}</version>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>junit</groupId>
37+
<artifactId>junit</artifactId>
38+
<version>4.12</version>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
</dependencies>
43+
44+
<build>
45+
<plugins>
46+
<plugin>
47+
<groupId>org.apache.maven.plugins</groupId>
48+
<artifactId>maven-shade-plugin</artifactId>
49+
<version>1.4</version>
50+
<executions>
51+
<execution>
52+
<phase>package</phase>
53+
<goals>
54+
<goal>shade</goal>
55+
</goals>
56+
<configuration>
57+
<artifactSet>
58+
<excludes>
59+
60+
</excludes>
61+
</artifactSet>
62+
<filters>
63+
<filter>
64+
<artifact>*:*</artifact>
65+
<excludes>
66+
<exclude>META-INF/*.SF</exclude>
67+
<exclude>META-INF/*.DSA</exclude>
68+
<exclude>META-INF/*.RSA</exclude>
69+
</excludes>
70+
</filter>
71+
</filters>
72+
</configuration>
73+
</execution>
74+
</executions>
75+
</plugin>
76+
77+
<plugin>
78+
<artifactId>maven-antrun-plugin</artifactId>
79+
<version>1.2</version>
80+
<executions>
81+
<execution>
82+
<id>copy-resources</id>
83+
<!-- here the phase you need -->
84+
<phase>package</phase>
85+
<goals>
86+
<goal>run</goal>
87+
</goals>
88+
<configuration>
89+
<tasks>
90+
<copy todir="${basedir}/../../../plugins/elasticsearch6allside">
91+
<fileset dir="target/">
92+
<include name="${project.artifactId}-${project.version}.jar"/>
93+
</fileset>
94+
</copy>
95+
96+
<move file="${basedir}/../../../plugins/elasticsearch6allside/${project.artifactId}-${project.version}.jar"
97+
tofile="${basedir}/../../../plugins/elasticsearch6allside/${project.name}-${git.branch}.jar"/>
98+
</tasks>
99+
</configuration>
100+
</execution>
101+
</executions>
102+
</plugin>
103+
</plugins>
104+
</build>
105+
106+
107+
108+
</project>

0 commit comments

Comments
 (0)