Skip to content

Commit e568264

Browse files
committed
合并维表关联部分临时表注册和insert 部分
1 parent 6a9cc95 commit e568264

File tree

3 files changed

+57
-94
lines changed

3 files changed

+57
-94
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
153153
SideSqlExec sideSqlExec = new SideSqlExec();
154154
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
155155
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
156-
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
156+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
157157
}
158158

159159
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
@@ -169,7 +169,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
169169
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
170170
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
171171
tmp.setExecSql(tmpSql);
172-
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
172+
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
173173
} else {
174174
for (String sourceTable : result.getSourceTableList()) {
175175
if (sideTableMap.containsKey(sourceTable)) {
@@ -179,7 +179,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
179179
}
180180
if (isSide) {
181181
//sql-dimensional table contains the dimension table of execution
182-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
182+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
183183
}else{
184184
System.out.println("----------exec sql without dimension join-----------" );
185185
System.out.println("----------real sql exec is--------------------------");

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,6 @@ public String toString() {
213213
", leftTableAlias='" + leftTableAlias + '\'' +
214214
", rightTableName='" + rightTableName + '\'' +
215215
", rightTableAlias='" + rightTableAlias + '\'' +
216-
", leftNode=" + leftNode +
217-
", rightNode=" + rightNode +
218216
", condition=" + condition +
219217
", selectFields=" + selectFields +
220218
", selectNode=" + selectNode +

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 54 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.dtstack.flink.sql.util.ClassUtil;
2929
import com.dtstack.flink.sql.util.ParseUtils;
3030
import com.dtstack.flink.sql.util.TableUtils;
31+
import com.google.common.base.Preconditions;
3132
import com.google.common.collect.HashBasedTable;
3233
import com.google.common.collect.Lists;
3334
import com.google.common.collect.Maps;
@@ -87,14 +88,12 @@ public class SideSqlExec {
8788

8889
private String tmpFields = null;
8990

90-
private SideSQLParser sideSQLParser = new SideSQLParser();
91-
9291
private SidePredicatesParser sidePredicatesParser = new SidePredicatesParser();
9392

9493
private Map<String, Table> localTableCache = Maps.newHashMap();
9594

9695
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
97-
Map<String, Table> tableCache, StreamQueryConfig queryConfig) throws Exception {
96+
Map<String, Table> tableCache, StreamQueryConfig queryConfig, CreateTmpTableParser.SqlParserResult createView) throws Exception {
9897
if(localSqlPluginPath == null){
9998
throw new RuntimeException("need to set localSqlPluginPath");
10099
}
@@ -106,6 +105,13 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
106105
LOG.error("fill predicates for sideTable fail ", e);
107106
}
108107

108+
if(createView != null){
109+
LOG.warn("create view info\n");
110+
LOG.warn(createView.getExecSql());
111+
LOG.warn("-----------------");
112+
}
113+
114+
SideSQLParser sideSQLParser = new SideSQLParser();
109115
sideSQLParser.setLocalTableCache(localTableCache);
110116
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
111117
Object pollObj = null;
@@ -136,32 +142,32 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
136142
if(LOG.isInfoEnabled()){
137143
LOG.info("exec sql: " + pollSqlNode.toString());
138144
}
145+
139146
}else if(pollSqlNode.getKind() == AS){
140-
AliasInfo aliasInfo = parseASNode(pollSqlNode);
141-
Table table = tableEnv.sqlQuery(aliasInfo.getName());
142-
tableEnv.registerTable(aliasInfo.getAlias(), table);
143-
localTableCache.put(aliasInfo.getAlias(), table);
144-
145-
FieldReplaceInfo fieldReplaceInfo = parseAsQuery((SqlBasicCall) pollSqlNode, tableCache);
146-
if(fieldReplaceInfo != null){
147-
//as 的源表
148-
Set<String> fromTableNameSet = Sets.newHashSet();
149-
SqlNode fromNode = ((SqlBasicCall)pollSqlNode).getOperands()[0];
150-
TableUtils.getFromTableInfo(fromNode, fromTableNameSet);
151-
for(FieldReplaceInfo tmp : replaceInfoList){
152-
if(fromTableNameSet.contains(tmp.getTargetTableName())
153-
|| fromTableNameSet.contains(tmp.getTargetTableAlias())){
154-
fieldReplaceInfo.setPreNode(tmp);
155-
break;
156-
}
157-
}
158-
replaceInfoList.add(fieldReplaceInfo);
159-
}
147+
dealAsSourceTable(tableEnv, pollSqlNode, tableCache, replaceInfoList);
148+
160149
} else if (pollSqlNode.getKind() == WITH_ITEM) {
161150
SqlWithItem sqlWithItem = (SqlWithItem) pollSqlNode;
162151
String TableAlias = sqlWithItem.name.toString();
163152
Table table = tableEnv.sqlQuery(sqlWithItem.query.toString());
164153
tableEnv.registerTable(TableAlias, table);
154+
155+
} else if (pollSqlNode.getKind() == SELECT){
156+
Preconditions.checkState(createView != null, "select sql must included by create view");
157+
Table table = tableEnv.sqlQuery(pollObj.toString());
158+
159+
if (createView.getFieldsInfoStr() == null){
160+
tableEnv.registerTable(createView.getTableName(), table);
161+
} else {
162+
if (checkFieldsInfo(createView, table)){
163+
table = table.as(tmpFields);
164+
tableEnv.registerTable(createView.getTableName(), table);
165+
} else {
166+
throw new RuntimeException("Fields mismatch");
167+
}
168+
}
169+
170+
localTableCache.put(createView.getTableName(), table);
165171
}
166172

167173
}else if (pollObj instanceof JoinInfo){
@@ -174,6 +180,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
174180

175181
}
176182

183+
177184
/**
178185
* 解析出as查询的表和字段的关系
179186
* @param asSqlNode
@@ -709,81 +716,39 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
709716
return conditionFields;
710717
}
711718

712-
//TODO 合并临时表处理逻辑
713-
public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
714-
Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
715-
Map<String, Table> tableCache)
716-
throws Exception {
717-
718-
if(localSqlPluginPath == null){
719-
throw new RuntimeException("need to set localSqlPluginPath");
720-
}
721-
722-
localTableCache.putAll(tableCache);
723-
Queue<Object> exeQueue = sideSQLParser.getExeQueue(result.getExecSql(), sideTableMap.keySet());
724-
Object pollObj = null;
719+
protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
720+
SqlNode pollSqlNode,
721+
Map<String, Table> tableCache,
722+
List<FieldReplaceInfo> replaceInfoList) throws SqlParseException {
725723

726-
//need clean
727-
boolean preIsSideJoin = false;
728-
List<FieldReplaceInfo> replaceInfoList = Lists.newArrayList();
729-
730-
while((pollObj = exeQueue.poll()) != null){
731-
732-
if(pollObj instanceof SqlNode){
733-
SqlNode pollSqlNode = (SqlNode) pollObj;
734-
735-
if(preIsSideJoin){
736-
preIsSideJoin = false;
737-
List<String> fieldNames = null;
738-
for (FieldReplaceInfo replaceInfo : replaceInfoList) {
739-
fieldNames = Lists.newArrayList();
740-
replaceFieldName(pollSqlNode, replaceInfo);
741-
addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
742-
}
743-
}
744-
745-
if(pollSqlNode.getKind() == INSERT){
746-
tableEnv.sqlUpdate(pollSqlNode.toString());
747-
}else if(pollSqlNode.getKind() == AS){
748-
dealAsSourceTable(tableEnv, pollSqlNode);
749-
} else if (pollSqlNode.getKind() == SELECT){
750-
Table table = tableEnv.sqlQuery(pollObj.toString());
751-
if (result.getFieldsInfoStr() == null){
752-
tableEnv.registerTable(result.getTableName(), table);
753-
} else {
754-
if (checkFieldsInfo(result, table)){
755-
table = table.as(tmpFields);
756-
tableEnv.registerTable(result.getTableName(), table);
757-
} else {
758-
throw new RuntimeException("Fields mismatch");
759-
}
760-
}
761-
localTableCache.put(result.getTableName(), table);
762-
763-
}
764-
765-
}else if (pollObj instanceof JoinInfo){
766-
preIsSideJoin = true;
767-
System.out.println("----------exec join info----------");
768-
System.out.println(pollObj.toString());
769-
joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
770-
}
771-
}
772-
}
773-
774-
protected void dealAsSourceTable(StreamTableEnvironment tableEnv, SqlNode pollSqlNode) throws SqlParseException {
775724
AliasInfo aliasInfo = parseASNode(pollSqlNode);
776725
if (localTableCache.containsKey(aliasInfo.getName())) {
777726
return;
778727
}
779728

780729
Table table = tableEnv.sqlQuery(aliasInfo.getName());
781730
tableEnv.registerTable(aliasInfo.getAlias(), table);
782-
if (LOG.isInfoEnabled()) {
783-
LOG.info("Register Table {} by {}", aliasInfo.getAlias(), aliasInfo.getName());
731+
localTableCache.put(aliasInfo.getAlias(), table);
732+
733+
LOG.info("Register Table {} by {}", aliasInfo.getAlias(), aliasInfo.getName());
734+
735+
FieldReplaceInfo fieldReplaceInfo = parseAsQuery((SqlBasicCall) pollSqlNode, tableCache);
736+
if(fieldReplaceInfo == null){
737+
return;
784738
}
785739

786-
localTableCache.put(aliasInfo.getAlias(), table);
740+
//as 的源表
741+
Set<String> fromTableNameSet = Sets.newHashSet();
742+
SqlNode fromNode = ((SqlBasicCall)pollSqlNode).getOperands()[0];
743+
TableUtils.getFromTableInfo(fromNode, fromTableNameSet);
744+
for(FieldReplaceInfo tmp : replaceInfoList){
745+
if(fromTableNameSet.contains(tmp.getTargetTableName())
746+
|| fromTableNameSet.contains(tmp.getTargetTableAlias())){
747+
fieldReplaceInfo.setPreNode(tmp);
748+
break;
749+
}
750+
}
751+
replaceInfoList.add(fieldReplaceInfo);
787752
}
788753

789754
private void joinFun(Object pollObj, Map<String, Table> localTableCache,
@@ -798,7 +763,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
798763

799764
SqlKind sqlKind = joinInfo.getLeftNode().getKind();
800765
if(sqlKind == AS){
801-
dealAsSourceTable(tableEnv, joinInfo.getLeftNode());
766+
dealAsSourceTable(tableEnv, joinInfo.getLeftNode(), localTableCache, replaceInfoList);
802767
}
803768

804769
Table leftTable = getTableFromCache(localTableCache, joinInfo.getLeftTableAlias(), joinInfo.getLeftTableName());

0 commit comments

Comments
 (0)