@@ -187,12 +187,17 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
187187 }
188188 return jarUrlList ;
189189 }
190-
191- public static void sqlTranslation (String localSqlPluginPath , StreamTableEnvironment tableEnv , SqlTree sqlTree , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache , StreamQueryConfig queryConfig ) throws Exception {
192- SideSqlExec sideSqlExec = new SideSqlExec ();
190+
191+ private static void sqlTranslation (String localSqlPluginPath ,
192+ StreamTableEnvironment tableEnv ,
193+ SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,
194+ Map <String , Table > registerTableCache ,
195+ StreamQueryConfig queryConfig ) throws Exception {
196+
197+ SideSqlExec sideSqlExec = new SideSqlExec ();
193198 sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
194199 for (CreateTmpTableParser .SqlParserResult result : sqlTree .getTmpSqlList ()) {
195- sideSqlExec .registerTmpTable (result , sideTableMap , tableEnv , registerTableCache );
200+ sideSqlExec .exec (result . getExecSql () , sideTableMap , tableEnv , registerTableCache , queryConfig , result );
196201 }
197202
198203 for (InsertSqlParser .SqlParseResult result : sqlTree .getExecSqlList ()) {
@@ -208,7 +213,7 @@ public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironm
208213 SqlNode sqlNode = org .apache .calcite .sql .parser .SqlParser .create (realSql , CalciteConfig .MYSQL_LEX_CONFIG ).parseStmt ();
209214 String tmpSql = ((SqlInsert ) sqlNode ).getSource ().toString ();
210215 tmp .setExecSql (tmpSql );
211- sideSqlExec .registerTmpTable (tmp , sideTableMap , tableEnv , registerTableCache );
216+ sideSqlExec .exec (tmp . getExecSql () , sideTableMap , tableEnv , registerTableCache , queryConfig , tmp );
212217 } else {
213218 for (String sourceTable : result .getSourceTableList ()) {
214219 if (sideTableMap .containsKey (sourceTable )) {
0 commit comments