Skip to content

Commit d80f371

Browse files
author
dapeng
committed
Merge branch 'feat_1.8_asyncException_mergeTest' into 1.8_release_3.10.x_mergedTest_new
2 parents 425bdc2 + eb1d76d commit d80f371

File tree

23 files changed

+787
-452
lines changed

23 files changed

+787
-452
lines changed

.gitlab-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ build:
22
stage: test
33
script:
44
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
5-
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
5+
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
66
- sh ci/sonar_notify.sh
77
only:
88
- v1.8.0_dev

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 23 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -162,60 +162,13 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
162162
}
163163

164164
@Override
165-
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
166-
CRow inputCopy = new CRow(input.row(), input.change());
167-
JsonArray inputParams = new JsonArray();
168-
StringBuffer stringBuffer = new StringBuffer();
169-
String sqlWhere = " where ";
170-
171-
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
172-
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
173-
Object equalObj = inputCopy.row().getField(conValIndex);
174-
if (equalObj == null) {
175-
dealMissKey(inputCopy, resultFuture);
176-
return;
177-
}
178-
inputParams.add(equalObj);
179-
StringBuffer sqlTemp = stringBuffer.append(sideInfo.getEqualFieldList().get(i))
180-
.append(" = ");
181-
if (equalObj instanceof String) {
182-
sqlTemp.append("'" + equalObj + "'")
183-
.append(" and ");
184-
} else {
185-
sqlTemp.append(equalObj)
186-
.append(" and ");
187-
}
188-
189-
}
165+
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
190166

191167
String key = buildCacheKey(inputParams);
192-
sqlWhere = sqlWhere + stringBuffer.toString().substring(0, stringBuffer.lastIndexOf(" and "));
193-
194-
if (openCache()) {
195-
CacheObj val = getFromCache(key);
196-
if (val != null) {
197-
198-
if (ECacheContentType.MissVal == val.getType()) {
199-
dealMissKey(inputCopy, resultFuture);
200-
return;
201-
} else if (ECacheContentType.MultiLine == val.getType()) {
202-
List<CRow> rowList = Lists.newArrayList();
203-
for (Object jsonArray : (List) val.getContent()) {
204-
Row row = fillData(inputCopy.row(), jsonArray);
205-
rowList.add(new CRow(row, inputCopy.change()));
206-
}
207-
resultFuture.complete(rowList);
208-
} else {
209-
throw new RuntimeException("not support cache obj type " + val.getType());
210-
}
211-
return;
212-
}
213-
}
214-
215168
//connect Cassandra
216169
connCassandraDB(cassandraSideTableInfo);
217170

218-
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
171+
String sqlCondition = sideInfo.getSqlCondition() + " " + buildWhereCondition(inputParams) + " ALLOW FILTERING ";
219172
LOG.info("sqlCondition:{}" + sqlCondition);
220173

221174
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
@@ -242,18 +195,18 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
242195
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
243196
List<CRow> rowList = Lists.newArrayList();
244197
for (com.datastax.driver.core.Row line : rows) {
245-
Row row = fillData(inputCopy.row(), line);
198+
Row row = fillData(input.row(), line);
246199
if (openCache()) {
247200
cacheContent.add(line);
248201
}
249-
rowList.add(new CRow(row,inputCopy.change()));
202+
rowList.add(new CRow(row, input.change()));
250203
}
251204
resultFuture.complete(rowList);
252205
if (openCache()) {
253206
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
254207
}
255208
} else {
256-
dealMissKey(inputCopy, resultFuture);
209+
dealMissKey(input, resultFuture);
257210
if (openCache()) {
258211
putCache(key, CacheMissVal.getMissKeyObj());
259212
}
@@ -271,6 +224,24 @@ public void onFailure(Throwable t) {
271224
});
272225
}
273226

