Skip to content

Commit b861964

Browse files
committed
Merge branch 'feat_1.8_allDataInit' into '1.8_test_3.10.x'
rdb all code opt See merge request !248
2 parents 75aceac + 9a57115 commit b861964

File tree

2 files changed

+103
-94
lines changed

2 files changed

+103
-94
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,19 @@ public void open(Configuration parameters) throws Exception {
6868
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6969
}
7070

71-
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
72-
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
71+
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out) {
72+
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
7373
return;
7474
}
7575

7676
Row row = fillData(value.row(), sideInput);
7777
out.collect(new CRow(row, value.change()));
7878
}
7979

80+
@Override
81+
public void close() throws Exception {
82+
if (null != es && !es.isShutdown()) {
83+
es.shutdown();
84+
}
85+
}
8086
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 95 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.side.rdb.all;
2020

21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2122
import org.apache.flink.table.runtime.types.CRow;
2223
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2324
import org.apache.flink.types.Row;
@@ -40,10 +41,12 @@
4041
import java.sql.SQLException;
4142
import java.sql.Statement;
4243
import java.sql.Timestamp;
44+
import java.util.ArrayList;
4345
import java.util.Calendar;
4446
import java.util.List;
4547
import java.util.Map;
4648
import java.util.concurrent.atomic.AtomicReference;
49+
import java.util.stream.Collectors;
4750

4851
/**
4952
* side operator with cache for all(period reload)
@@ -61,39 +64,14 @@ public abstract class RdbAllReqRow extends AllReqRow {
6164

6265
private static final int CONN_RETRY_NUM = 3;
6366

67+
private static final int DEFAULT_FETCH_SIZE = 1000;
68+
6469
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
6570

6671
public RdbAllReqRow(SideInfo sideInfo) {
6772
super(sideInfo);
6873
}
6974

70-
@Override
71-
public Row fillData(Row input, Object sideInput) {
72-
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
73-
Row row = new Row(sideInfo.getOutFieldInfoList().size());
74-
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
75-
Object obj = input.getField(entry.getValue());
76-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
77-
78-
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
79-
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
80-
obj = ((Timestamp) obj).getTime();
81-
}
82-
83-
row.setField(entry.getKey(), obj);
84-
}
85-
86-
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
87-
if (cacheInfo == null) {
88-
row.setField(entry.getKey(), null);
89-
} else {
90-
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
91-
}
92-
}
93-
94-
return row;
95-
}
96-
9775
@Override
9876
protected void initCache() throws SQLException {
9977
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
@@ -105,70 +83,78 @@ protected void initCache() throws SQLException {
10583
protected void reloadCache() {
10684
//reload cacheRef and replace to old cacheRef
10785
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
86+
cacheRef.set(newCache);
10887
try {
10988
loadData(newCache);
11089
} catch (SQLException e) {
111-
LOG.error("", e);
90+
throw new RuntimeException(e);
11291
}
113-
114-
cacheRef.set(newCache);
11592
LOG.info("----- rdb all cacheRef reload end:{}", Calendar.getInstance());
11693
}
11794

118-
11995
@Override
12096
public void flatMap(CRow value, Collector<CRow> out) throws Exception {
121-
List<Object> inputParams = Lists.newArrayList();
122-
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
123-
Object equalObj = value.row().getField(conValIndex);
124-
if (equalObj == null) {
125-
if (sideInfo.getJoinType() == JoinType.LEFT) {
126-
Row row = fillData(value.row(), null);
127-
out.collect(new CRow(row, value.change()));
128-
}
129-
return;
130-
}
131-
inputParams.add(equalObj);
97+
List<Integer> equalValIndex = sideInfo.getEqualValIndex();
98+
ArrayList<Object> inputParams = equalValIndex.stream()
99+
.map(value.row()::getField)
100+
.filter(object -> null != object)
101+
.collect(Collectors.toCollection(ArrayList::new));
102+
103+
if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) {
104+
out.collect(new CRow(fillData(value.row(), null), value.change()));
105+
return;
132106
}
133107

134-
String key = buildKey(inputParams);
135-
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
136-
if (CollectionUtils.isEmpty(cacheList)) {
137-
if (sideInfo.getJoinType() == JoinType.LEFT) {
138-
Row row = fillData(value.row(), null);
139-
out.collect(new CRow(row, value.change()));
140-
} else {
141-
return;
142-
}
108+
String cacheKey = inputParams.stream()
109+
.map(Object::toString)
110+
.collect(Collectors.joining("_"));
143111

144-
return;
112+
List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
113+
if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) {
114+
out.collect(new CRow(fillData(value.row(), null), value.change()));
145115
}
146116

147-
for (Map<String, Object> one : cacheList) {
148-
out.collect(new CRow(fillData(value.row(), one), value.change()));
149-
}
117+
cacheList.stream().forEach(one -> out.collect(new CRow(fillData(value.row(), one), value.change())));
150118
}
151119

152-
private String buildKey(List<Object> equalValList) {
153-
StringBuilder sb = new StringBuilder("");
154-
for (Object equalVal : equalValList) {
155-
sb.append(equalVal).append("_");
120+
@Override
121+
public Row fillData(Row input, Object sideInput) {
122+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
123+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
124+
125+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
126+
// origin value
127+
Object obj = input.getField(entry.getValue());
128+
obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
129+
row.setField(entry.getKey(), obj);
156130
}
157131

158-
return sb.toString();
159-
}
132+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
133+
if (cacheInfo == null) {
134+
row.setField(entry.getKey(), null);
135+
} else {
136+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
137+
}
160138

161-
private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
162-
StringBuilder sb = new StringBuilder("");
163-
for (String equalField : equalFieldList) {
164-
sb.append(val.get(equalField)).append("_");
165139
}
166-
167-
return sb.toString();
140+
return row;
168141
}
169142

170-
public abstract Connection getConn(String dbURL, String userName, String password);
171-
143+
/**
144+
* covert flink time attribute.Type information for indicating event or processing time.
145+
* However, it behaves like a regular SQL timestamp but is serialized as Long.
146+
*
147+
* @param entry
148+
* @param obj
149+
* @return
150+
*/
151+
protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
152+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
153+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
154+
obj = ((Timestamp) obj).getTime();
155+
}
156+
return obj;
157+
}
172158

