Skip to content

Commit f56801c

Browse files
author
dapeng
committed
code review
1 parent e51f1cf commit f56801c

File tree

188 files changed

+731
-640
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

188 files changed

+731
-640
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import com.datastax.driver.core.SocketOptions;
2929
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3030
import com.datastax.driver.core.policies.RetryPolicy;
31-
import com.dtstack.flink.sql.side.AllReqRow;
31+
import com.dtstack.flink.sql.side.BaseAllReqRow;
3232
import com.dtstack.flink.sql.side.FieldInfo;
3333
import com.dtstack.flink.sql.side.JoinInfo;
34-
import com.dtstack.flink.sql.side.SideTableInfo;
34+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3535
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
3636
import org.apache.calcite.sql.JoinType;
3737
import org.apache.commons.collections.CollectionUtils;
@@ -60,14 +60,12 @@
6060
*
6161
* @author xuqianjin
6262
*/
63-
public class CassandraAllReqRow extends AllReqRow {
63+
public class CassandraAllReqRow extends BaseAllReqRow {
6464

6565
private static final long serialVersionUID = 54015343561288219L;
6666

6767
private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
6868

69-
private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
70-
7169
private static final int CONN_RETRY_NUM = 3;
7270

7371
private static final int FETCH_SIZE = 1000;
@@ -77,7 +75,7 @@ public class CassandraAllReqRow extends AllReqRow {
7775

7876
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
7977

80-
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
78+
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8179
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8280
}
8381

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.SideInfo;
24-
import com.dtstack.flink.sql.side.SideTableInfo;
23+
import com.dtstack.flink.sql.side.BaseSideInfo;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlNode;
@@ -37,16 +37,16 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class CassandraAllSideInfo extends SideInfo {
40+
public class CassandraAllSideInfo extends BaseSideInfo {
4141

4242
private static final long serialVersionUID = -8690814317653033557L;
4343

44-
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
44+
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4545
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4646
}
4747

4848
@Override
49-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
49+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
5050
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5151

5252
sqlCondition = "select ${selectField} from ${tableName} ";

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3131
import com.datastax.driver.core.policies.RetryPolicy;
3232
import com.dtstack.flink.sql.enums.ECacheContentType;
33-
import com.dtstack.flink.sql.side.AsyncReqRow;
33+
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
3434
import com.dtstack.flink.sql.side.CacheMissVal;
3535
import com.dtstack.flink.sql.side.FieldInfo;
3636
import com.dtstack.flink.sql.side.JoinInfo;
37-
import com.dtstack.flink.sql.side.SideTableInfo;
37+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3838
import com.dtstack.flink.sql.side.cache.CacheObj;
3939
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4040
import com.google.common.base.Function;
@@ -56,7 +56,6 @@
5656
import java.net.InetAddress;
5757
import java.sql.Timestamp;
5858
import java.util.ArrayList;
59-
import java.util.Collections;
6059
import java.util.List;
6160
import java.util.Map;
6261

@@ -66,7 +65,7 @@
6665
*
6766
* @author xuqianjin
6867
*/
69-
public class CassandraAsyncReqRow extends AsyncReqRow {
68+
public class CassandraAsyncReqRow extends BaseAsyncReqRow {
7069

7170
private static final long serialVersionUID = 6631584128079864735L;
7271

@@ -82,7 +81,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
8281
private transient ListenableFuture session;
8382
private transient CassandraSideTableInfo cassandraSideTableInfo;
8483

85-
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
84+
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8685
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8786
}
8887

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.SideInfo;
24-
import com.dtstack.flink.sql.side.SideTableInfo;
23+
import com.dtstack.flink.sql.side.BaseSideInfo;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlBasicCall;
@@ -39,16 +39,16 @@
3939
*
4040
* @author xuqianjin
4141
*/
42-
public class CassandraAsyncSideInfo extends SideInfo {
42+
public class CassandraAsyncSideInfo extends BaseSideInfo {
4343

4444
private static final long serialVersionUID = -4403313049809013362L;
4545

46-
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
46+
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4747
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4848
}
4949

5050
@Override
51-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
51+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
5252
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5353

5454
String sideTableName = joinInfo.getSideTableName();

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,24 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra.table;
2121

22-
import com.dtstack.flink.sql.table.AbsSideTableParser;
23-
import com.dtstack.flink.sql.table.TableInfo;
22+
import com.dtstack.flink.sql.table.AbstractSideTableParser;
23+
import com.dtstack.flink.sql.table.AbstractTableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525

26-
import java.math.BigDecimal;
27-
import java.sql.Date;
2826
import java.sql.Timestamp;
2927
import java.util.Map;
3028
import java.util.regex.Matcher;
3129
import java.util.regex.Pattern;
3230

33-
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
31+
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;
3432

3533
/**
3634
* Reason:
3735
* Date: 2018/11/22
3836
*
3937
* @author xuqianjin
4038
*/
41-
public class CassandraSideParser extends AbsSideTableParser {
39+
public class CassandraSideParser extends AbstractSideTableParser {
4240

4341
private final static String SIDE_SIGN_KEY = "sideSignKey";
4442

@@ -73,7 +71,7 @@ public CassandraSideParser() {
7371
}
7472

7573
@Override
76-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
74+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
7775
com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo cassandraSideTableInfo = new com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo();
7876
cassandraSideTableInfo.setName(tableName);
7977
parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
@@ -96,7 +94,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9694
return cassandraSideTableInfo;
9795
}
9896

99-
private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
97+
private void dealSideSign(Matcher matcher, AbstractTableInfo tableInfo) {
10098
}
10199

102100
@Override

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra.table;
2121

22-
import com.dtstack.flink.sql.side.SideTableInfo;
22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2323
import com.google.common.base.Preconditions;
2424

2525
/**
@@ -28,7 +28,7 @@
2828
*
2929
* @author xuqianjin
3030
*/
31-
public class CassandraSideTableInfo extends SideTableInfo {
31+
public class CassandraSideTableInfo extends AbstractSideTableInfo {
3232

3333
private static final long serialVersionUID = -5556431094535478915L;
3434

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import com.datastax.driver.core.SocketOptions;
4949
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5050
import com.datastax.driver.core.policies.RetryPolicy;
51-
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
51+
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
5252
import org.apache.flink.api.common.typeinfo.TypeInformation;
5353
import org.apache.flink.api.java.tuple.Tuple;
5454
import org.apache.flink.api.java.tuple.Tuple2;
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
72+
public class CassandraOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
25-
import com.dtstack.flink.sql.table.TargetTableInfo;
25+
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.tuple.Tuple2;
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -63,7 +63,7 @@ public CassandraSink() {
6363
}
6464

6565
@Override
66-
public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
66+
public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
6767
CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
6868
this.address = cassandraTableInfo.getAddress();
6969
this.tableName = cassandraTableInfo.getTableName();

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@
1919

2020
package com.dtstack.flink.sql.sink.cassandra.table;
2121

22-
import com.dtstack.flink.sql.table.AbsTableParser;
23-
import com.dtstack.flink.sql.table.TableInfo;
22+
import com.dtstack.flink.sql.table.AbstractTableParser;
23+
import com.dtstack.flink.sql.table.AbstractTableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525

2626
import java.util.Map;
2727

28-
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
28+
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;
2929

3030
/**
3131
* Reason:
3232
* Date: 2018/11/22
3333
*
3434
* @author xuqianjin
3535
*/
36-
public class CassandraSinkParser extends AbsTableParser {
36+
public class CassandraSinkParser extends AbstractTableParser {
3737

3838
public static final String ADDRESS_KEY = "address";
3939

@@ -60,7 +60,7 @@ public class CassandraSinkParser extends AbsTableParser {
6060
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
6161

6262
@Override
63-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
63+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
6464
CassandraTableInfo cassandraTableInfo = new CassandraTableInfo();
6565
cassandraTableInfo.setName(tableName);
6666
parseFieldsInfo(fieldsInfo, cassandraTableInfo);

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.sink.cassandra.table;
2121

22-
import com.dtstack.flink.sql.table.TargetTableInfo;
22+
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
2323
import com.google.common.base.Preconditions;
2424

2525
/**
@@ -28,7 +28,7 @@
2828
*
2929
* @author xuqianjin
3030
*/
31-
public class CassandraTableInfo extends TargetTableInfo {
31+
public class CassandraTableInfo extends AbstractTargetTableInfo {
3232

3333
private static final String CURR_TYPE = "cassandra";
3434

0 commit comments

Comments
 (0)