227+
@Override
228+
public String buildCacheKey(Map<String, Object> inputParams) {
229+
StringBuilder sb = new StringBuilder();
230+
for (Object ele : inputParams.values()) {
231+
sb.append(ele.toString()).append("_");
232+
}
233+
return sb.toString();
234+
}
235+
236+
private String buildWhereCondition(Map<String, Object> inputParams){
237+
StringBuilder sb = new StringBuilder(" where ");
238+
for(Map.Entry<String, Object> entry : inputParams.entrySet()){
239+
Object value = entry.getValue() instanceof String ? "'" + entry.getValue() + "'" : entry.getValue();
240+
sb.append(String.format("%s = %s", entry.getKey(), value));
241+
}
242+
return sb.toString();
243+
}
244+
274245
@Override
275246
public Row fillData(Row input, Object line) {
276247
com.datastax.driver.core.Row rowArray = (com.datastax.driver.core.Row) line;
@@ -305,14 +276,4 @@ public void close() throws Exception {
305276
cluster = null;
306277
}
307278
}
308-
309-
public String buildCacheKey(JsonArray jsonArray) {
310-
StringBuilder sb = new StringBuilder();
311-
for (Object ele : jsonArray.getList()) {
312-
sb.append(ele.toString())
313-
.append("_");
314-
}
315-
316-
return sb.toString();
317-
}
318279
}

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.io.Serializable;
2929
import java.util.List;
30+
import java.util.Objects;
3031

3132
/**
3233
* Reason:
@@ -53,15 +54,15 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
5354

5455
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
5556

56-
public static final String ASYNC_TIMEOUT_NUM_KEY = "asyncTimeoutNum";
57-
57+
public static final String ASYNC_FAIL_MAX_NUM_KEY = "asyncFailMaxNum";
58+
5859
public static final String ASYNC_REQ_POOL_KEY = "asyncPoolSize";
5960

6061
private String cacheType = "none";
6162

6263
private int cacheSize = 10000;
6364

64-
private long cacheTimeout = 60_000L;
65+
private long cacheTimeout = 60 * 1000L;
6566

6667
private int asyncCapacity=100;
6768

@@ -72,12 +73,12 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
7273
*/
7374
private int asyncPoolSize = 0;
7475

75-
private int asyncTimeoutNumLimit = Integer.MAX_VALUE;
76-
7776
private boolean partitionedJoin = false;
7877

7978
private String cacheMode="ordered";
8079

80+
private Long asyncFailMaxNum;
81+
8182
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
8283

8384
public RowTypeInfo getRowTypeInfo(){
@@ -155,12 +156,12 @@ public List<PredicateInfo> getPredicateInfoes() {
155156
return predicateInfoes;
156157
}
157158

158-
public int getAsyncTimeoutNumLimit() {
159-
return asyncTimeoutNumLimit;
159+
public Long getAsyncFailMaxNum(Long defaultValue) {
160+
return Objects.isNull(asyncFailMaxNum) ? defaultValue : asyncFailMaxNum;
160161
}
161162

162-
public void setAsyncTimeoutNumLimit(int asyncTimeoutNumLimit) {
163-
this.asyncTimeoutNumLimit = asyncTimeoutNumLimit;
163+
public void setAsyncFailMaxNum(Long asyncFailMaxNum) {
164+
this.asyncFailMaxNum = asyncFailMaxNum;
164165
}
165166

166167
public int getAsyncPoolSize() {
@@ -180,7 +181,7 @@ public String toString() {
180181
", asyncCapacity=" + asyncCapacity +
181182
", asyncTimeout=" + asyncTimeout +
182183
", asyncPoolSize=" + asyncPoolSize +
183-
", asyncTimeoutNumLimit=" + asyncTimeoutNumLimit +
184+
", asyncFailMaxNum=" + asyncFailMaxNum +
184185
", partitionedJoin=" + partitionedJoin +
185186
", cacheMode='" + cacheMode + '\'' +
186187
'}';

0 commit comments

Comments
 (0)