Skip to content

Commit aab45ac

Browse files
committed
async row copy
1 parent 0365a10 commit aab45ac

File tree

4 files changed

+4
-4
lines changed

4 files changed

+4
-4
lines changed

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
161161

162162
@Override
163163
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
Row inputRow = input;
164+
Row inputRow = Row.copy(input);
165165
JsonArray inputParams = new JsonArray();
166166
StringBuffer stringBuffer = new StringBuffer();
167167
String sqlWhere = " where ";

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private void connKuDu() throws KuduException {
121121

122122
@Override
123123
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
124-
Row inputRow = input;
124+
Row inputRow = Row.copy(input);
125125
//scannerBuilder 设置为null重新加载过滤条件
126126
scannerBuilder = null;
127127
connKuDu();

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void connMongoDB() throws Exception {
124124

125125
@Override
126126
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
127-
Row inputRow = input;
127+
Row inputRow = Row.copy(input);
128128
BasicDBObject basicDBObject = new BasicDBObject();
129129
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
130130
Integer conValIndex = sideInfo.getEqualValIndex().get(i);

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public Row fillData(Row input, Object sideInput) {
117117

118118
@Override
119119
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
120-
Row inputRow = input;
120+
Row inputRow = Row.copy(input);
121121
List<String> keyData = Lists.newLinkedList();
122122
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
123123
Integer conValIndex = sideInfo.getEqualValIndex().get(i);

0 commit comments

Comments
 (0)