|
53 | 53 | import org.apache.calcite.sql.SqlInsert; |
54 | 54 | import org.apache.calcite.sql.SqlNode; |
55 | 55 | import org.apache.commons.io.Charsets; |
| 56 | +import org.apache.commons.lang3.SerializationUtils; |
56 | 57 | import org.apache.commons.lang3.StringUtils; |
57 | 58 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
58 | 59 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
|
75 | 76 | import java.net.URLClassLoader; |
76 | 77 | import java.net.URLDecoder; |
77 | 78 | import java.time.ZoneId; |
78 | | -import java.util.ArrayList; |
79 | | -import java.util.Arrays; |
80 | | -import java.util.List; |
81 | | -import java.util.Map; |
82 | | -import java.util.Properties; |
83 | | -import java.util.Set; |
84 | | -import java.util.TimeZone; |
| 79 | +import java.util.*; |
85 | 80 | import java.util.stream.Stream; |
86 | 81 |
|
87 | 82 | /** |
@@ -215,7 +210,11 @@ private static void sqlTranslation(String localSqlPluginPath, |
215 | 210 | scope++; |
216 | 211 | } |
217 | 212 |
|
| 213 | + final Map<String, AbstractSideTableInfo> tmpTableMap = new HashMap<>(); |
218 | 214 | for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) { |
| 215 | + // prevent current sql use last sql's sideTableInfo |
| 216 | + sideTableMap.forEach((s, abstractSideTableInfo) -> tmpTableMap.put(s, SerializationUtils.clone(abstractSideTableInfo))); |
| 217 | + |
219 | 218 | if (LOG.isInfoEnabled()) { |
220 | 219 | LOG.info("exe-sql:\n" + result.getExecSql()); |
221 | 220 | } |
@@ -251,6 +250,7 @@ private static void sqlTranslation(String localSqlPluginPath, |
251 | 250 |
|
252 | 251 | scope++; |
253 | 252 | } |
| 253 | + tmpTableMap.clear(); |
254 | 254 | } |
255 | 255 | } |
256 | 256 |
|
|
0 commit comments