173159
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
174160
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
@@ -191,40 +177,57 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
191177
LOG.error("", e1);
192178
}
193179
}
194-
195-
}
196-
197-
//load data from table
198-
String sql = sideInfo.getSqlCondition();
199-
Statement statement = connection.createStatement();
200-
statement.setFetchSize(getFetchSize());
201-
ResultSet resultSet = statement.executeQuery(sql);
202-
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
203-
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
204-
while (resultSet.next()) {
205-
Map<String, Object> oneRow = Maps.newHashMap();
206-
for (String fieldName : sideFieldNames) {
207-
Object object = resultSet.getObject(fieldName.trim());
208-
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
209-
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
210-
oneRow.put(fieldName.trim(), object);
211-
}
212-
213-
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());
214-
List<Map<String, Object>> list = tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList());
215-
list.add(oneRow);
216180
}
181+
queryAndFillData(tmpCache, connection);
217182
} catch (Exception e) {
218183
LOG.error("", e);
184+
throw new SQLException(e);
219185
} finally {
220186
if (connection != null) {
221187
connection.close();
222188
}
223189
}
224190
}
225191

192+
private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, Connection connection) throws SQLException {
193+
//load data from table
194+
String sql = sideInfo.getSqlCondition();
195+
Statement statement = connection.createStatement();
196+
statement.setFetchSize(getFetchSize());
197+
ResultSet resultSet = statement.executeQuery(sql);
198+
199+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
200+
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
201+
while (resultSet.next()) {
202+
Map<String, Object> oneRow = Maps.newHashMap();
203+
for (String fieldName : sideFieldNames) {
204+
Object object = resultSet.getObject(fieldName.trim());
205+
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
206+
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
207+
oneRow.put(fieldName.trim(), object);
208+
}
209+
210+
String cacheKey = sideInfo.getEqualFieldList().stream()
211+
.map(equalField -> oneRow.get(equalField))
212+
.map(Object::toString)
213+
.collect(Collectors.joining("_"));
214+
215+
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
216+
.add(oneRow);
217+
}
218+
}
219+
226220
public int getFetchSize() {
227-
return 1000;
221+
return DEFAULT_FETCH_SIZE;
228222
}
229223

224+
/**
225+
* get jdbc connection
226+
* @param dbURL
227+
* @param userName
228+
* @param password
229+
* @return
230+
*/
231+
public abstract Connection getConn(String dbURL, String userName, String password);
232+
230233
}

0 commit comments

Comments
 (0)