Skip to content

Commit 4c7f986

Browse files
committed
修改合并冲突问题
1 parent 4bdbe90 commit 4c7f986

File tree

24 files changed

+65
-69
lines changed

24 files changed

+65
-69
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
127127
.setPluginLoadMode(pluginLoadMode)
128128
.setDeployMode(deployMode)
129129
.setConfProp(confProperties)
130-
.setJarUrlList(jarURList)
130+
.setJarUrlList(jarUrlList)
131131
.build();
132132

133133
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class SideSqlExec {
9999

100100
private Map<String, Table> localTableCache = Maps.newHashMap();
101101

102-
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
102+
public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
103103
Map<String, Table> tableCache, StreamQueryConfig queryConfig, CreateTmpTableParser.SqlParserResult createView) throws Exception {
104104
if(localSqlPluginPath == null){
105105
throw new RuntimeException("need to set localSqlPluginPath");
@@ -732,7 +732,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,
732732
Map<String, Table> tableCache,
733733
List<FieldReplaceInfo> replaceInfoList) throws SqlParseException {
734734

735-
AliasInfo aliasInfo = parseASNode(pollSqlNode);
735+
AliasInfo aliasInfo = parseAsNode(pollSqlNode);
736736
if (localTableCache.containsKey(aliasInfo.getName())) {
737737
return;
738738
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSideTableParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ protected void parseCacheProp(AbstractSideTableInfo sideTableInfo, Map<String, O
108108
}
109109
sideTableInfo.setAsyncTimeout(asyncTimeout);
110110
}
111-
if(props.containsKey(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase())){
112-
Integer asyncTimeoutNum = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase()));
111+
if(props.containsKey(AbstractSideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase())){
112+
Integer asyncTimeoutNum = MathUtil.getIntegerVal(props.get(AbstractSideTableInfo.ASYNC_TIMEOUT_NUM_KEY.toLowerCase()));
113113
if (asyncTimeoutNum > 0){
114114
sideTableInfo.setAsyncTimeoutNumLimit(asyncTimeoutNum);
115115
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

23+
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
2324
import org.apache.flink.api.common.typeinfo.TypeInformation;
2425
import org.apache.flink.api.java.tuple.Tuple2;
2526
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -33,7 +34,6 @@
3334

3435
import com.dtstack.flink.sql.sink.IStreamSinkGener;
3536
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
36-
import com.dtstack.flink.sql.table.TargetTableInfo;
3737
import org.apache.commons.lang3.StringUtils;
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,22 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6;
2020

21-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22-
import org.apache.flink.table.runtime.types.CRow;
23-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24-
import org.apache.flink.types.Row;
25-
import org.apache.flink.util.Collector;
26-
27-
import com.dtstack.flink.sql.side.AllReqRow;
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.BaseAllReqRow;
2823
import com.dtstack.flink.sql.side.FieldInfo;
2924
import com.dtstack.flink.sql.side.JoinInfo;
30-
import com.dtstack.flink.sql.side.SideTableInfo;
3125
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
3226
import com.dtstack.flink.sql.side.elasticsearch6.util.Es6Util;
3327
import com.dtstack.flink.sql.side.elasticsearch6.util.SwitchUtil;
3428
import com.google.common.collect.Lists;
3529
import com.google.common.collect.Maps;
36-
import org.apache.calcite.sql.JoinType;
3730
import org.apache.commons.collections.CollectionUtils;
3831
import org.apache.commons.lang3.StringUtils;
32+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
33+
import org.apache.flink.table.runtime.types.CRow;
34+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
35+
import org.apache.flink.types.Row;
36+
import org.apache.flink.util.Collector;
3937
import org.elasticsearch.action.search.SearchRequest;
4038
import org.elasticsearch.action.search.SearchResponse;
4139
import org.elasticsearch.client.RequestOptions;
@@ -49,7 +47,6 @@
4947

5048
import java.io.IOException;
5149
import java.io.Serializable;
52-
import java.sql.SQLException;
5350
import java.sql.Timestamp;
5451
import java.util.Calendar;
5552
import java.util.List;
@@ -60,7 +57,7 @@
6057
* @author yinxi
6158
* @date 2020/1/13 - 1:00
6259
*/
63-
public class Elasticsearch6AllReqRow extends AllReqRow implements Serializable {
60+
public class Elasticsearch6AllReqRow extends BaseAllReqRow implements Serializable {
6461

6562
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AllReqRow.class);
6663

@@ -70,7 +67,7 @@ public class Elasticsearch6AllReqRow extends AllReqRow implements Serializable {
7067
private SearchRequest searchRequest;
7168
private BoolQueryBuilder boolQueryBuilder;
7269

73-
public Elasticsearch6AllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
70+
public Elasticsearch6AllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
7471
super(new Elasticsearch6AllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
7572
}
7673

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllSideInfo.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6;
2020

21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.BaseSideInfo;
2123
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2224

2325
import com.dtstack.flink.sql.side.FieldInfo;
2426
import com.dtstack.flink.sql.side.JoinInfo;
25-
import com.dtstack.flink.sql.side.SideInfo;
26-
import com.dtstack.flink.sql.side.SideTableInfo;
27+
2728
import com.dtstack.flink.sql.util.ParseUtils;
2829
import com.google.common.collect.Lists;
2930
import org.apache.calcite.sql.SqlNode;
@@ -35,15 +36,15 @@
3536
* @author yinxi
3637
* @date 2020/1/13 - 1:01
3738
*/
38-
public class Elasticsearch6AllSideInfo extends SideInfo {
39+
public class Elasticsearch6AllSideInfo extends BaseSideInfo {
3940

4041

41-
public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
42+
public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4243
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4344
}
4445

4546
@Override
46-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
47+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
4748

4849
}
4950

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@
5757
* @author yinxi
5858
* @date 2020/2/13 - 13:10
5959
*/
60-
public class Elasticsearch6AsyncReqRow extends AsyncReqRow implements Serializable {
60+
public class Elasticsearch6AsyncReqRow extends BaseAsyncReqRow implements Serializable {
6161

6262
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6AsyncReqRow.class);
6363
private transient RestHighLevelClient rhlClient;
6464
private SearchRequest searchRequest;
6565
private List<String> sqlJoinCompareOperate = Lists.newArrayList();
6666

67-
public Elasticsearch6AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
67+
public Elasticsearch6AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
6868
super(new Elasticsearch6AsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6969
SqlNode conditionNode = joinInfo.getCondition();
7070
ParseUtils.parseJoinCompareOperate(conditionNode, sqlJoinCompareOperate);

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncSideInfo.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,33 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6;
2020

21-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22-
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.BaseSideInfo;
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.JoinInfo;
25-
import com.dtstack.flink.sql.side.SideInfo;
26-
import com.dtstack.flink.sql.side.SideTableInfo;
2725
import com.dtstack.flink.sql.util.ParseUtils;
2826
import com.google.common.collect.Lists;
2927
import org.apache.calcite.sql.SqlBasicCall;
3028
import org.apache.calcite.sql.SqlIdentifier;
3129
import org.apache.calcite.sql.SqlKind;
3230
import org.apache.calcite.sql.SqlNode;
31+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3332

3433
import java.util.List;
3534

3635
/**
3736
* @author yinxi
3837
* @date 2020/2/13 - 13:09
3938
*/
40-
public class Elasticsearch6AsyncSideInfo extends SideInfo {
39+
public class Elasticsearch6AsyncSideInfo extends BaseSideInfo {
4140

4241

43-
public Elasticsearch6AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
42+
public Elasticsearch6AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4443
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4544
}
4645

4746
@Override
48-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
47+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
4948

5049
String sideTableName = joinInfo.getSideTableName();
5150
SqlNode conditionNode = joinInfo.getCondition();

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package com.dtstack.flink.sql.side.elasticsearch6.table;
2020

2121
import com.dtstack.flink.sql.side.elasticsearch6.util.ClassUtil;
22-
import com.dtstack.flink.sql.table.AbsSideTableParser;
23-
import com.dtstack.flink.sql.table.TableInfo;
22+
import com.dtstack.flink.sql.table.AbstractSideTableParser;
23+
import com.dtstack.flink.sql.table.AbstractTableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525
import org.apache.commons.lang3.StringUtils;
2626

@@ -30,7 +30,7 @@
3030
* @author yinxi
3131
* @date 2020/1/13 - 1:07
3232
*/
33-
public class Elasticsearch6SideParser extends AbsSideTableParser {
33+
public class Elasticsearch6SideParser extends AbstractSideTableParser {
3434

3535
private static final String KEY_ES6_ADDRESS = "address";
3636

@@ -55,7 +55,7 @@ protected boolean fieldNameNeedsUpperCase() {
5555
}
5656

5757
@Override
58-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
58+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
5959
Elasticsearch6SideTableInfo elasticsearch6SideTableInfo = new Elasticsearch6SideTableInfo();
6060
elasticsearch6SideTableInfo.setName(tableName);
6161
parseFieldsInfo(fieldsInfo, elasticsearch6SideTableInfo);

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818

1919
package com.dtstack.flink.sql.side.elasticsearch6.table;
2020

21-
import com.dtstack.flink.sql.side.SideTableInfo;
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2222
import com.google.common.base.Preconditions;
2323
import org.elasticsearch.search.builder.SearchSourceBuilder;
2424

2525
/**
2626
* @author yinxi
2727
* @date 2020/1/13 - 15:00
2828
*/
29-
public class Elasticsearch6SideTableInfo extends SideTableInfo {
29+
public class Elasticsearch6SideTableInfo extends AbstractSideTableInfo {
3030

3131
private static final String CURR_TYPE = "elasticsearch6";
3232

0 commit comments

Comments
 (0)