Skip to content

Commit 5261849

Browse files
committed
调整side 结构
1 parent d63c81c commit 5261849

File tree

36 files changed

+1066
-511
lines changed

36 files changed

+1066
-511
lines changed

core/src/main/java/com/dtstack/flink/sql/enums/ECacheType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author xuchao
2828
*/
2929
public enum ECacheType {
30-
NONE, LRU;
30+
NONE, LRU, ALL;
3131

3232
public static boolean isValid(String type){
3333
for(ECacheType tmpType : ECacheType.values()){
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.side;
21+
22+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23+
import org.apache.flink.streaming.api.datastream.DataStream;
24+
25+
import java.util.List;
26+
27+
/**
28+
* get plugin which implement from RichFlatMapFunction
29+
* Date: 2018/9/18
30+
* Company: www.dtstack.com
31+
*
32+
* @author xuchao
33+
*/
34+
35+
public class AllCacheOperator {
36+
37+
private static void loadFlatMap(){
38+
39+
}
40+
41+
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
42+
List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
43+
return null;
44+
}
45+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.side;
22+
23+
import org.apache.calcite.interpreter.Row;
24+
import org.apache.flink.api.common.functions.RichFlatMapFunction;
25+
26+
/**
27+
* Reason:
28+
* Date: 2018/9/18
29+
* Company: www.dtstack.com
30+
* @author xuchao
31+
*/
32+
33+
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{
34+
}

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 10 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -53,50 +53,24 @@
5353

5454
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
5555

56-
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
57-
5856
private static final long serialVersionUID = 2098635244857937717L;
5957

60-
protected RowTypeInfo rowTypeInfo;
61-
62-
protected List<FieldInfo> outFieldInfoList;
63-
64-
protected List<String> equalFieldList = Lists.newArrayList();
65-
66-
protected List<Integer> equalValIndex = Lists.newArrayList();
67-
68-
protected String sqlCondition = "";
69-
70-
protected String sideSelectFields = "";
71-
72-
protected JoinType joinType;
58+
protected SideReqRow sideReqRow;
7359

74-
//key:Returns the value of the position, returns the index values ​​in the input data
75-
protected Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
76-
77-
protected Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
78-
79-
protected SideTableInfo sideTableInfo;
80-
81-
protected AbsSideCache sideCache;
82-
83-
public AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
84-
SideTableInfo sideTableInfo){
85-
this.rowTypeInfo = rowTypeInfo;
86-
this.outFieldInfoList = outFieldInfoList;
87-
this.joinType = joinInfo.getJoinType();
88-
this.sideTableInfo = sideTableInfo;
89-
parseSelectFields(joinInfo);
90-
buildEqualInfo(joinInfo, sideTableInfo);
60+
public AsyncReqRow(SideReqRow sideReqRow){
61+
this.sideReqRow = sideReqRow;
9162
}
9263

9364
private void initCache(){
65+
SideTableInfo sideTableInfo = sideReqRow.getSideTableInfo();
9466
if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
9567
return;
9668
}
9769

70+
AbsSideCache sideCache;
9871
if(ECacheType.LRU.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
9972
sideCache = new LRUSideCache(sideTableInfo);
73+
sideReqRow.setSideCache(sideCache);
10074
}else{
10175
throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
10276
}
@@ -105,101 +79,22 @@ private void initCache(){
10579
}
10680

10781
protected CacheObj getFromCache(String key){
108-
return sideCache.getFromCache(key);
82+
return sideReqRow.getSideCache().getFromCache(key);
10983
}
11084

11185
protected void putCache(String key, CacheObj value){
112-
sideCache.putCache(key, value);
86+
sideReqRow.getSideCache().putCache(key, value);
11387
}
11488

11589
protected boolean openCache(){
116-
return sideCache != null;
90+
return sideReqRow.getSideCache() != null;
11791
}
11892

119-
public void parseSelectFields(JoinInfo joinInfo){
120-
String sideTableName = joinInfo.getSideTableName();
121-
String nonSideTableName = joinInfo.getNonSideTable();
122-
List<String> fields = Lists.newArrayList();
123-
124-
int sideIndex = 0;
125-
for( int i=0; i<outFieldInfoList.size(); i++){
126-
FieldInfo fieldInfo = outFieldInfoList.get(i);
127-
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
128-
fields.add(fieldInfo.getFieldName());
129-
sideFieldIndex.put(i, sideIndex);
130-
sideIndex++;
131-
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
132-
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
133-
inFieldIndex.put(i, nonSideIndex);
134-
}else{
135-
throw new RuntimeException("unknown table " + fieldInfo.getTable());
136-
}
137-
}
138-
139-
if(fields.size() == 0){
140-
throw new RuntimeException("select non field from table " + sideTableName);
141-
}
142-
143-
sideSelectFields = String.join(",", fields);
144-
}
145-
146-
public abstract void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo);
147-
148-
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
149-
if(sqlNode.getKind() != SqlKind.EQUALS){
150-
throw new RuntimeException("not equal operator.");
151-
}
152-
153-
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];
154-
SqlIdentifier right = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[1];
155-
156-
String leftTableName = left.getComponent(0).getSimple();
157-
String leftField = left.getComponent(1).getSimple();
158-
159-
String rightTableName = right.getComponent(0).getSimple();
160-
String rightField = right.getComponent(1).getSimple();
161-
162-
if(leftTableName.equalsIgnoreCase(sideTableName)){
163-
equalFieldList.add(leftField);
164-
int equalFieldIndex = -1;
165-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
166-
String fieldName = rowTypeInfo.getFieldNames()[i];
167-
if(fieldName.equalsIgnoreCase(rightField)){
168-
equalFieldIndex = i;
169-
}
170-
}
171-
if(equalFieldIndex == -1){
172-
throw new RuntimeException("can't find equal field " + rightField);
173-
}
174-
175-
equalValIndex.add(equalFieldIndex);
176-
177-
}else if(rightTableName.equalsIgnoreCase(sideTableName)){
178-
179-
equalFieldList.add(rightField);
180-
int equalFieldIndex = -1;
181-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
182-
String fieldName = rowTypeInfo.getFieldNames()[i];
183-
if(fieldName.equalsIgnoreCase(leftField)){
184-
equalFieldIndex = i;
185-
}
186-
}
187-
if(equalFieldIndex == -1){
188-
throw new RuntimeException("can't find equal field " + rightField);
189-
}
190-
191-
equalValIndex.add(equalFieldIndex);
192-
193-
}else{
194-
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
195-
}
196-
197-
}
19893

