Commit 41f7201

Merge remote-tracking branch 'origin/v1.8.0_dev' into 1.8.0_dev_optimizeFormatWithKafka

# Conflicts:
#	kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
#	kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
#	kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
#	kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

2 parents 44af091 + 51f9d55

File tree: 18 files changed, +254 −149 lines

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java
Lines changed: 7 additions & 7 deletions

@@ -161,16 +161,16 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
 
     @Override
     public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
-
+        Row inputRow = Row.copy(input);
         JsonArray inputParams = new JsonArray();
         StringBuffer stringBuffer = new StringBuffer();
         String sqlWhere = " where ";
 
         for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = inputRow.getField(conValIndex);
             if (equalObj == null) {
-                dealMissKey(input, resultFuture);
+                dealMissKey(inputRow, resultFuture);
                 return;
             }
             inputParams.add(equalObj);
@@ -194,12 +194,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         if (val != null) {
 
             if (ECacheContentType.MissVal == val.getType()) {
-                dealMissKey(input, resultFuture);
+                dealMissKey(inputRow, resultFuture);
                 return;
             } else if (ECacheContentType.MultiLine == val.getType()) {
                 List<Row> rowList = Lists.newArrayList();
                 for (Object jsonArray : (List) val.getContent()) {
-                    Row row = fillData(input, jsonArray);
+                    Row row = fillData(inputRow, jsonArray);
                     rowList.add(row);
                 }
                 resultFuture.complete(rowList);
@@ -240,7 +240,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                     List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
                     List<Row> rowList = Lists.newArrayList();
                     for (com.datastax.driver.core.Row line : rows) {
-                        Row row = fillData(input, line);
+                        Row row = fillData(inputRow, line);
                         if (openCache()) {
                             cacheContent.add(line);
                         }
@@ -251,7 +251,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                         putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                     }
                 } else {
-                    dealMissKey(input, resultFuture);
+                    dealMissKey(inputRow, resultFuture);
                     if (openCache()) {
                         putCache(key, CacheMissVal.getMissKeyObj());
                     }
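
Note on the change: asyncInvoke hands the input row to callbacks that run after the method returns, and the runtime may reuse or mutate the original Row object in the meantime, so a late callback could otherwise read fields from a different record. Copying the row up front pins the values. A minimal sketch of the pattern, assuming only Flink's org.apache.flink.types.Row (the async callback is illustrative):

import org.apache.flink.types.Row;

import java.util.concurrent.CompletableFuture;

public class CopyBeforeAsync {
    public static void main(String[] args) {
        Row input = Row.of("key-1", 42);

        // Defensive copy: the callback below must not hold on to 'input',
        // which the caller may mutate or reuse once this method returns.
        Row inputRow = Row.copy(input);

        CompletableFuture.runAsync(() ->
                System.out.println("lookup key: " + inputRow.getField(0)));
    }
}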

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java
Lines changed: 5 additions & 1 deletion

@@ -69,7 +69,11 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
 
         List<String[]> data = new ArrayList<>();
         data.add(fieldNames);
-        data.add(record.toString().split(","));
+        String[] recordStr = new String[record.getArity()];
+        for (int i = 0; i < record.getArity(); i++) {
+            recordStr[i] = String.valueOf(record.getField(i));
+        }
+        data.add(recordStr);
         TablePrintUtil.build(data).print();
 
         outRecords.inc();
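
Why the split was replaced: record.toString() joins fields with commas, so splitting on "," mis-parses any field value that itself contains a comma and shifts the printed columns. Indexing fields via getArity()/getField() keeps exactly one cell per column. A small illustration with hypothetical values, assuming Flink's Row:

import org.apache.flink.types.Row;

public class SplitVsPerField {
    public static void main(String[] args) {
        Row record = Row.of("hello, world", 1);

        // Broken: produces 3 cells for a 2-column row.
        String[] bySplit = record.toString().split(",");

        // Fixed: one cell per column; null fields render as "null".
        String[] byField = new String[record.getArity()];
        for (int i = 0; i < record.getArity(); i++) {
            byField[i] = String.valueOf(record.getField(i));
        }

        System.out.println(bySplit.length + " vs " + byField.length); // prints "3 vs 2"
    }
}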

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java
Lines changed: 40 additions & 7 deletions

@@ -27,6 +27,8 @@
 import org.apache.flink.table.plan.logical.LogicalRelNode;
 import org.apache.flink.table.plan.schema.TableSinkTable;
 import org.apache.flink.table.plan.schema.TableSourceSinkTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 import java.lang.reflect.Method;
@@ -38,6 +40,8 @@
  */
 public class FlinkSQLExec {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
+
     public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, StreamQueryConfig queryConfig) throws Exception {
 
         FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
@@ -64,18 +68,47 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, Strea
 
         TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
         TableSinkTable tableSinkTable = (TableSinkTable)targetTable.tableSinkTable().get();
-        String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
+
+        StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
+        String[] sinkFieldNames = tableSinkTable.tableSink().getFieldNames();
+        String[] queryFieldNames = queryResult.getSchema().getColumnNames();
+        if (sinkFieldNames.length != queryFieldNames.length) {
+            throw new ValidationException(
+                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+                    "Query result schema: " + String.join(",", queryFieldNames) + "\n" +
+                    "TableSink schema: " + String.join(",", sinkFieldNames));
+        }
 
         Table newTable = null;
         try {
-            newTable = queryResult.select(String.join(",", fieldNames));
+            // selecting sink fields that are missing from queryResult throws here
+            newTable = queryResult.select(String.join(",", sinkFieldNames));
        } catch (Exception e) {
             throw new ValidationException(
-                    "Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
-                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
-                    "TableSink schema: " + String.join(",", fieldNames));
+                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
+                    "TableSink schema: " + String.join(",", sinkFieldNames));
         }
-        StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
-        tableEnv.insertInto(newTable, targetTableName, config);
+
+        try {
+            tableEnv.insertInto(newTable, targetTableName, config);
+        } catch (Exception ex) {
+            LOG.warn("Field name case of query result and registered TableSink " + targetTableName + " do not match. " + ex.getMessage());
+            newTable = queryResult.select(String.join(",", ignoreCase(queryFieldNames, sinkFieldNames)));
+            tableEnv.insertInto(newTable, targetTableName, config);
+        }
+    }
+
+    public static String[] ignoreCase(String[] queryFieldNames, String[] sinkFieldNames) {
+        String[] newFieldNames = sinkFieldNames;
+        for (int i = 0; i < newFieldNames.length; i++) {
+            for (String queryFieldName : queryFieldNames) {
+                if (newFieldNames[i].equalsIgnoreCase(queryFieldName)) {
+                    newFieldNames[i] = queryFieldName;
+                    break;
+                }
+            }
+        }
+        return newFieldNames;
     }
 }
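
What the fallback does: if the case-sensitive insertInto fails, ignoreCase rewrites each sink field name to the casing used by the query schema (matched case-insensitively) and the insert is retried. Note that newFieldNames aliases the sinkFieldNames array, so the method rewrites the caller's array in place rather than copying it. A standalone sketch with hypothetical column names:

import java.util.Arrays;

public class IgnoreCaseDemo {
    static String[] ignoreCase(String[] queryFieldNames, String[] sinkFieldNames) {
        String[] newFieldNames = sinkFieldNames; // aliases the array, does not copy it
        for (int i = 0; i < newFieldNames.length; i++) {
            for (String queryFieldName : queryFieldNames) {
                if (newFieldNames[i].equalsIgnoreCase(queryFieldName)) {
                    newFieldNames[i] = queryFieldName; // adopt the query-side casing
                    break;
                }
            }
        }
        return newFieldNames;
    }

    public static void main(String[] args) {
        String[] query = {"userId", "userName"};
        String[] sink = {"USERID", "username"};
        System.out.println(Arrays.toString(ignoreCase(query, sink))); // [userId, userName]
    }
}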

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java
Lines changed: 10 additions & 4 deletions

@@ -28,6 +28,7 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.types.Row;
 
@@ -119,9 +120,8 @@ private void parseTree(JsonNode jsonNode, String prefix){
             JsonNode child = jsonNode.get(next);
             String nodeKey = getNodeKey(prefix, next);
 
-            if (child.isValueNode()){
-                nodeAndJsonNodeMapping.put(nodeKey, child);
-            }else if(child.isArray()){
+            nodeAndJsonNodeMapping.put(nodeKey, child);
+            if(child.isArray()){
                 parseTree(child, nodeKey);
             }else {
                 parseTree(child, nodeKey);
@@ -152,7 +152,13 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
             return node.asBoolean();
         } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
-            return node.asText();
+            if (node instanceof ObjectNode) {
+                return node.toString();
+            } else if (node instanceof NullNode) {
+                return null;
+            } else {
+                return node.asText();
+            }
         } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
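
Why the STRING branch grew: JsonNode.asText() returns "" for an object node and the literal string "null" for a JSON null, so a STRING column previously lost nested objects and turned nulls into text. The new guard keeps the serialized JSON for objects and maps JSON null to a real null. A sketch with plain Jackson (the project uses Flink's shaded copy of the same API):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonToStringColumn {
    static Object toStringColumn(JsonNode node) {
        if (node instanceof ObjectNode) {
            return node.toString();  // nested object -> its JSON text
        } else if (node instanceof NullNode) {
            return null;             // JSON null -> a real null, not "null"
        } else {
            return node.asText();
        }
    }

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper().readTree("{\"a\":{\"b\":1},\"c\":null,\"d\":\"x\"}");
        System.out.println(toStringColumn(root.get("a"))); // {"b":1}
        System.out.println(toStringColumn(root.get("c"))); // null
        System.out.println(toStringColumn(root.get("d"))); // x
    }
}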

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java
Lines changed: 2 additions & 0 deletions

@@ -47,6 +47,8 @@ public class MetricConstant {
 
     public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
 
+    public static final String DT_NUM_SIDE_PARSE_ERROR_RECORDS = "dtNumSideParseErrorRecords";
+
     public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
 
     public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
Lines changed: 43 additions & 16 deletions

@@ -21,15 +21,19 @@
 package com.dtstack.flink.sql.side;
 
 import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.metric.MetricConstant;
 import com.dtstack.flink.sql.side.cache.AbsSideCache;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cache.LRUSideCache;
 import org.apache.calcite.sql.JoinType;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -44,25 +48,21 @@
  */
 
 public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
-
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
     private static final long serialVersionUID = 2098635244857937717L;
 
     protected SideInfo sideInfo;
+    protected transient Counter parseErrorRecords;
 
     public AsyncReqRow(SideInfo sideInfo){
         this.sideInfo = sideInfo;
     }
 
     @Override
-    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
-        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
-        try {
-            if (null == future.get()) {
-                new TimeoutException("Async function call has timed out.");
-            }
-        } catch (Exception e) {
-            throw new Exception(e);
-        }
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        initCache();
+        initMetric();
     }
 
     private void initCache(){
@@ -78,10 +78,13 @@ private void initCache(){
         }else{
             throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
         }
-
         sideCache.initCache();
     }
 
+    private void initMetric() {
+        parseErrorRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_SIDE_PARSE_ERROR_RECORDS);
+    }
+
     protected CacheObj getFromCache(String key){
         return sideInfo.getSideCache().getFromCache(key);
     }
@@ -97,17 +100,41 @@ protected boolean openCache(){
     protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
         if(sideInfo.getJoinType() == JoinType.LEFT){
             //Reserved left table data
-            Row row = fillData(input, null);
-            resultFuture.complete(Collections.singleton(row));
+            try {
+                Row row = fillData(input, null);
+                resultFuture.complete(Collections.singleton(row));
+            } catch (Exception e) {
+                dealFillDataError(resultFuture, e, input);
+            }
         }else{
             resultFuture.complete(null);
         }
     }
 
+    protected void dealCacheData(String key, CacheObj missKeyObj) {
+        if (openCache()) {
+            putCache(key, missKeyObj);
+        }
+    }
+
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        initCache();
+    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
+        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
+        try {
+            if (null == future.get()) {
+                resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
+            }
+        } catch (Exception e) {
+            resultFuture.completeExceptionally(new Exception(e));
+        }
+    }
+
+    protected void dealFillDataError(ResultFuture<Row> resultFuture, Exception e, Object sourceData) {
+        LOG.debug("source data {} join side table error ", sourceData);
+        LOG.debug("async build row error..{}", e);
+        parseErrorRecords.inc();
+        resultFuture.complete(Collections.emptyList());
     }
 
     @Override
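
Two behavioral fixes worth calling out in this file. First, the old timeout() built a TimeoutException and then discarded it without throwing or reporting anything, so a timed-out lookup left its result future incomplete; completing the future exceptionally is what actually surfaces the failure to the async-wait operator. Second, fillData errors are now funneled into dealFillDataError, which counts them in the new dtNumSideParseErrorRecords metric and emits an empty result instead of failing the job. A reduced sketch of the corrected timeout override, assuming Flink's RichAsyncFunction (the null-result check from the real code is omitted):

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeoutException;

public abstract class TimeoutAwareAsyncFunction extends RichAsyncFunction<Row, Row> {

    @Override
    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
        // Before the fix, `new TimeoutException(...)` was created and dropped,
        // leaving the future incomplete. Reporting it lets the operator react.
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    }
}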

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java
Lines changed: 18 additions & 10 deletions

@@ -123,15 +123,15 @@ public void open(Configuration parameters) throws Exception {
 
     @Override
     public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
+        Row inputRow = Row.copy(input);
         Map<String, Object> refData = Maps.newHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = inputRow.getField(conValIndex);
             if(equalObj == null){
-                dealMissKey(input, resultFuture);
+                dealMissKey(inputRow, resultFuture);
                 return;
             }
-
             refData.put(sideInfo.getEqualFieldList().get(i), equalObj);
         }
 
@@ -142,22 +142,30 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
             CacheObj val = getFromCache(rowKeyStr);
             if(val != null){
                 if(ECacheContentType.MissVal == val.getType()){
-                    dealMissKey(input, resultFuture);
+                    dealMissKey(inputRow, resultFuture);
                     return;
                 }else if(ECacheContentType.SingleLine == val.getType()){
-                    Row row = fillData(input, val);
-                    resultFuture.complete(Collections.singleton(row));
-                }else if(ECacheContentType.MultiLine == val.getType()){
-                    for(Object one : (List)val.getContent()){
-                        Row row = fillData(input, one);
+                    try {
+                        Row row = fillData(inputRow, val);
                         resultFuture.complete(Collections.singleton(row));
+                    } catch (Exception e) {
+                        dealFillDataError(resultFuture, e, inputRow);
+                    }
+                }else if(ECacheContentType.MultiLine == val.getType()){
+                    try {
+                        for(Object one : (List)val.getContent()){
+                            Row row = fillData(inputRow, one);
+                            resultFuture.complete(Collections.singleton(row));
+                        }
+                    } catch (Exception e) {
+                        dealFillDataError(resultFuture, e, inputRow);
                     }
                 }
                 return;
             }
         }
 
-        rowKeyMode.asyncGetData(tableName, rowKeyStr, input, resultFuture, sideInfo.getSideCache());
+        rowKeyMode.asyncGetData(tableName, rowKeyStr, inputRow, resultFuture, sideInfo.getSideCache());
     }
 
     @Override

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbsRowKeyModeDealer.java
Lines changed: 7 additions & 3 deletions

@@ -74,9 +74,13 @@ public AbsRowKeyModeDealer(Map<String, String> colRefType, String[] colNames, HB
 
     protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
         if(joinType == JoinType.LEFT){
-            //keep the left table data
-            Row row = fillData(input, null);
-            resultFuture.complete(Collections.singleton(row));
+            try {
+                //keep the left table data
+                Row row = fillData(input, null);
+                resultFuture.complete(Collections.singleton(row));
+            } catch (Exception e) {
+                resultFuture.completeExceptionally(e);
+            }
         }else{
             resultFuture.complete(null);
         }
