Skip to content

Commit 86120e9

Browse files
author
toutian
committed
Merge branch 'huaxia_bugfix' into 'v1.8.0_dev'
异步连接初始化时,使用rowcopy避免值被覆盖 See merge request !181
2 parents 43bdc18 + aab45ac commit 86120e9

File tree

6 files changed

+47
-40
lines changed

6 files changed

+47
-40
lines changed

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,16 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
161161

162162
@Override
163163
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
164+
Row inputRow = Row.copy(input);
165165
JsonArray inputParams = new JsonArray();
166166
StringBuffer stringBuffer = new StringBuffer();
167167
String sqlWhere = " where ";
168168

169169
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
170170
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171-
Object equalObj = input.getField(conValIndex);
171+
Object equalObj = inputRow.getField(conValIndex);
172172
if (equalObj == null) {
173-
dealMissKey(input, resultFuture);
173+
dealMissKey(inputRow, resultFuture);
174174
return;
175175
}
176176
inputParams.add(equalObj);
@@ -194,12 +194,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
194194
if (val != null) {
195195

196196
if (ECacheContentType.MissVal == val.getType()) {
197-
dealMissKey(input, resultFuture);
197+
dealMissKey(inputRow, resultFuture);
198198
return;
199199
} else if (ECacheContentType.MultiLine == val.getType()) {
200200
List<Row> rowList = Lists.newArrayList();
201201
for (Object jsonArray : (List) val.getContent()) {
202-
Row row = fillData(input, jsonArray);
202+
Row row = fillData(inputRow, jsonArray);
203203
rowList.add(row);
204204
}
205205
resultFuture.complete(rowList);
@@ -240,7 +240,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
240240
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241241
List<Row> rowList = Lists.newArrayList();
242242
for (com.datastax.driver.core.Row line : rows) {
243-
Row row = fillData(input, line);
243+
Row row = fillData(inputRow, line);
244244
if (openCache()) {
245245
cacheContent.add(line);
246246
}
@@ -251,7 +251,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
251251
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
252252
}
253253
} else {
254-
dealMissKey(input, resultFuture);
254+
dealMissKey(inputRow, resultFuture);
255255
if (openCache()) {
256256
putCache(key, CacheMissVal.getMissKeyObj());
257257
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,13 @@ public void open(Configuration parameters) throws Exception {
123123

124124
@Override
125125
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
126+
Row inputRow = Row.copy(input);
126127
Map<String, Object> refData = Maps.newHashMap();
127128
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
128129
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
129-
Object equalObj = input.getField(conValIndex);
130+
Object equalObj = inputRow.getField(conValIndex);
130131
if(equalObj == null){
131-
dealMissKey(input, resultFuture);
132+
dealMissKey(inputRow, resultFuture);
132133
return;
133134
}
134135

@@ -142,22 +143,22 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
142143
CacheObj val = getFromCache(rowKeyStr);
143144
if(val != null){
144145
if(ECacheContentType.MissVal == val.getType()){
145-
dealMissKey(input, resultFuture);
146+
dealMissKey(inputRow, resultFuture);
146147
return;
147148
}else if(ECacheContentType.SingleLine == val.getType()){
148-
Row row = fillData(input, val);
149+
Row row = fillData(inputRow, val);
149150
resultFuture.complete(Collections.singleton(row));
150151
}else if(ECacheContentType.MultiLine == val.getType()){
151152
for(Object one : (List)val.getContent()){
152-
Row row = fillData(input, one);
153+
Row row = fillData(inputRow, one);
153154
resultFuture.complete(Collections.singleton(row));
154155
}
155156
}
156157
return;
157158
}
158159
}
159160

160-
rowKeyMode.asyncGetData(tableName, rowKeyStr, input, resultFuture, sideInfo.getSideCache());
161+
rowKeyMode.asyncGetData(tableName, rowKeyStr, inputRow, resultFuture, sideInfo.getSideCache());
161162
}
162163

163164
@Override

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,17 @@ private void connKuDu() throws KuduException {
121121

122122
@Override
123123
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
124+
Row inputRow = Row.copy(input);
124125
//scannerBuilder 设置为null重新加载过滤条件
125126
scannerBuilder = null;
126127
connKuDu();
127128
JsonArray inputParams = new JsonArray();
128129
Schema schema = table.getSchema();
129130
// @wenbaoup fix bug
130131
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
131-
Object equalObj = input.getField(sideInfo.getEqualValIndex().get(i));
132+
Object equalObj = inputRow.getField(sideInfo.getEqualValIndex().get(i));
132133
if (equalObj == null) {
133-
dealMissKey(input, resultFuture);
134+
dealMissKey(inputRow, resultFuture);
134135
return;
135136
}
136137
//增加过滤条件
@@ -158,15 +159,15 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
158159
CacheObj val = getFromCache(key);
159160
if (val != null) {
160161
if (ECacheContentType.MissVal == val.getType()) {
161-
dealMissKey(input, resultFuture);
162+
dealMissKey(inputRow, resultFuture);
162163
return;
163164
} else if (ECacheContentType.SingleLine == val.getType()) {
164-
Row row = fillData(input, val);
165+
Row row = fillData(inputRow, val);
165166
resultFuture.complete(Collections.singleton(row));
166167
} else if (ECacheContentType.MultiLine == val.getType()) {
167168
List<Row> rowList = Lists.newArrayList();
168169
for (Object jsonArray : (List) val.getContent()) {
169-
Row row = fillData(input, jsonArray);
170+
Row row = fillData(inputRow, jsonArray);
170171
rowList.add(row);
171172
}
172173
resultFuture.complete(rowList);
@@ -181,7 +182,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
181182
List<Row> rowList = Lists.newArrayList();
182183
Deferred<RowResultIterator> data = asyncKuduScanner.nextRows();
183184
//从之前的同步修改为调用异步的Callback
184-
data.addCallbackDeferring(new GetListRowCB(input, cacheContent, rowList, asyncKuduScanner, resultFuture, key));
185+
data.addCallbackDeferring(new GetListRowCB(inputRow, cacheContent, rowList, asyncKuduScanner, resultFuture, key));
185186
}
186187

187188

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,13 @@ public void connMongoDB() throws Exception {
124124

125125
@Override
126126
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
127-
127+
Row inputRow = Row.copy(input);
128128
BasicDBObject basicDBObject = new BasicDBObject();
129129
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
130130
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
131-
Object equalObj = input.getField(conValIndex);
131+
Object equalObj = inputRow.getField(conValIndex);
132132
if (equalObj == null) {
133-
dealMissKey(input, resultFuture);
133+
dealMissKey(inputRow, resultFuture);
134134
return;
135135
}
136136
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
@@ -154,12 +154,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
154154
if (val != null) {
155155

156156
if (ECacheContentType.MissVal == val.getType()) {
157-
dealMissKey(input, resultFuture);
157+
dealMissKey(inputRow, resultFuture);
158158
return;
159159
} else if (ECacheContentType.MultiLine == val.getType()) {
160160
List<Row> rowList = Lists.newArrayList();
161161
for (Object jsonArray : (List) val.getContent()) {
162-
Row row = fillData(input, jsonArray);
162+
Row row = fillData(inputRow, jsonArray);
163163
rowList.add(row);
164164
}
165165
resultFuture.complete(rowList);
@@ -176,7 +176,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
176176
@Override
177177
public void apply(final Document document) {
178178
atomicInteger.incrementAndGet();
179-
Row row = fillData(input, document);
179+
Row row = fillData(inputRow, document);
180180
if (openCache()) {
181181
cacheContent.add(document);
182182
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.side.*;
2424
import com.dtstack.flink.sql.side.cache.CacheObj;
2525
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
26+
import io.vertx.core.AsyncResult;
27+
import io.vertx.core.Handler;
2628
import io.vertx.core.Vertx;
2729
import io.vertx.core.VertxOptions;
2830
import io.vertx.core.json.JsonArray;
@@ -38,6 +40,8 @@
3840
import org.slf4j.LoggerFactory;
3941

4042
import java.sql.Timestamp;
43+
import java.util.Collection;
44+
import java.util.Collections;
4145
import java.util.List;
4246
import java.util.Map;
4347

@@ -76,12 +80,12 @@ public RdbAsyncReqRow(SideInfo sideInfo) {
7680

7781
@Override
7882
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
79-
83+
Row inputRow = Row.copy(input);
8084
JsonArray inputParams = new JsonArray();
8185
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
82-
Object equalObj = input.getField(conValIndex);
86+
Object equalObj = inputRow.getField(conValIndex);
8387
if (equalObj == null) {
84-
dealMissKey(input, resultFuture);
88+
dealMissKey(inputRow, resultFuture);
8589
return;
8690
}
8791
inputParams.add(equalObj);
@@ -93,12 +97,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
9397
if (val != null) {
9498

9599
if (ECacheContentType.MissVal == val.getType()) {
96-
dealMissKey(input, resultFuture);
100+
dealMissKey(inputRow, resultFuture);
97101
return;
98102
} else if (ECacheContentType.MultiLine == val.getType()) {
99103
List<Row> rowList = Lists.newArrayList();
100104
for (Object jsonArray : (List) val.getContent()) {
101-
Row row = fillData(input, jsonArray);
105+
Row row = fillData(inputRow, jsonArray);
102106
rowList.add(row);
103107
}
104108
resultFuture.complete(rowList);
@@ -132,7 +136,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
132136
List<Row> rowList = Lists.newArrayList();
133137

134138
for (JsonArray line : rs.result().getResults()) {
135-
Row row = fillData(input, line);
139+
Row row = fillData(inputRow, line);
136140
if (openCache()) {
137141
cacheContent.add(line);
138142
}
@@ -143,9 +147,9 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
143147
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
144148
}
145149

146-
resultFuture.complete(rowList);
150+
resultFuture.complete(Collections.unmodifiableCollection(rowList));
147151
} else {
148-
dealMissKey(input, resultFuture);
152+
dealMissKey(inputRow, resultFuture);
149153
if (openCache()) {
150154
putCache(key, CacheMissVal.getMissKeyObj());
151155
}

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,13 @@ public Row fillData(Row input, Object sideInput) {
117117

118118
@Override
119119
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
120+
Row inputRow = Row.copy(input);
120121
List<String> keyData = Lists.newLinkedList();
121122
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
122123
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
123-
Object equalObj = input.getField(conValIndex);
124+
Object equalObj = inputRow.getField(conValIndex);
124125
if(equalObj == null){
125-
dealMissKey(input, resultFuture);
126+
dealMissKey(inputRow, resultFuture);
126127
return;
127128
}
128129
String value = equalObj.toString();
@@ -136,10 +137,10 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
136137
CacheObj val = getFromCache(key);
137138
if(val != null){
138139
if(ECacheContentType.MissVal == val.getType()){
139-
dealMissKey(input, resultFuture);
140+
dealMissKey(inputRow, resultFuture);
140141
return;
141142
}else if(ECacheContentType.MultiLine == val.getType()){
142-
Row row = fillData(input, val.getContent());
143+
Row row = fillData(inputRow, val.getContent());
143144
resultFuture.complete(Collections.singleton(row));
144145
}else{
145146
RuntimeException exception = new RuntimeException("not support cache obj type " + val.getType());
@@ -153,7 +154,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
153154
List<String> value = async.keys(key + ":*").get();
154155
String[] values = value.toArray(new String[value.size()]);
155156
if (values.length == 0){
156-
dealMissKey(input, resultFuture);
157+
dealMissKey(inputRow, resultFuture);
157158
} else {
158159
RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values);
159160
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
@@ -165,13 +166,13 @@ public void accept(List<KeyValue<String, String>> keyValues) {
165166
keyValue.put(splitKeys[1], splitKeys[2]);
166167
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
167168
}
168-
Row row = fillData(input, keyValue);
169+
Row row = fillData(inputRow, keyValue);
169170
if (openCache()) {
170171
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue));
171172
}
172173
resultFuture.complete(Collections.singleton(row));
173174
} else {
174-
dealMissKey(input, resultFuture);
175+
dealMissKey(inputRow, resultFuture);
175176
if (openCache()) {
176177
putCache(key, CacheMissVal.getMissKeyObj());
177178
}

0 commit comments

Comments
 (0)