Skip to content

Commit db77f9a

Browse files
committed
优化es6维表相关
1 parent 3e1b07a commit db77f9a

File tree

3 files changed

+33
-25
lines changed

3 files changed

+33
-25
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import org.apache.calcite.sql.JoinType;
2425
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.table.runtime.types.CRow;
28+
import org.apache.flink.types.Row;
29+
import org.apache.flink.util.Collector;
2730

2831
import java.sql.SQLException;
2932
import java.util.concurrent.Executors;
@@ -64,4 +67,13 @@ public void open(Configuration parameters) throws Exception {
6467
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6568
}
6669

70+
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
71+
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
72+
return;
73+
}
74+
75+
Row row = fillData(value.row(), sideInput);
76+
out.collect(new CRow(row, value.change()));
77+
}
78+
6779
}

docs/elasticsearch6Side

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ create table sideTable(
7070
PRIMARY KEY(channel),
7171
PERIOD FOR SYSTEM_TIME
7272
)WITH(
73-
type ='elasticsearch',
73+
type ='elasticsearch6',
7474
address ='172.16.10.47:9500',
7575
cluster='es_47_menghan',
7676
estype ='type1',

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,7 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
8080
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
8181
Object equalObj = value.row().getField(conValIndex);
8282
if (equalObj == null) {
83-
if (sideInfo.getJoinType() == JoinType.LEFT) {
84-
Row row = fillData(value.row(), null);
85-
out.collect(new CRow(row, value.change()));
86-
}
87-
83+
sendOutputRow(value, null, out);
8884
return;
8985
}
9086

@@ -94,18 +90,12 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
9490
String key = buildKey(inputParams);
9591
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
9692
if (CollectionUtils.isEmpty(cacheList)) {
97-
if (sideInfo.getJoinType() == JoinType.LEFT) {
98-
Row row = fillData(value.row(), null);
99-
out.collect(new CRow(row, value.change()));
100-
} else {
101-
return;
102-
}
103-
93+
sendOutputRow(value, null, out);
10494
return;
10595
}
10696

10797
for (Map<String, Object> one : cacheList) {
108-
out.collect(new CRow(fillData(value.row(), one), value.change()));
98+
sendOutputRow(value, one, out);
10999
}
110100
}
111101

@@ -155,7 +145,7 @@ private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
155145
}
156146

157147
@Override
158-
protected void initCache() throws SQLException {
148+
protected void initCache() {
159149
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
160150
cacheRef.set(newCache);
161151
try {
@@ -165,6 +155,7 @@ protected void initCache() throws SQLException {
165155
loadData(newCache);
166156
} catch (Exception e) {
167157
LOG.error("", e);
158+
throw new RuntimeException(e);
168159
}
169160
}
170161

@@ -176,6 +167,7 @@ protected void reloadCache() {
176167
loadData(newCache);
177168
} catch (Exception e) {
178169
LOG.error("", e);
170+
throw new RuntimeException(e);
179171
}
180172

181173
cacheRef.set(newCache);
@@ -210,6 +202,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
210202

211203
} catch (Exception e) {
212204
LOG.error("", e);
205+
throw new RuntimeException(e);
213206
} finally {
214207

215208
if (rhlClient != null) {
@@ -220,18 +213,18 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws IO
220213

221214
// initialize searchSourceBuilder
222215
private SearchSourceBuilder initConfiguration(BoolQueryBuilder boolQueryBuilder){
223-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
224-
if (boolQueryBuilder != null) {
225-
searchSourceBuilder.query(boolQueryBuilder);
226-
}
216+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
217+
if (boolQueryBuilder != null) {
218+
searchSourceBuilder.query(boolQueryBuilder);
219+
}
227220

228-
searchSourceBuilder.size(getFetchSize());
229-
searchSourceBuilder.sort("_id", SortOrder.DESC);
221+
searchSourceBuilder.size(getFetchSize());
222+
searchSourceBuilder.sort("_id", SortOrder.DESC);
230223

231-
// fields included in the source data
232-
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
233-
searchSourceBuilder.fetchSource(sideFieldNames, null);
234-
return searchSourceBuilder;
224+
// fields included in the source data
225+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
226+
searchSourceBuilder.fetchSource(sideFieldNames, null);
227+
return searchSourceBuilder;
235228
}
236229

237230

@@ -247,6 +240,7 @@ private void searchData(SearchSourceBuilder searchSourceBuilder, Map<String, Lis
247240
// set search mark
248241
searchSourceBuilder.searchAfter(searchAfterParameter);
249242
}
243+
250244
searchRequest.source(searchSourceBuilder);
251245
searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
252246
searchHits = searchResponse.getHits().getHits();
@@ -267,6 +261,7 @@ private void searchData(SearchSourceBuilder searchSourceBuilder, Map<String, Lis
267261
private void loadToCache(SearchHit[] searchHits, Map<String, List<Map<String, Object>>> tmpCache) {
268262
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
269263
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
264+
270265
for (SearchHit searchHit : searchHits) {
271266
Map<String, Object> oneRow = Maps.newHashMap();
272267
for (String fieldName : sideFieldNames) {
@@ -275,6 +270,7 @@ private void loadToCache(SearchHit[] searchHits, Map<String, List<Map<String, Ob
275270
object = SwitchUtil.getTarget(object, sideFieldTypes[fieldIndex]);
276271
oneRow.put(fieldName.trim(), object);
277272
}
273+
278274
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());
279275
List<Map<String, Object>> list = tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList());
280276
list.add(oneRow);

0 commit comments

Comments
 (0)