5353import org .apache .flink .table .api .Table ;
5454import org .apache .flink .table .api .java .StreamTableEnvironment ;
5555import org .apache .flink .table .dataformat .BaseRow ;
56+ import org .apache .flink .table .planner .plan .QueryOperationConverter ;
5657import org .apache .flink .table .runtime .typeutils .BaseRowTypeInfo ;
5758import org .apache .flink .table .runtime .typeutils .BigDecimalTypeInfo ;
5859import org .apache .flink .table .runtime .typeutils .LegacyLocalDateTimeTypeInfo ;
6869import java .time .LocalDateTime ;
6970import java .util .ArrayList ;
7071import java .util .Arrays ;
72+ import java .util .Collection ;
7173import java .util .LinkedList ;
7274import java .util .List ;
7375import java .util .Map ;
@@ -101,6 +103,9 @@ public class SideSqlExec {
101103
102104 private Map <String , Table > localTableCache = Maps .newHashMap ();
103105
106+ //维表重新注册之后的名字缓存
107+ private static Map <String , Table > dimTableNewTable = Maps .newHashMap ();
108+
104109 public void exec (String sql ,
105110 Map <String , AbstractSideTableInfo > sideTableMap ,
106111 StreamTableEnvironment tableEnv ,
@@ -128,6 +133,8 @@ public void exec(String sql,
128133 sideSQLParser .setLocalTableCache (localTableCache );
129134 Queue <Object > exeQueue = sideSQLParser .getExeQueue (sql , sideTableMap .keySet (), scope );
130135 Object pollObj = null ;
136+ // create view中是否包含维表
137+ boolean includeDimTable = false ;
131138
132139 while ((pollObj = exeQueue .poll ()) != null ) {
133140
@@ -136,41 +143,49 @@ public void exec(String sql,
136143
137144
138145 if (pollSqlNode .getKind () == INSERT ) {
139- FlinkSQLExec .sqlUpdate (tableEnv , pollSqlNode .toString ());
146+ Collection <String > newRegisterTableList = dimTableNewTable .keySet ();
147+ FlinkSQLExec .sqlInsert (tableEnv , pollSqlNode , newRegisterTableList );
140148 if (LOG .isInfoEnabled ()) {
141149 LOG .info ("----------real exec sql-----------\n {}" , pollSqlNode .toString ());
142150 }
143151
144152 } else if (pollSqlNode .getKind () == AS ) {
145- dealAsSourceTable (tableEnv , pollSqlNode , tableCache );
153+ Collection <String > newRegisterTableList = dimTableNewTable .keySet ();
154+ dealAsSourceTable (tableEnv , pollSqlNode , tableCache , newRegisterTableList );
146155
147156 } else if (pollSqlNode .getKind () == WITH_ITEM ) {
148157 SqlWithItem sqlWithItem = (SqlWithItem ) pollSqlNode ;
149- String TableAlias = sqlWithItem .name .toString ();
150- Table table = tableEnv .sqlQuery (sqlWithItem .query .toString ());
151- tableEnv .registerTable (TableAlias , table );
158+ String tableAlias = sqlWithItem .name .toString ();
159+ Collection <String > newRegisterTableList = dimTableNewTable .keySet ();
160+ Table table = FlinkSQLExec .sqlQuery (tableEnv , sqlWithItem .query , newRegisterTableList );
161+ tableEnv .createTemporaryView (tableAlias , table );
152162
153163 } else if (pollSqlNode .getKind () == SELECT ) {
154164 Preconditions .checkState (createView != null , "select sql must included by create view" );
155- Table table = tableEnv .sqlQuery (pollObj .toString ());
165+ Collection <String > newRegisterTableList = dimTableNewTable .keySet ();
166+ Table table = FlinkSQLExec .sqlQuery (tableEnv , pollSqlNode , newRegisterTableList );
156167
157168 if (createView .getFieldsInfoStr () == null ) {
158169 tableEnv .registerTable (createView .getTableName (), table );
159170 } else {
160171 if (checkFieldsInfo (createView , table )) {
161172 table = table .as (tmpFields );
162- tableEnv .registerTable (createView .getTableName (), table );
173+ tableEnv .createTemporaryView (createView .getTableName (), table );
163174 } else {
164175 throw new RuntimeException ("Fields mismatch" );
165176 }
166177 }
167178
168179 localTableCache .put (createView .getTableName (), table );
180+ if (includeDimTable ){
181+ dimTableNewTable .put (createView .getTableName (), table );
182+ }
169183 }
170-
184+ includeDimTable = false ;
171185 } else if (pollObj instanceof JoinInfo ) {
186+ includeDimTable = true ;
172187 LOG .info ("----------exec join info----------\n {}" , pollObj .toString ());
173- joinFun (pollObj , localTableCache , sideTableMap , tableEnv );
188+ joinFun (pollObj , localTableCache , dimTableNewTable , sideTableMap , tableEnv );
174189 }
175190 }
176191
@@ -414,14 +429,21 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
414429
415430 protected void dealAsSourceTable (StreamTableEnvironment tableEnv ,
416431 SqlNode pollSqlNode ,
417- Map <String , Table > tableCache ) throws SqlParseException {
432+ Map <String , Table > tableCache ,
433+ Collection <String > newRegisterTableList ) throws SqlParseException {
418434
419435 AliasInfo aliasInfo = parseASNode (pollSqlNode );
420436 if (localTableCache .containsKey (aliasInfo .getName ())) {
421437 return ;
422438 }
423439
440+ boolean isGroupByTimeWindow = TableUtils .checkIsDimTableGroupBy (pollSqlNode , newRegisterTableList );
441+ if (isGroupByTimeWindow ){
442+ QueryOperationConverter .setProducesUpdates (true );
443+ }
424444 Table table = tableEnv .sqlQuery (aliasInfo .getName ());
445+ QueryOperationConverter .setProducesUpdates (false );
446+
425447 tableEnv .registerTable (aliasInfo .getAlias (), table );
426448 localTableCache .put (aliasInfo .getAlias (), table );
427449
@@ -441,6 +463,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
441463
442464 private void joinFun (Object pollObj ,
443465 Map <String , Table > localTableCache ,
466+ Map <String , Table > dimTableNewTable ,
444467 Map <String , AbstractSideTableInfo > sideTableMap ,
445468 StreamTableEnvironment tableEnv ) throws Exception {
446469 JoinInfo joinInfo = (JoinInfo ) pollObj ;
@@ -525,7 +548,6 @@ private void joinFun(Object pollObj,
525548 RowTypeInfo typeInfo = new RowTypeInfo (fieldDataTypes , targetTable .getSchema ().getFieldNames ());
526549
527550 DataStream <BaseRow > adaptStream = tableEnv .toRetractStream (targetTable , typeInfo )
528- .filter (f -> f .f0 )
529551 .map (f -> RowDataConvert .convertToBaseRow (f ));
530552
531553 //join side table before keyby ===> Reducing the size of each dimension table cache of async
@@ -565,6 +587,7 @@ private void joinFun(Object pollObj,
565587 Table joinTable = tableEnv .fromDataStream (dsOut );
566588 tableEnv .createTemporaryView (targetTableName , joinTable );
567589 localTableCache .put (joinInfo .getNewTableName (), joinTable );
590+ dimTableNewTable .put (joinInfo .getNewTableName (), joinTable );
568591 }
569592 }
570593
@@ -593,4 +616,8 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
593616 return true ;
594617 }
595618
619+ public static Map <String , Table > getDimTableNewTable (){
620+ return dimTableNewTable ;
621+ }
622+
596623}
0 commit comments