Skip to content

Commit d878bd3

Browse files
committed
Merge branch 'feat_1.10_4.0.x_32416' into '1.10_release_4.0.x'
Feat 1.10 4.0.x 32416 See merge request dt-insight-engine/flinkStreamSQL!192
2 parents 38079c1 + ed3be58 commit d878bd3

File tree

4 files changed

+21
-1
lines changed

4 files changed

+21
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public class ConfigConstrant {
5656
// default 200ms
5757
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";
5858

59+
// window early trigger
60+
public static final String EARLY_TRIGGER = "early.trigger";
61+
5962
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
6063
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6164

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
121121
}
122122
}
123123

124+
/**
125+
* 设置TableEnvironment window提前触发
126+
* @param tableEnv
127+
* @param confProperties
128+
*/
129+
public static void streamTableEnvironmentEarlyTriggerConfig(TableEnvironment tableEnv, Properties confProperties) {
130+
confProperties = PropertiesUtils.propertiesTrim(confProperties);
131+
String triggerTime = confProperties.getProperty(ConfigConstrant.EARLY_TRIGGER);
132+
if (StringUtils.isNumeric(triggerTime)) {
133+
TableConfig qConfig = tableEnv.getConfig();
134+
qConfig.getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
135+
qConfig.getConfiguration().setString("table.exec.emit.early-fire.delay", triggerTime+"s");
136+
}
137+
}
138+
124139
/**
125140
* 设置TableEnvironment状态超时时间
126141
* @param tableEnv

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironmen
379379

380380
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, tableConfig);
381381
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
382+
StreamEnvConfigManager.streamTableEnvironmentEarlyTriggerConfig(tableEnv, confProperties);
382383
return tableEnv;
383384
}
384385

localTest/src/main/java/com/dtstack/flink/sql/localTest/LocalTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ public static void main(String[] args) throws Exception {
5050
setLogLevel("INFO");
5151

5252
List<String> propertiesList = new ArrayList<>();
53-
String sqlPath = "/Users/wtz/dtstack/job/flinkStreamSQL/sql/TestDemo/JoinDemoTwo.sql";
53+
String sqlPath = "/Users/chuixue/Desktop/tmp/sqlFile.sql";
5454
Map<String, Object> conf = new HashMap<>();
5555
JSONObject properties = new JSONObject();
5656

5757
//其他参数配置
5858
properties.put("time.characteristic", "eventTime");
5959
properties.put("timezone", TimeZone.getDefault());
60+
properties.put("early.trigger", "1");
6061

6162
// 任务配置参数
6263
conf.put("-sql", URLEncoder.encode(readSQL(sqlPath), StandardCharsets.UTF_8.name()));

0 commit comments

Comments
 (0)