Skip to content

Commit 36549fb

Browse files
committed
rename
1 parent 40460cb commit 36549fb

File tree

2 files changed

+567
-0
lines changed

2 files changed

+567
-0
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.rdb.all;
20+
21+
import com.dtstack.flink.sql.side.AllReqRow;
22+
import com.dtstack.flink.sql.side.SideInfo;
23+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
24+
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
25+
import org.apache.calcite.sql.JoinType;
26+
import org.apache.commons.collections.CollectionUtils;
27+
import com.google.common.collect.Lists;
28+
import com.google.common.collect.Maps;
29+
import org.apache.flink.table.runtime.types.CRow;
30+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
31+
import org.apache.flink.types.Row;
32+
import org.apache.flink.util.Collector;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.sql.Timestamp;
41+
import java.util.Calendar;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.concurrent.atomic.AtomicReference;
45+
46+
/**
47+
* side operator with cache for all(period reload)
48+
* Date: 2018/11/26
49+
* Company: www.dtstack.com
50+
*
51+
* @author maqi
52+
*/
53+
54+
public abstract class AbstractRdbAllReqRow extends AllReqRow {
55+
56+
private static final long serialVersionUID = 2098635140857937718L;
57+
58+
private static final Logger LOG = LoggerFactory.getLogger(AbstractRdbAllReqRow.class);
59+
60+
private static final int CONN_RETRY_NUM = 3;
61+
62+
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
63+
64+
public AbstractRdbAllReqRow(SideInfo sideInfo) {
65+
super(sideInfo);
66+
}
67+
68+
@Override
69+
public Row fillData(Row input, Object sideInput) {
70+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
71+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
72+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
73+
Object obj = input.getField(entry.getValue());
74+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
75+
76+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
77+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
78+
obj = ((Timestamp) obj).getTime();
79+
}
80+
81+
row.setField(entry.getKey(), obj);
82+
}
83+
84+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
85+
if (cacheInfo == null) {
86+
row.setField(entry.getKey(), null);
87+
} else {
88+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
89+
}
90+
}
91+
92+
return row;
93+
}
94+
95+
@Override
96+
protected void initCache() throws SQLException {
97+
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
98+
cacheRef.set(newCache);
99+
loadData(newCache);
100+
}
101+
102+
@Override
103+
protected void reloadCache() {
104+
//reload cacheRef and replace to old cacheRef
105+
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
106+
try {
107+
loadData(newCache);
108+
} catch (SQLException e) {
109+
LOG.error("", e);
110+
}
111+
112+
cacheRef.set(newCache);
113+
LOG.info("----- rdb all cacheRef reload end:{}", Calendar.getInstance());
114+
}
115+
116+
117+
@Override
118+
public void flatMap(CRow value, Collector<CRow> out) throws Exception {
119+
List<Object> inputParams = Lists.newArrayList();
120+
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
121+
Object equalObj = value.row().getField(conValIndex);
122+
if (equalObj == null) {
123+
if (sideInfo.getJoinType() == JoinType.LEFT) {
124+
Row row = fillData(value.row(), null);
125+
out.collect(new CRow(row, value.change()));
126+
}
127+
return;
128+
}
129+
inputParams.add(equalObj);
130+
}
131+
132+
String key = buildKey(inputParams);
133+
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
134+
if (CollectionUtils.isEmpty(cacheList)) {
135+
if (sideInfo.getJoinType() == JoinType.LEFT) {
136+
Row row = fillData(value.row(), null);
137+
out.collect(new CRow(row, value.change()));
138+
} else {
139+
return;
140+
}
141+
142+
return;
143+
}
144+
145+
for (Map<String, Object> one : cacheList) {
146+
out.collect(new CRow(fillData(value.row(), one), value.change()));
147+
}
148+
}
149+
150+
private String buildKey(List<Object> equalValList) {
151+
StringBuilder sb = new StringBuilder("");
152+
for (Object equalVal : equalValList) {
153+
sb.append(equalVal).append("_");
154+
}
155+
156+
return sb.toString();
157+
}
158+
159+
private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
160+
StringBuilder sb = new StringBuilder("");
161+
for (String equalField : equalFieldList) {
162+
sb.append(val.get(equalField)).append("_");
163+
}
164+
165+
return sb.toString();
166+
}
167+
168+
public abstract Connection getConn(String dbUrl, String userName, String password);
169+
170+
171+
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
172+
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
173+
Connection connection = null;
174+
175+
try {
176+
for (int i = 0; i < CONN_RETRY_NUM; i++) {
177+
try {
178+
connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
179+
break;
180+
} catch (Exception e) {
181+
if (i == CONN_RETRY_NUM - 1) {
182+
throw new RuntimeException("", e);
183+
}
184+
try {
185+
String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
186+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
187+
Thread.sleep(5 * 1000);
188+
} catch (InterruptedException e1) {
189+
LOG.error("", e1);
190+
}
191+
}
192+
193+
}
194+
195+
//load data from table
196+
String sql = sideInfo.getSqlCondition();
197+
Statement statement = connection.createStatement();
198+
statement.setFetchSize(getFetchSize());
199+
ResultSet resultSet = statement.executeQuery(sql);
200+
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
201+
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
202+
while (resultSet.next()) {
203+
Map<String, Object> oneRow = Maps.newHashMap();
204+
for (String fieldName : sideFieldNames) {
205+
Object object = resultSet.getObject(fieldName.trim());
206+
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
207+
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
208+
oneRow.put(fieldName.trim(), object);
209+
}
210+
211+
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());
212+
List<Map<String, Object>> list = tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList());
213+
list.add(oneRow);
214+
}
215+
} catch (Exception e) {
216+
LOG.error("", e);
217+
} finally {
218+
if (connection != null) {
219+
connection.close();
220+
}
221+
}
222+
}
223+
224+
public int getFetchSize() {
225+
return 1000;
226+
}
227+
228+
}

0 commit comments

Comments
 (0)