Skip to content

Commit 5be6984

Browse files
committed
解决cep不能手动指定watermark的bug
1 parent a8f14b6 commit 5be6984

File tree

1 file changed

+32
-12
lines changed

1 file changed

+32
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@
8080
import java.util.Set;
8181

8282
/**
83-
* 任务执行时的流程方法
83+
* 任务执行时的流程方法
8484
* Date: 2020/2/17
8585
* Company: www.dtstack.com
86+
*
8687
* @author maqi
8788
*/
8889
public class ExecuteProcessHelper {
@@ -131,7 +132,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
131132
}
132133

133134
/**
134-
* 非local模式或者shipfile部署模式,remoteSqlPluginPath必填
135+
* 非local模式或者shipfile部署模式,remoteSqlPluginPath必填
136+
*
135137
* @param remoteSqlPluginPath
136138
* @param deployMode
137139
* @param pluginLoadMode
@@ -187,14 +189,14 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
187189
}
188190
return jarUrlList;
189191
}
190-
192+
191193
private static void sqlTranslation(String localSqlPluginPath,
192-
StreamTableEnvironment tableEnv,
193-
SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,
194-
Map<String, Table> registerTableCache,
195-
StreamQueryConfig queryConfig) throws Exception {
194+
StreamTableEnvironment tableEnv,
195+
SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap,
196+
Map<String, Table> registerTableCache,
197+
StreamQueryConfig queryConfig) throws Exception {
196198

197-
SideSqlExec sideSqlExec = new SideSqlExec();
199+
SideSqlExec sideSqlExec = new SideSqlExec();
198200
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
199201
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
200202
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
@@ -254,13 +256,14 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
254256
}
255257

256258
/**
257-
* 向Flink注册源表和结果表,返回执行时插件包的全路径
259+
* 向Flink注册源表和结果表,返回执行时插件包的全路径
260+
*
258261
* @param sqlTree
259262
* @param env
260263
* @param tableEnv
261264
* @param localSqlPluginPath
262265
* @param remoteSqlPluginPath
263-
* @param pluginLoadMode 插件加载模式 classpath or shipfile
266+
* @param pluginLoadMode 插件加载模式 classpath or shipfile
264267
* @param sideTableMap
265268
* @param registerTableCache
266269
* @return
@@ -293,7 +296,23 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
293296

294297
if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
295298
adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
296-
fields += ",ROWTIME.ROWTIME";
299+
String eventTimeField = sourceTableInfo.getEventTimeField();
300+
boolean hasEventTimeField = false;
301+
if (!Strings.isNullOrEmpty(eventTimeField)) {
302+
String[] fieldArray = fields.split(",");
303+
for (int i = 0; i < fieldArray.length; i++) {
304+
if (fieldArray[i].equals(eventTimeField)) {
305+
fieldArray[i] = eventTimeField + ".ROWTIME";
306+
hasEventTimeField = true;
307+
break;
308+
}
309+
}
310+
if (hasEventTimeField) {
311+
fields = String.join(",", fieldArray);
312+
} else {
313+
fields += ",ROWTIME.ROWTIME";
314+
}
315+
}
297316
} else {
298317
fields += ",PROCTIME.PROCTIME";
299318
}
@@ -329,7 +348,8 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
329348
}
330349

331350
/**
332-
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
351+
* perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
352+
*
333353
* @param env
334354
* @param classPathSet
335355
*/

0 commit comments

Comments
 (0)