
Commit 14fa4a6

Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dt-insight-engine/flinkStreamSQL into feat_elasticsearch6-side
2 parents e0e4316 + 011fa58 commit 14fa4a6

78 files changed (+3783 / -3009 lines)


cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion

@@ -69,7 +69,7 @@
  * @see Tuple
  * @see DriverManager
  */
-public class CassandraOutputFormat extends DtRichOutputFormat {
+public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
     private static final long serialVersionUID = -7994311331389155692L;

     private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.clickhouse;
+
+import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
+
+import java.util.Optional;
+
+/**
+ * Date: 2020/1/15
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class ClickhouseDialect implements JDBCDialect {
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:clickhouse:");
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+    }
+
+    @Override
+    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+        throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
+    }
+}
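
A quick way to sanity-check the new dialect is a plain JUnit 4 test. This is a hypothetical test class, not part of the commit, and it assumes a JUnit 4 dependency is visible to the clickhouse module (this commit only declares JUnit 4.12 in core/pom.xml below). It exercises exactly the three methods the dialect overrides.

package com.dtstack.flink.sql.sink.clickhouse;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class ClickhouseDialectTest {

    private final ClickhouseDialect dialect = new ClickhouseDialect();

    @Test
    public void canHandleOnlyClickhouseUrls() {
        // only jdbc:clickhouse: URLs are accepted
        assertTrue(dialect.canHandle("jdbc:clickhouse://localhost:8123/test"));
        assertFalse(dialect.canHandle("jdbc:mysql://localhost:3306/test"));
    }

    @Test
    public void defaultDriverIsClickHouseDriver() {
        assertEquals("ru.yandex.clickhouse.ClickHouseDriver", dialect.defaultDriverName().get());
    }

    @Test(expected = RuntimeException.class)
    public void updateStatementIsRejected() {
        // ClickHouse has no update statement support in this dialect
        dialect.getUpdateStatement("t", new String[]{"id"}, new String[]{"id"});
    }
}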

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 22 additions & 41 deletions

@@ -21,58 +21,39 @@


 import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
+import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;

 import java.util.List;
 import java.util.Map;


 public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
-
-    private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-
     public ClickhouseSink() {
+        super(new ClickhouseDialect());
     }

     @Override
-    public RetractJDBCOutputFormat getOutputFormat() {
-        return new RetractJDBCOutputFormat();
-    }
-
-    @Override
-    public void buildSql(String scheam, String tableName, List<String> fields) {
-        buildInsertSql(tableName, fields);
-    }
-
-    @Override
-    public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
-        return null;
-    }
-
-    private void buildInsertSql(String tableName, List<String> fields) {
-        String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
-        String fieldsStr = "";
-        String placeholder = "";
-
-        for (String fieldName : fields) {
-            fieldsStr += ",`" + fieldName + "`";
-            placeholder += ",?";
-        }
-
-        fieldsStr = fieldsStr.replaceFirst(",", "");
-        placeholder = placeholder.replaceFirst(",", "");
-
-        sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
-        this.sql = sqlTmp;
-        System.out.println("---insert sql----");
-        System.out.println(sql);
-    }
-
-
-    @Override
-    public String getDriverName() {
-        return CLICKHOUSE_DRIVER;
+    public JDBCUpsertOutputFormat getOutputFormat() {
+        JDBCOptions jdbcOptions = JDBCOptions.builder()
+                .setDBUrl(dbURL)
+                .setDialect(jdbcDialect)
+                .setUsername(userName)
+                .setPassword(password)
+                .setTableName(tableName)
+                .build();
+
+        return JDBCUpsertOutputFormat.builder()
+                .setOptions(jdbcOptions)
+                .setFieldNames(fieldNames)
+                .setFlushMaxSize(batchNum)
+                .setFlushIntervalMills(batchWaitInterval)
+                .setFieldTypes(sqlTypes)
+                .setKeyFields(primaryKeys)
+                .setAllReplace(allReplace)
+                .setUpdateMode(updateMode)
+                .build();
     }


console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 1 addition & 1 deletion

@@ -37,7 +37,7 @@
  *
  * @author xuqianjin
  */
-public class ConsoleOutputFormat extends DtRichOutputFormat {
+public class ConsoleOutputFormat extends DtRichOutputFormat<Tuple2> {

     private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);

core/pom.xml

Lines changed: 6 additions & 0 deletions

@@ -116,6 +116,12 @@
             <version>${flink.version}</version>
         </dependency>

+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+
     </dependencies>

     <build>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 0 deletions

@@ -181,8 +181,12 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
             //sql-dimensional table contains the dimension table of execution
             sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
         }else{
+            System.out.println("----------exec sql without dimension join-----------" );
+            System.out.println("----------real sql exec is--------------------------");
+            System.out.println(result.getExecSql());
             FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
             if(LOG.isInfoEnabled()){
+                System.out.println();
                 LOG.info("exec sql: " + result.getExecSql());
             }
         }

core/src/main/java/com/dtstack/flink/sql/enums/EUpdateMode.java

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.enums;
+
+/**
+ * Processing mode for retract stream data
+ *
+ * Reason:
+ * Date: 2019/1/2
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public enum EUpdateMode {
+    // do not retract data, only emit incremental records
+    APPEND(0),
+    // first delete the retracted record, then apply the update
+    UPSERT(1);
+
+    private int type;
+
+    EUpdateMode(int type) {
+        this.type = type;
+    }
+
+    public int getType() {
+        return this.type;
+    }
+}
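
A minimal sketch of how the new enum might be resolved from a table property; the helper class and the "append"/"upsert" string values are assumptions for illustration, not code from this commit. Since the constants are plain enum values, the standard valueOf lookup is enough, with APPEND assumed as the default when nothing is configured.

import com.dtstack.flink.sql.enums.EUpdateMode;

public class UpdateModeExample {

    // resolve a user-supplied updateMode string such as "append" or "upsert"
    public static EUpdateMode resolve(String updateMode) {
        if (updateMode == null || updateMode.trim().isEmpty()) {
            return EUpdateMode.APPEND; // assumed default when no mode is configured
        }
        return EUpdateMode.valueOf(updateMode.trim().toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(resolve("upsert")); // UPSERT
        System.out.println(resolve(null));     // APPEND
    }
}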

core/src/main/java/com/dtstack/flink/sql/outputformat/DtRichOutputFormat.java

Lines changed: 4 additions & 9 deletions

@@ -18,8 +18,6 @@
 package com.dtstack.flink.sql.outputformat;

 import com.dtstack.flink.sql.metric.MetricConstant;
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
@@ -29,16 +27,13 @@
  * extend RichOutputFormat with metric 'dtNumRecordsOut', 'dtNumDirtyRecordsOut', 'dtNumRecordsOutRate'
  * Created by sishu.yss on 2018/11/28.
  */
-public abstract class DtRichOutputFormat extends RichOutputFormat<Tuple2>{
-
-    protected transient Counter outRecords;
+public abstract class DtRichOutputFormat<T> extends RichOutputFormat<T>{

-    protected transient Counter outDirtyRecords;
-
-    protected transient Meter outRecordsRate;
+    public transient Counter outRecords;
+    public transient Counter outDirtyRecords;
+    public transient Meter outRecordsRate;

     protected static int ROW_PRINT_FREQUENCY = 1000;
-
     protected static int DIRTY_PRINT_FREQUENCY = 1000;

     public void initMetric() {
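
With DtRichOutputFormat now generic, subclasses choose their own record type instead of the hard-wired raw Tuple2. A minimal sketch of such a subclass (hypothetical class, assuming initMetric() registers the counters named in the class javadoc):

import java.io.IOException;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;

public class PrintOutputFormat extends DtRichOutputFormat<Tuple2<Boolean, Row>> {

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // assumption: initMetric() registers dtNumRecordsOut / dtNumDirtyRecordsOut / dtNumRecordsOutRate
        initMetric();
    }

    @Override
    public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
        // f0 marks accumulate (true) vs. retract (false) messages in a retract stream
        System.out.println((record.f0 ? "insert: " : "retract: ") + record.f1);
        outRecords.inc(); // the counters are public on DtRichOutputFormat after this change
    }

    @Override
    public void close() throws IOException {
    }
}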

core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java

Lines changed: 36 additions & 1 deletion

@@ -21,9 +21,10 @@
 package com.dtstack.flink.sql.side;

 import com.google.common.collect.HashBasedTable;
+import org.apache.commons.lang3.StringUtils;

 /**
- * Reason:
+ * Records the field mapping between the converted (intermediate) table and the original tables
  * Date: 2018/8/30
  * Company: www.dtstack.com
  * @author xuchao
@@ -37,6 +38,8 @@ public class FieldReplaceInfo {

     private String targetTableAlias = null;

+    private FieldReplaceInfo preNode = null;
+
     public void setMappingTable(HashBasedTable<String, String, String> mappingTable) {
         this.mappingTable = mappingTable;
     }
@@ -57,7 +60,39 @@ public String getTargetTableAlias() {
         return targetTableAlias;
     }

+    public FieldReplaceInfo getPreNode() {
+        return preNode;
+    }
+
+    public void setPreNode(FieldReplaceInfo preNode) {
+        this.preNode = preNode;
+    }
+
     public void setTargetTableAlias(String targetTableAlias) {
         this.targetTableAlias = targetTableAlias;
     }
+
+    /**
+     * Resolve the converted fieldName from the original tableName + fieldName
+     * @param tableName
+     * @param fieldName
+     * @return
+     */
+    public String getTargetFieldName(String tableName, String fieldName){
+        String targetFieldName = mappingTable.get(tableName, fieldName);
+        if(StringUtils.isNotBlank(targetFieldName)){
+            return targetFieldName;
+        }
+
+        if(preNode == null){
+            return null;
+        }
+
+        String preNodeTargetFieldName = preNode.getTargetFieldName(tableName, fieldName);
+        if(StringUtils.isBlank(preNodeTargetFieldName)){
+            return null;
+        }
+
+        return mappingTable.get(preNode.getTargetTableAlias(), preNodeTargetFieldName);
+    }
 }
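
The new getTargetFieldName walks the preNode chain: if the current mappingTable does not know the original table/field pair, the lookup is delegated to the previous node, and its result is then mapped through the current node's table. A minimal sketch with hypothetical table and field names:

import com.dtstack.flink.sql.side.FieldReplaceInfo;
import com.google.common.collect.HashBasedTable;

public class FieldReplaceInfoExample {
    public static void main(String[] args) {
        // first join: orders.id was renamed to id0 in intermediate table t1
        FieldReplaceInfo first = new FieldReplaceInfo();
        HashBasedTable<String, String, String> firstMapping = HashBasedTable.create();
        firstMapping.put("orders", "id", "id0");
        first.setMappingTable(firstMapping);
        first.setTargetTableAlias("t1");

        // second join: t1.id0 was renamed to id0_0 in intermediate table t2
        FieldReplaceInfo second = new FieldReplaceInfo();
        HashBasedTable<String, String, String> secondMapping = HashBasedTable.create();
        secondMapping.put("t1", "id0", "id0_0");
        second.setMappingTable(secondMapping);
        second.setTargetTableAlias("t2");
        second.setPreNode(first);

        // resolves orders.id -> id0 (via preNode) -> id0_0 (current mapping)
        System.out.println(second.getTargetFieldName("orders", "id")); // id0_0
    }
}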

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 10 additions & 19 deletions

@@ -41,8 +41,8 @@ public class JoinInfo implements Serializable {

     //whether the left table is a side (dimension) table
     private boolean leftIsSideTable;
-    //the left table is a converted intermediate table
-    private boolean leftIsMidTable;
+
+    private boolean leftIsTmpTable = false;

     //whether the right table is a side (dimension) table
     private boolean rightIsSideTable;
@@ -66,8 +66,6 @@ public class JoinInfo implements Serializable {
     private SqlNode selectNode;

     private JoinType joinType;
-    // the left side is an intermediate conversion table; keeps the table mapping used to replace field names
-    private Map<String, String> leftTabMapping;

     public String getSideTableName(){
         if(leftIsSideTable){
@@ -92,21 +90,6 @@ public String getNewTableName(){
         return leftStr + "_" + rightTableName;
     }

-    public boolean isLeftIsMidTable() {
-        return leftIsMidTable;
-    }
-
-    public void setLeftIsMidTable(boolean leftIsMidTable) {
-        this.leftIsMidTable = leftIsMidTable;
-    }
-
-    public Map<String, String> getLeftTabMapping() {
-        return leftTabMapping;
-    }
-
-    public void setLeftTabMapping(Map<String, String> leftTabMapping) {
-        this.leftTabMapping = leftTabMapping;
-    }

     public String getNewTableAlias(){
         return leftTableAlias + "_" + rightTableAlias;
@@ -211,4 +194,12 @@ public JoinType getJoinType() {
     public void setJoinType(JoinType joinType) {
         this.joinType = joinType;
     }
+
+    public boolean isLeftIsTmpTable() {
+        return leftIsTmpTable;
+    }
+
+    public void setLeftIsTmpTable(boolean leftIsTmpTable) {
+        this.leftIsTmpTable = leftIsTmpTable;
+    }
 }