19994
protected abstract Row fillData(Row input, Object sideInput);
20095

20196
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
202-
if(joinType == JoinType.LEFT){
97+
if(sideReqRow.getJoinType() == JoinType.LEFT){
20398
//Reserved left table data
20499
Row row = fillData(input, null);
205100
resultFuture.complete(Collections.singleton(row));
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.side;
21+
22+
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.util.PluginUtil;
24+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
25+
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
26+
import org.apache.flink.streaming.api.datastream.DataStream;
27+
28+
import java.util.List;
29+
import java.util.concurrent.TimeUnit;
30+
31+
/**
32+
* fill data with lru cache
33+
* get data from External data source with async operator
34+
* Date: 2018/9/18
35+
* Company: www.dtstack.com
36+
* @author xuchao
37+
*/
38+
39+
public class LRUCacheOperator {
40+
41+
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
42+
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
43+
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
44+
String pathOfType = sideType + "side";
45+
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
46+
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
47+
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
48+
String className = PluginUtil.getSqlSideClassName(sideType, "side");
49+
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
50+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
51+
}
52+
53+
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
54+
List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
55+
AsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
56+
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
57+
return AsyncDataStream.orderedWait(inputStream, asyncDbReq, 10000, TimeUnit.MILLISECONDS, 10)
58+
.setParallelism(sideTableInfo.getParallelism());
59+
}
60+
}

0 commit comments

Comments
 (0)