|
80 | 80 | import java.util.Set; |
81 | 81 |
|
82 | 82 | /** |
83 | | - * 任务执行时的流程方法 |
| 83 | + * 任务执行时的流程方法 |
84 | 84 | * Date: 2020/2/17 |
85 | 85 | * Company: www.dtstack.com |
| 86 | + * |
86 | 87 | * @author maqi |
87 | 88 | */ |
88 | 89 | public class ExecuteProcessHelper { |
@@ -131,7 +132,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception { |
131 | 132 | } |
132 | 133 |
|
133 | 134 | /** |
134 | | - * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 |
| 135 | + * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 |
| 136 | + * |
135 | 137 | * @param remoteSqlPluginPath |
136 | 138 | * @param deployMode |
137 | 139 | * @param pluginLoadMode |
@@ -189,12 +191,12 @@ private static List<URL> getExternalJarUrls(String addJarListStr) throws java.io |
189 | 191 | } |
190 | 192 |
|
191 | 193 | private static void sqlTranslation(String localSqlPluginPath, |
192 | | - StreamTableEnvironment tableEnv, |
193 | | - SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap, |
194 | | - Map<String, Table> registerTableCache, |
195 | | - StreamQueryConfig queryConfig) throws Exception { |
| 194 | + StreamTableEnvironment tableEnv, |
| 195 | + SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, |
| 196 | + Map<String, Table> registerTableCache, |
| 197 | + StreamQueryConfig queryConfig) throws Exception { |
196 | 198 |
|
197 | | - SideSqlExec sideSqlExec = new SideSqlExec(); |
| 199 | + SideSqlExec sideSqlExec = new SideSqlExec(); |
198 | 200 | sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath); |
199 | 201 | for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) { |
200 | 202 | sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result); |
@@ -254,13 +256,14 @@ private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUr |
254 | 256 | } |
255 | 257 |
|
256 | 258 | /** |
257 | | - * 向Flink注册源表和结果表,返回执行时插件包的全路径 |
| 259 | + * 向Flink注册源表和结果表,返回执行时插件包的全路径 |
| 260 | + * |
258 | 261 | * @param sqlTree |
259 | 262 | * @param env |
260 | 263 | * @param tableEnv |
261 | 264 | * @param localSqlPluginPath |
262 | 265 | * @param remoteSqlPluginPath |
263 | | - * @param pluginLoadMode 插件加载模式 classpath or shipfile |
| 266 | + * @param pluginLoadMode 插件加载模式 classpath or shipfile |
264 | 267 | * @param sideTableMap |
265 | 268 | * @param registerTableCache |
266 | 269 | * @return |
@@ -293,7 +296,23 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen |
293 | 296 |
|
294 | 297 | if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) { |
295 | 298 | adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo); |
296 | | - fields += ",ROWTIME.ROWTIME"; |
| 299 | + String eventTimeField = sourceTableInfo.getEventTimeField(); |
| 300 | + boolean hasEventTimeField = false; |
| 301 | + if (!Strings.isNullOrEmpty(eventTimeField)) { |
| 302 | + String[] fieldArray = fields.split(","); |
| 303 | + for (int i = 0; i < fieldArray.length; i++) { |
| 304 | + if (fieldArray[i].equals(eventTimeField)) { |
| 305 | + fieldArray[i] = eventTimeField + ".ROWTIME"; |
| 306 | + hasEventTimeField = true; |
| 307 | + break; |
| 308 | + } |
| 309 | + } |
| 310 | + if (hasEventTimeField) { |
| 311 | + fields = String.join(",", fieldArray); |
| 312 | + } else { |
| 313 | + fields += ",ROWTIME.ROWTIME"; |
| 314 | + } |
| 315 | + } |
297 | 316 | } else { |
298 | 317 | fields += ",PROCTIME.PROCTIME"; |
299 | 318 | } |
@@ -329,7 +348,8 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen |
329 | 348 | } |
330 | 349 |
|
331 | 350 | /** |
332 | | - * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph |
| 351 | + * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph |
| 352 | + * |
333 | 353 | * @param env |
334 | 354 | * @param classPathSet |
335 | 355 | */ |
|
0 commit comments