Skip to content

Commit 750dbd0

Browse files
committed
mysql side with all-cache Strategy
1 parent 77d6477 commit 750dbd0

File tree

33 files changed

+748
-211
lines changed

33 files changed

+748
-211
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23+
import com.dtstack.flink.sql.util.DtStringUtil;
2324
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
2425

26+
import java.util.List;
2527
import java.util.Map;
2628
import java.util.regex.Matcher;
2729
import java.util.regex.Pattern;
@@ -70,9 +72,9 @@ private Map parseProp(String propsStr){
7072
String[] strs = propsStr.trim().split("'\\s*,");
7173
Map<String, Object> propMap = Maps.newHashMap();
7274
for(int i=0; i<strs.length; i++){
73-
String[] ss = strs[i].split("=");
74-
String key = ss[0].trim();
75-
String value = ss[1].trim().replaceAll("'", "").trim();
75+
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
76+
String key = ss.get(0).trim();
77+
String value = ss.get(1).trim().replaceAll("'", "").trim();
7678
propMap.put(key, value);
7779
}
7880

core/src/main/java/com/dtstack/flink/sql/side/AllCacheOperator.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import org.apache.calcite.interpreter.Row;
23+
import com.dtstack.flink.sql.threadFactory.DTThreadFactory;
2424
import org.apache.flink.api.common.functions.RichFlatMapFunction;
25+
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.types.Row;
27+
28+
import java.sql.SQLException;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.TimeUnit;
2532

2633
/**
2734
* Reason:
@@ -31,4 +38,32 @@
3138
*/
3239

3340
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{
41+
42+
protected SideInfo sideInfo;
43+
44+
private ScheduledExecutorService es;
45+
46+
public AllReqRow(SideInfo sideInfo){
47+
this.sideInfo = sideInfo;
48+
49+
}
50+
51+
protected abstract Row fillData(Row input, Object sideInput);
52+
53+
protected abstract void initCache() throws SQLException;
54+
55+
protected abstract void reloadCache();
56+
57+
@Override
58+
public void open(Configuration parameters) throws Exception {
59+
super.open(parameters);
60+
initCache();
61+
System.out.println("----- all cacheRef init end-----");
62+
63+
//start reload cache thread
64+
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
65+
es = Executors.newSingleThreadScheduledExecutor(new DTThreadFactory("cache-all-reload"));
66+
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
67+
}
68+
3469
}

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,12 @@
2525
import com.dtstack.flink.sql.side.cache.CacheObj;
2626
import com.dtstack.flink.sql.side.cache.LRUSideCache;
2727
import org.apache.calcite.sql.JoinType;
28-
import org.apache.calcite.sql.SqlBasicCall;
29-
import org.apache.calcite.sql.SqlIdentifier;
30-
import org.apache.calcite.sql.SqlKind;
31-
import org.apache.calcite.sql.SqlNode;
32-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
33-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
34-
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
3528
import org.apache.flink.configuration.Configuration;
3629
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3730
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3831
import org.apache.flink.types.Row;
39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
4132

4233
import java.util.Collections;
43-
import java.util.List;
44-
import java.util.Map;
4534

4635
/**
4736
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -55,22 +44,22 @@ public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
5544

5645
private static final long serialVersionUID = 2098635244857937717L;
5746

58-
protected SideReqRow sideReqRow;
47+
protected SideInfo sideInfo;
5948

60-
public AsyncReqRow(SideReqRow sideReqRow){
61-
this.sideReqRow = sideReqRow;
49+
public AsyncReqRow(SideInfo sideInfo){
50+
this.sideInfo = sideInfo;
6251
}
6352

6453
private void initCache(){
65-
SideTableInfo sideTableInfo = sideReqRow.getSideTableInfo();
54+
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
6655
if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
6756
return;
6857
}
6958

7059
AbsSideCache sideCache;
7160
if(ECacheType.LRU.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
7261
sideCache = new LRUSideCache(sideTableInfo);
73-
sideReqRow.setSideCache(sideCache);
62+
sideInfo.setSideCache(sideCache);
7463
}else{
7564
throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
7665
}
@@ -79,22 +68,22 @@ private void initCache(){
7968
}
8069

8170
protected CacheObj getFromCache(String key){
82-
return sideReqRow.getSideCache().getFromCache(key);
71+
return sideInfo.getSideCache().getFromCache(key);
8372
}
8473

8574
protected void putCache(String key, CacheObj value){
86-
sideReqRow.getSideCache().putCache(key, value);
75+
sideInfo.getSideCache().putCache(key, value);
8776
}
8877

8978
protected boolean openCache(){
90-
return sideReqRow.getSideCache() != null;
79+
return sideInfo.getSideCache() != null;
9180
}
9281

9382

9483
protected abstract Row fillData(Row input, Object sideInput);
9584

9685
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
97-
if(sideReqRow.getJoinType() == JoinType.LEFT){
86+
if(sideInfo.getJoinType() == JoinType.LEFT){
9887
//Reserved left table data
9988
Row row = fillData(input, null);
10089
resultFuture.complete(Collections.singleton(row));

core/src/main/java/com/dtstack/flink/sql/side/SideReqRow.java renamed to core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3131
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
3232

33+
import java.io.Serializable;
3334
import java.util.List;
3435
import java.util.Map;
3536

@@ -40,7 +41,7 @@
4041
* @author xuchao
4142
*/
4243

43-
public abstract class SideReqRow {
44+
public abstract class SideInfo implements Serializable{
4445

4546
protected RowTypeInfo rowTypeInfo;
4647

@@ -56,17 +57,21 @@ public abstract class SideReqRow {
5657

5758
protected JoinType joinType;
5859

59-
//key:Returns the value of the position, returns the index values ​​in the input data
60+
//key:Returns the value of the position, value: the ref field index​in the input table
6061
protected Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
6162

63+
//key:Returns the value of the position, value: the ref field index​in the side table
6264
protected Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
6365

66+
//key:Returns the value of the position, value: the ref field name​in the side table
67+
protected Map<Integer, String> sideFieldNameIndex = Maps.newHashMap();
68+
6469
protected SideTableInfo sideTableInfo;
6570

6671
protected AbsSideCache sideCache;
6772

68-
public SideReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
69-
SideTableInfo sideTableInfo){
73+
public SideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
74+
SideTableInfo sideTableInfo){
7075
this.rowTypeInfo = rowTypeInfo;
7176
this.outFieldInfoList = outFieldInfoList;
7277
this.joinType = joinInfo.getJoinType();
@@ -86,6 +91,7 @@ public void parseSelectFields(JoinInfo joinInfo){
8691
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
8792
fields.add(fieldInfo.getFieldName());
8893
sideFieldIndex.put(i, sideIndex);
94+
sideFieldNameIndex.put(i, fieldInfo.getFieldName());
8995
sideIndex++;
9096
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
9197
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
@@ -241,4 +247,12 @@ public AbsSideCache getSideCache() {
241247
public void setSideCache(AbsSideCache sideCache) {
242248
this.sideCache = sideCache;
243249
}
250+
251+
public Map<Integer, String> getSideFieldNameIndex() {
252+
return sideFieldNameIndex;
253+
}
254+
255+
public void setSideFieldNameIndex(Map<Integer, String> sideFieldNameIndex) {
256+
this.sideFieldNameIndex = sideFieldNameIndex;
257+
}
244258
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
2423
import com.dtstack.flink.sql.enums.ECacheType;
25-
import com.dtstack.flink.sql.util.PluginUtil;
24+
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
25+
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2626
import org.apache.calcite.sql.SqlBasicCall;
2727
import org.apache.calcite.sql.SqlDataTypeSpec;
2828
import org.apache.calcite.sql.SqlIdentifier;
@@ -42,7 +42,6 @@
4242
import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
4343
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4444
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
45-
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
4645
import org.apache.flink.streaming.api.datastream.DataStream;
4746
import org.apache.flink.table.api.Table;
4847
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -51,7 +50,6 @@
5150
import java.util.List;
5251
import java.util.Map;
5352
import java.util.Queue;
54-
import java.util.concurrent.TimeUnit;
5553

5654
import static org.apache.calcite.sql.SqlKind.*;
5755

@@ -161,9 +159,9 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
161159

162160
DataStream dsOut = null;
163161
if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
164-
162+
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
165163
}else{
166-
dsOut = LRUCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
164+
dsOut = SideAsyncOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
167165
}
168166

169167
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.dtstack.flink.sql.util.PluginUtil;
2727

2828
/**
29-
* 流处理
29+
* get specify side parser
3030
* Date: 2018/7/25
3131
* Company: www.dtstack.com
3232
* @author xuchao
@@ -36,12 +36,19 @@ public class StreamSideFactory {
3636

3737
private static final String CURR_TYPE = "side";
3838

39-
public static AbsTableParser getSqlParser(String resultType, String sqlRootDir) throws Exception {
39+
private static final String SIDE_DIR_TMPL = "%s%sside";
40+
41+
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
42+
43+
cacheType = cacheType == null ? "async" : cacheType;
44+
String sideDir = String.format(SIDE_DIR_TMPL, pluginType, cacheType);
4045
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
41-
String pluginJarPath = PluginUtil.getJarFileDirPath(resultType + CURR_TYPE, sqlRootDir);
46+
String pluginJarPath = PluginUtil.getJarFileDirPath(sideDir, sqlRootDir);
47+
4248
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
4349
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
44-
String className = PluginUtil.getSqlParserClassName(resultType, CURR_TYPE);
50+
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
51+
4552
Class<?> sideParser = dtClassLoader.loadClass(className);
4653
if(!AbsSideTableParser.class.isAssignableFrom(sideParser)){
4754
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");

core/src/main/java/com/dtstack/flink/sql/side/LRUCacheOperator.java renamed to core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
*/
1818

1919

20-
package com.dtstack.flink.sql.side;
20+
package com.dtstack.flink.sql.side.operator;
2121

2222
import com.dtstack.flink.sql.classloader.DtClassLoader;
23+
import com.dtstack.flink.sql.side.AsyncReqRow;
24+
import com.dtstack.flink.sql.side.FieldInfo;
25+
import com.dtstack.flink.sql.side.JoinInfo;
26+
import com.dtstack.flink.sql.side.SideTableInfo;
2327
import com.dtstack.flink.sql.util.PluginUtil;
2428
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2529
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -36,16 +40,18 @@
3640
* @author xuchao
3741
*/
3842

39-
public class LRUCacheOperator {
43+
public class SideAsyncOperator {
44+
45+
private static final String PATH_FORMAT = "%sasyncside";
4046

4147
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
42-
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
48+
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
4349
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
44-
String pathOfType = sideType + "side";
50+
String pathOfType = String.format(PATH_FORMAT, sideType);
4551
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
4652
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
4753
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
48-
String className = PluginUtil.getSqlSideClassName(sideType, "side");
54+
String className = PluginUtil.getSqlSideClassName(sideType, "side", "Async");
4955
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
5056
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
5157
}

0 commit comments

Comments
 (0)