Skip to content

Commit 02a5c4a

Browse files
committed
state setIdleStateRetentionTime
1 parent d44b9b9 commit 02a5c4a

File tree

4 files changed

+92
-2
lines changed

4 files changed

+92
-2
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* 8.支持cep处理
1919
* 9.支持udaf
2020
* 10.支持谓词下移
21+
* 11.支持状态的ttl
2122

2223
## BUG修复:
2324
* 1.修复不能解析sql中orderby,union语法。
@@ -109,6 +110,8 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
109110
* 必选:是 (如无参数填写空json即可)
110111
* 默认值:无
111112
* 可选参数:
113+
* sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒)
114+
* sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟
112115
* sql.env.parallelism: 默认并行度设置
113116
* sql.max.env.parallelism: 最大并行度设置
114117
* time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime]

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.flink.streaming.api.datastream.DataStream;
6767
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
6868
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
69+
import org.apache.flink.table.api.StreamQueryConfig;
6970
import org.apache.flink.table.api.Table;
7071
import org.apache.flink.table.api.java.StreamTableEnvironment;
7172
import org.apache.flink.table.sinks.TableSink;
@@ -126,7 +127,7 @@ public static void main(String[] args) throws Exception {
126127
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
127128
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
128129
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
129-
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
130+
StreamTableEnvironment tableEnv = getStreamTableEnv(confProperties, env);
130131

131132
List<URL> jarURList = Lists.newArrayList();
132133
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -348,4 +349,17 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
348349
return env;
349350
}
350351

352+
/**
353+
* 获取StreamTableEnvironment并设置相关属性
354+
*
355+
* @param confProperties
356+
* @param env
357+
* @return
358+
*/
359+
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
360+
confProperties = PropertiesUtils.propertiesTrim(confProperties);
361+
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
362+
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
363+
return tableEnv;
364+
}
351365
}

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public class ConfigConstrant {
5757

5858
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
5959

60+
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
61+
62+
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
63+
64+
6065
// restart plocy
6166
public static final int failureRate = 3;
6267

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121
package com.dtstack.flink.sql.util;
2222

2323

24-
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2524
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2625
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.flink.api.common.time.Time;
2727
import org.apache.flink.api.common.typeinfo.TypeInformation;
2828
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
2929
import org.apache.flink.streaming.api.CheckpointingMode;
3030
import org.apache.flink.streaming.api.TimeCharacteristic;
3131
import org.apache.flink.streaming.api.environment.CheckpointConfig;
3232
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
33+
import org.apache.flink.table.api.StreamQueryConfig;
3334
import org.apache.flink.table.api.TableEnvironment;
3435
import org.apache.flink.table.api.java.BatchTableEnvironment;
3536
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -47,6 +48,8 @@
4748
import java.net.URLClassLoader;
4849
import java.util.List;
4950
import java.util.Properties;
51+
import java.util.regex.Matcher;
52+
import java.util.regex.Pattern;
5053

5154
/**
5255
* Reason:
@@ -59,6 +62,9 @@ public class FlinkUtil {
5962

6063
private static final Logger logger = LoggerFactory.getLogger(FlinkUtil.class);
6164

65+
private static final String TTL_PATTERN_STR = "^+?([1-9][0-9]*)([dDhHmMsS])$";
66+
private static final Pattern TTL_PATTERN = Pattern.compile(TTL_PATTERN_STR);
67+
6268
/**
6369
* 开启checkpoint
6470
* @param env
@@ -257,6 +263,68 @@ public static int getEnvParallelism(Properties properties){
257263
return StringUtils.isNotBlank(parallelismStr)?Integer.parseInt(parallelismStr):1;
258264
}
259265

266+
/**
267+
* 设置ttl
268+
* @param properties
269+
* @param tableEnv
270+
* @return
271+
*/
272+
public static void setTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
273+
String ttlMintimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MINTIME);
274+
String ttlMaxtimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MAXTIME);
275+
if (StringUtils.isNotEmpty(ttlMintimeStr) || StringUtils.isNotEmpty(ttlMaxtimeStr)) {
276+
verityTtl(ttlMintimeStr, ttlMaxtimeStr);
277+
Matcher ttlMintimeStrMatcher = TTL_PATTERN.matcher(ttlMintimeStr);
278+
Matcher ttlMaxtimeStrMatcher = TTL_PATTERN.matcher(ttlMaxtimeStr);
279+
280+
Long ttlMintime = 0L;
281+
Long ttlMaxtime = 0L;
282+
if (ttlMintimeStrMatcher.find()) {
283+
ttlMintime = getTtlTime(Integer.parseInt(ttlMintimeStrMatcher.group(1)), ttlMintimeStrMatcher.group(2));
284+
}
285+
if (ttlMaxtimeStrMatcher.find()) {
286+
ttlMaxtime = getTtlTime(Integer.parseInt(ttlMaxtimeStrMatcher.group(1)), ttlMaxtimeStrMatcher.group(2));
287+
}
288+
if (0L != ttlMintime && 0L != ttlMaxtime) {
289+
StreamQueryConfig qConfig = tableEnv.queryConfig();
290+
qConfig.withIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
291+
}
292+
}
293+
}
294+
295+
/**
296+
* ttl 校验
297+
* @param ttlMintimeStr 最小时间
298+
* @param ttlMaxtimeStr 最大时间
299+
*/
300+
private static void verityTtl(String ttlMintimeStr, String ttlMaxtimeStr) {
301+
if (null == ttlMintimeStr
302+
|| null == ttlMaxtimeStr
303+
|| !TTL_PATTERN.matcher(ttlMintimeStr).find()
304+
|| !TTL_PATTERN.matcher(ttlMaxtimeStr).find()) {
305+
throw new RuntimeException("sql.ttl.min 、sql.ttl.max must be set at the same time . example sql.ttl.min=1h,sql.ttl.max=2h");
306+
}
307+
}
308+
309+
/**
310+
* 不同单位时间到毫秒的转换
311+
* @param timeNumber 时间值,如:30
312+
* @param timeUnit 单位,d:天,h:小时,m:分,s:秒
313+
* @return
314+
*/
315+
private static Long getTtlTime(Integer timeNumber,String timeUnit) {
316+
if (timeUnit.equalsIgnoreCase("d")) {
317+
return timeNumber * 1000l * 60 * 60 * 24;
318+
} else if (timeUnit.equalsIgnoreCase("h")) {
319+
return timeNumber * 1000l * 60 * 60;
320+
} else if (timeUnit.equalsIgnoreCase("m")) {
321+
return timeNumber * 1000l * 60;
322+
} else if (timeUnit.equalsIgnoreCase("s")) {
323+
return timeNumber * 1000l;
324+
} else {
325+
throw new RuntimeException("not support "+timeNumber+timeUnit);
326+
}
327+
}
260328

261329
/**
262330
* 最大并发度

0 commit comments

Comments
 (0)