File tree Expand file tree Collapse file tree 3 files changed +33
-1
lines changed
core/src/main/java/com/dtstack/flink/sql Expand file tree Collapse file tree 3 files changed +33
-1
lines changed Original file line number Diff line number Diff line change @@ -31,7 +31,6 @@ FlinkStreamSQL
3131
3232## 目录
3333
34- [ 1.0 变更记录] ( docs/changelog.md )
3534[ 1.1 demo] ( docs/demo.md )
3635[ 1.2 快速开始] ( docs/quickStart.md )
3736[ 1.3 参数配置] ( docs/config.md )
Original file line number Diff line number Diff line change @@ -20,4 +20,21 @@ public static String traceOriginalCause(Throwable e) {
2020 }
2121 return errorMsg ;
2222 }
23+
24+ /**
25+ * 根据异常的种类来判断是否需要强制跳过Flink的重启{@link SuppressRestartsException}
26+ * @param e exception
27+ * @param errorMsg 需要抛出的异常信息
28+ */
29+ public static void dealExceptionWithSuppressStart (Exception e , String errorMsg ) {
30+ if (e instanceof SuppressRestartsException ) {
31+ throw new SuppressRestartsException (
32+ new Throwable (
33+ errorMsg
34+ )
35+ );
36+ } else {
37+ throw new RuntimeException (errorMsg );
38+ }
39+ }
2340}
Original file line number Diff line number Diff line change @@ -218,6 +218,22 @@ private void checkSupport(SqlIdentifier identifier) {
218218 Preconditions .checkState (isSide , errorMsg );
219219 }
220220
221+ private void checkSupport (SqlIdentifier identifier ) {
222+ String tableName = identifier .getComponent (0 ).getSimple ();
223+ String sideTableName ;
224+ String sideTableAlias ;
225+ if (joinInfo .isLeftIsSideTable ()) {
226+ sideTableName = joinInfo .getLeftTableName ();
227+ sideTableAlias = joinInfo .getLeftTableAlias ();
228+ } else {
229+ sideTableName = joinInfo .getRightTableName ();
230+ sideTableAlias = joinInfo .getRightTableAlias ();
231+ }
232+ boolean isSide = tableName .equals (sideTableName ) || tableName .equals (sideTableAlias );
233+ String errorMsg = "only support set side table constant field, error field " + identifier ;
234+ Preconditions .checkState (isSide , errorMsg );
235+ }
236+
221237 private void associateField (String sourceTableField , String sideTableField , SqlNode sqlNode ) {
222238 String errorMsg = "can't deal equal field: " + sqlNode ;
223239 equalFieldList .add (sideTableField );
You can’t perform that action at this time.
0 commit comments