Skip to content

Commit 1f8353e

Browse files
committed
Merge branch 'feat_keyPartition_mergeTest' into '1.8_test_3.10.x'
Feat key partition merge test See merge request !268
2 parents 27ed707 + cf5e1ee commit 1f8353e

File tree

24 files changed

+1240
-608
lines changed

24 files changed

+1240
-608
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,17 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
187187
}
188188
return jarUrlList;
189189
}
190-
191-
public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
192-
SideSqlExec sideSqlExec = new SideSqlExec();
190+
191+
private static void sqlTranslation(String localSqlPluginPath,
192+
StreamTableEnvironment tableEnv,
193+
SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,
194+
Map<String, Table> registerTableCache,
195+
StreamQueryConfig queryConfig) throws Exception {
196+
197+
SideSqlExec sideSqlExec = new SideSqlExec();
193198
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
194199
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
195-
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
200+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
196201
}
197202

198203
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
@@ -208,7 +213,7 @@ public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironm
208213
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
209214
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
210215
tmp.setExecSql(tmpSql);
211-
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
216+
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
212217
} else {
213218
for (String sourceTable : result.getSourceTableList()) {
214219
if (sideTableMap.containsKey(sourceTable)) {
@@ -218,8 +223,11 @@ public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironm
218223
}
219224
if (isSide) {
220225
//sql-dimensional table contains the dimension table of execution
221-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
226+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
222227
} else {
228+
System.out.println("----------exec sql without dimension join-----------");
229+
System.out.println("----------real sql exec is--------------------------");
230+
System.out.println(result.getExecSql());
223231
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
224232
if (LOG.isInfoEnabled()) {
225233
LOG.info("exec sql: " + result.getExecSql());

core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
*/
3737
public class SerializationMetricWrapper implements SerializationSchema<Row> {
3838

39+
private static final long serialVersionUID = 1L;
40+
3941
private SerializationSchema<Row> serializationSchema;
4042

4143
private transient RuntimeContext runtimeContext;
@@ -77,4 +79,8 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
7779
this.runtimeContext = runtimeContext;
7880
}
7981

82+
public SerializationSchema<Row> getSerializationSchema() {
83+
return serializationSchema;
84+
}
85+
8086
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,21 @@ public boolean isLeftIsTmpTable() {
202202
public void setLeftIsTmpTable(boolean leftIsTmpTable) {
203203
this.leftIsTmpTable = leftIsTmpTable;
204204
}
205+
206+
@Override
207+
public String toString() {
208+
return "JoinInfo{" +
209+
"leftIsSideTable=" + leftIsSideTable +
210+
", leftIsTmpTable=" + leftIsTmpTable +
211+
", rightIsSideTable=" + rightIsSideTable +
212+
", leftTableName='" + leftTableName + '\'' +
213+
", leftTableAlias='" + leftTableAlias + '\'' +
214+
", rightTableName='" + rightTableName + '\'' +
215+
", rightTableAlias='" + rightTableAlias + '\'' +
216+
", condition=" + condition +
217+
", selectFields=" + selectFields +
218+
", selectNode=" + selectNode +
219+
", joinType=" + joinType +
220+
'}';
221+
}
205222
}

0 commit comments

Comments
 (0)