@@ -188,17 +188,23 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
188188 return jarUrlList ;
189189 }
190190
191- public static void sqlTranslation (String localSqlPluginPath , StreamTableEnvironment tableEnv , SqlTree sqlTree , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache , StreamQueryConfig queryConfig ) throws Exception {
191+ private static void sqlTranslation (String localSqlPluginPath ,
192+ StreamTableEnvironment tableEnv ,
193+ SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,
194+ Map <String , Table > registerTableCache ,
195+ StreamQueryConfig queryConfig ) throws Exception {
196+
192197 SideSqlExec sideSqlExec = new SideSqlExec ();
193198 sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
194199 for (CreateTmpTableParser .SqlParserResult result : sqlTree .getTmpSqlList ()) {
195- sideSqlExec .registerTmpTable (result , sideTableMap , tableEnv , registerTableCache );
200+ sideSqlExec .exec (result . getExecSql () , sideTableMap , tableEnv , registerTableCache , queryConfig , result );
196201 }
197202
198203 for (InsertSqlParser .SqlParseResult result : sqlTree .getExecSqlList ()) {
199204 if (LOG .isInfoEnabled ()) {
200205 LOG .info ("exe-sql:\n " + result .getExecSql ());
201206 }
207+
202208 boolean isSide = false ;
203209 for (String tableName : result .getTargetTableList ()) {
204210 if (sqlTree .getTmpTableMap ().containsKey (tableName )) {
@@ -208,7 +214,7 @@ public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironm
208214 SqlNode sqlNode = org .apache .calcite .sql .parser .SqlParser .create (realSql , CalciteConfig .MYSQL_LEX_CONFIG ).parseStmt ();
209215 String tmpSql = ((SqlInsert ) sqlNode ).getSource ().toString ();
210216 tmp .setExecSql (tmpSql );
211- sideSqlExec .registerTmpTable (tmp , sideTableMap , tableEnv , registerTableCache );
217+ sideSqlExec .exec (tmp . getExecSql () , sideTableMap , tableEnv , registerTableCache , queryConfig , tmp );
212218 } else {
213219 for (String sourceTable : result .getSourceTableList ()) {
214220 if (sideTableMap .containsKey (sourceTable )) {
@@ -218,10 +224,14 @@ public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironm
218224 }
219225 if (isSide ) {
220226 //sql-dimensional table contains the dimension table of execution
221- sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache , queryConfig );
227+ sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache , queryConfig , null );
222228 } else {
229+ System .out .println ("----------exec sql without dimension join-----------" );
230+ System .out .println ("----------real sql exec is--------------------------" );
231+ System .out .println (result .getExecSql ());
223232 FlinkSQLExec .sqlUpdate (tableEnv , result .getExecSql (), queryConfig );
224233 if (LOG .isInfoEnabled ()) {
234+ System .out .println ();
225235 LOG .info ("exec sql: " + result .getExecSql ());
226236 }
227237 }
0 commit comments