5555import org .apache .calcite .sql .SqlInsert ;
5656import org .apache .calcite .sql .SqlNode ;
5757import org .apache .commons .io .Charsets ;
58+ import org .apache .commons .lang3 .SerializationUtils ;
5859import org .apache .commons .lang3 .StringUtils ;
5960import org .apache .flink .api .common .typeinfo .TypeInformation ;
6061import org .apache .flink .api .java .typeutils .RowTypeInfo ;
7980import java .time .ZoneId ;
8081import java .util .ArrayList ;
8182import java .util .Arrays ;
83+ import java .util .HashMap ;
8284import java .util .List ;
8385import java .util .Map ;
8486import java .util .Objects ;
@@ -244,7 +246,11 @@ private static void sqlTranslation(String localSqlPluginPath,
244246 scope ++;
245247 }
246248
249+ final Map <String , AbstractSideTableInfo > tmpTableMap = new HashMap <>();
247250 for (InsertSqlParser .SqlParseResult result : sqlTree .getExecSqlList ()) {
251+ // prevent current sql use last sql's sideTableInfo
252+ sideTableMap .forEach ((s , abstractSideTableInfo ) -> tmpTableMap .put (s , SerializationUtils .clone (abstractSideTableInfo )));
253+
248254 if (LOG .isInfoEnabled ()) {
249255 LOG .info ("exe-sql:\n " + result .getExecSql ());
250256 }
@@ -257,17 +263,17 @@ private static void sqlTranslation(String localSqlPluginPath,
257263 SqlNode sqlNode = flinkPlanner .getParser ().parse (realSql );
258264 String tmpSql = ((SqlInsert ) sqlNode ).getSource ().toString ();
259265 tmp .setExecSql (tmpSql );
260- sideSqlExec .exec (tmp .getExecSql (), sideTableMap , tableEnv , registerTableCache , tmp , scope + "" );
266+ sideSqlExec .exec (tmp .getExecSql (), tmpTableMap , tableEnv , registerTableCache , tmp , scope + "" );
261267 } else {
262268 for (String sourceTable : result .getSourceTableList ()) {
263- if (sideTableMap .containsKey (sourceTable )) {
269+ if (tmpTableMap .containsKey (sourceTable )) {
264270 isSide = true ;
265271 break ;
266272 }
267273 }
268274 if (isSide ) {
269275 //sql-dimensional table contains the dimension table of execution
270- sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
276+ sideSqlExec .exec (result .getExecSql (), tmpTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
271277 } else {
272278 LOG .info ("----------exec sql without dimension join-----------" );
273279 LOG .info ("----------real sql exec is--------------------------\n {}" , result .getExecSql ());
@@ -280,6 +286,7 @@ private static void sqlTranslation(String localSqlPluginPath,
280286
281287 scope ++;
282288 }
289+ tmpTableMap .clear ();
283290 }
284291 }
285292
0 commit comments