Skip to content

Commit d34c1e8

Browse files
committed
Merge branch '1.8.0_dev_rocksdb' into 'v1.8.0_dev'
为每个任务指定状态后端 See merge request !134
2 parents 8bdfb15 + 6a18b98 commit d34c1e8

File tree

5 files changed

+92
-8
lines changed

5 files changed

+92
-8
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
112112
* 可选参数:
113113
* sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒)
114114
* sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟
115+
* state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。
116+
* state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。
117+
* state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。
115118
* sql.env.parallelism: 默认并行度设置
116119
* sql.max.env.parallelism: 最大并行度设置
117120
* time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime]

core/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@
108108
<artifactId>flink-yarn_2.11</artifactId>
109109
<version>${flink.version}</version>
110110
</dependency>
111+
112+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
113+
<dependency>
114+
<groupId>org.apache.flink</groupId>
115+
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
116+
<version>${flink.version}</version>
117+
</dependency>
118+
111119
</dependencies>
112120

113121
<build>

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ public class ConfigConstrant {
4343

4444
public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";
4545

46-
47-
48-
public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flinkCheckpointDataURI";
49-
5046
public static final String SQL_ENV_PARALLELISM = "sql.env.parallelism";
5147

5248
public static final String SQL_MAX_ENV_PARALLELISM = "sql.max.env.parallelism";
@@ -61,6 +57,10 @@ public class ConfigConstrant {
6157

6258
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6359

60+
public static final String STATE_BACKEND_KEY = "state.backend";
61+
public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir";
62+
public static final String STATE_BACKEND_INCREMENTAL_KEY = "state.backend.incremental";
63+
6464

6565
// restart plocy
6666
public static final int failureRate = 3;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
/**
 * Flink state backend types that a job may select via the {@code state.backend} property.
 * Date: 2019/11/15
 * Company: www.dtstack.com
 * @author maqi
 */
public enum EStateBackend {
    MEMORY,
    ROCKSDB,
    FILESYSTEM;

    /**
     * Resolves a backend type from its case-insensitive name (e.g. "rocksdb", "Memory").
     *
     * @param type backend name as supplied in job properties; surrounding whitespace is ignored
     * @return the matching {@link EStateBackend}
     * @throws IllegalArgumentException if {@code type} is null or names no known backend
     */
    public static EStateBackend convertFromString(String type) {
        if (type == null) {
            // IllegalArgumentException is a RuntimeException, so existing callers
            // catching RuntimeException still work.
            throw new IllegalArgumentException("null StateBackend!");
        }
        // Locale.ROOT keeps the upper-casing locale-independent: with the default
        // locale on a Turkish JVM, "filesystem".toUpperCase() would produce
        // "FİLESYSTEM" and valueOf would fail.
        return valueOf(type.trim().toUpperCase(java.util.Locale.ROOT));
    }
}
39+
40+

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222

2323

2424
import com.dtstack.flink.sql.constrant.ConfigConstrant;
25+
import com.dtstack.flink.sql.enums.EStateBackend;
26+
import org.apache.commons.lang3.BooleanUtils;
2527
import org.apache.commons.lang3.StringUtils;
2628
import org.apache.flink.api.common.time.Time;
2729
import org.apache.flink.api.common.typeinfo.TypeInformation;
30+
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
31+
import org.apache.flink.runtime.state.StateBackend;
2832
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
33+
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
2934
import org.apache.flink.streaming.api.CheckpointingMode;
3035
import org.apache.flink.streaming.api.TimeCharacteristic;
3136
import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -51,6 +56,7 @@
5156
import java.util.regex.Matcher;
5257
import java.util.regex.Pattern;
5358

59+
5460
/**
5561
* Reason:
5662
* Date: 2017/2/21
@@ -130,10 +136,13 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
130136
throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
131137
}
132138

133-
String backendPath = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_DATAURI_KEY);
134-
if(backendPath != null){
135-
//set checkpoint save path on file system, 根据实际的需求设定文件路径,hdfs://, file://
136-
env.setStateBackend(new FsStateBackend(backendPath));
139+
String backendType = properties.getProperty(ConfigConstrant.STATE_BACKEND_KEY);
140+
String checkpointDataUri = properties.getProperty(ConfigConstrant.CHECKPOINTS_DIRECTORY_KEY);
141+
String backendIncremental = properties.getProperty(ConfigConstrant.STATE_BACKEND_INCREMENTAL_KEY, "true");
142+
143+
if(!StringUtils.isEmpty(backendType)){
144+
StateBackend stateBackend = createStateBackend(backendType, checkpointDataUri, backendIncremental);
145+
env.setStateBackend(stateBackend);
137146
}
138147

139148
}
@@ -377,4 +386,28 @@ public static TypeInformation[] transformTypes(Class[] fieldTypes){
377386
return types;
378387
}
379388

389+
private static StateBackend createStateBackend(String backendType, String checkpointDataUri, String backendIncremental) throws IOException {
390+
EStateBackend stateBackendType = EStateBackend.convertFromString(backendType);
391+
StateBackend stateBackend = null;
392+
switch (stateBackendType) {
393+
case MEMORY:
394+
stateBackend = new MemoryStateBackend();
395+
break;
396+
case FILESYSTEM:
397+
checkpointDataUriEmptyCheck(checkpointDataUri, backendType);
398+
stateBackend = new FsStateBackend(checkpointDataUri);
399+
break;
400+
case ROCKSDB:
401+
checkpointDataUriEmptyCheck(checkpointDataUri, backendType);
402+
stateBackend = new RocksDBStateBackend(checkpointDataUri, BooleanUtils.toBoolean(backendIncremental));
403+
break;
404+
}
405+
return stateBackend;
406+
}
407+
408+
private static void checkpointDataUriEmptyCheck(String checkpointDataUri, String backendType) {
409+
if (StringUtils.isEmpty(checkpointDataUri)) {
410+
throw new RuntimeException(backendType + " backend checkpointDataUri not null!");
411+
}
412+
}
380413
}

0 commit comments

Comments
 (0)