1818
1919package com .dtstack .flink .sql .side .rdb .all ;
2020
21+ import org .apache .flink .api .common .typeinfo .TypeInformation ;
2122import org .apache .flink .table .runtime .types .CRow ;
2223import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
2324import org .apache .flink .types .Row ;
4041import java .sql .SQLException ;
4142import java .sql .Statement ;
4243import java .sql .Timestamp ;
44+ import java .util .ArrayList ;
4345import java .util .Calendar ;
4446import java .util .List ;
4547import java .util .Map ;
4648import java .util .concurrent .atomic .AtomicReference ;
49+ import java .util .stream .Collectors ;
4750
4851/**
4952 * side operator with cache for all(period reload)
@@ -61,39 +64,14 @@ public abstract class RdbAllReqRow extends AllReqRow {
6164
6265 private static final int CONN_RETRY_NUM = 3 ;
6366
67+ private static final int DEFAULT_FETCH_SIZE = 1000 ;
68+
6469 private AtomicReference <Map <String , List <Map <String , Object >>>> cacheRef = new AtomicReference <>();
6570
6671 public RdbAllReqRow (SideInfo sideInfo ) {
6772 super (sideInfo );
6873 }
6974
70- @ Override
71- public Row fillData (Row input , Object sideInput ) {
72- Map <String , Object > cacheInfo = (Map <String , Object >) sideInput ;
73- Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
74- for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()) {
75- Object obj = input .getField (entry .getValue ());
76- boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass ());
77-
78- //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
79- if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ) {
80- obj = ((Timestamp ) obj ).getTime ();
81- }
82-
83- row .setField (entry .getKey (), obj );
84- }
85-
86- for (Map .Entry <Integer , String > entry : sideInfo .getSideFieldNameIndex ().entrySet ()) {
87- if (cacheInfo == null ) {
88- row .setField (entry .getKey (), null );
89- } else {
90- row .setField (entry .getKey (), cacheInfo .get (entry .getValue ()));
91- }
92- }
93-
94- return row ;
95- }
96-
9775 @ Override
9876 protected void initCache () throws SQLException {
9977 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
@@ -105,70 +83,78 @@ protected void initCache() throws SQLException {
10583 protected void reloadCache () {
10684 //reload cacheRef and replace to old cacheRef
10785 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
86+ cacheRef .set (newCache );
10887 try {
10988 loadData (newCache );
11089 } catch (SQLException e ) {
111- LOG . error ( "" , e );
90+ throw new RuntimeException ( e );
11291 }
113-
114- cacheRef .set (newCache );
11592 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
11693 }
11794
118-
11995 @ Override
12096 public void flatMap (CRow value , Collector <CRow > out ) throws Exception {
121- List <Object > inputParams = Lists .newArrayList ();
122- for (Integer conValIndex : sideInfo .getEqualValIndex ()) {
123- Object equalObj = value .row ().getField (conValIndex );
124- if (equalObj == null ) {
125- if (sideInfo .getJoinType () == JoinType .LEFT ) {
126- Row row = fillData (value .row (), null );
127- out .collect (new CRow (row , value .change ()));
128- }
129- return ;
130- }
131- inputParams .add (equalObj );
97+ List <Integer > equalValIndex = sideInfo .getEqualValIndex ();
98+ ArrayList <Object > inputParams = equalValIndex .stream ()
99+ .map (value .row ()::getField )
100+ .filter (object -> null != object )
101+ .collect (Collectors .toCollection (ArrayList ::new ));
102+
103+ if (inputParams .size () != equalValIndex .size () && sideInfo .getJoinType () == JoinType .LEFT ) {
104+ out .collect (new CRow (fillData (value .row (), null ), value .change ()));
105+ return ;
132106 }
133107
134- String key = buildKey (inputParams );
135- List <Map <String , Object >> cacheList = cacheRef .get ().get (key );
136- if (CollectionUtils .isEmpty (cacheList )) {
137- if (sideInfo .getJoinType () == JoinType .LEFT ) {
138- Row row = fillData (value .row (), null );
139- out .collect (new CRow (row , value .change ()));
140- } else {
141- return ;
142- }
108+ String cacheKey = inputParams .stream ()
109+ .map (Object ::toString )
110+ .collect (Collectors .joining ("_" ));
143111
144- return ;
112+ List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
113+ if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
114+ out .collect (new CRow (fillData (value .row (), null ), value .change ()));
145115 }
146116
147- for (Map <String , Object > one : cacheList ) {
148- out .collect (new CRow (fillData (value .row (), one ), value .change ()));
149- }
117+ cacheList .stream ().forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
150118 }
151119
152- private String buildKey (List <Object > equalValList ) {
153- StringBuilder sb = new StringBuilder ("" );
154- for (Object equalVal : equalValList ) {
155- sb .append (equalVal ).append ("_" );
120+ @ Override
121+ public Row fillData (Row input , Object sideInput ) {
122+ Map <String , Object > cacheInfo = (Map <String , Object >) sideInput ;
123+ Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
124+
125+ for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()) {
126+ // origin value
127+ Object obj = input .getField (entry .getValue ());
128+ obj = dealTimeAttributeType (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass (), obj );
129+ row .setField (entry .getKey (), obj );
156130 }
157131
158- return sb .toString ();
159- }
132+ for (Map .Entry <Integer , String > entry : sideInfo .getSideFieldNameIndex ().entrySet ()) {
133+ if (cacheInfo == null ) {
134+ row .setField (entry .getKey (), null );
135+ } else {
136+ row .setField (entry .getKey (), cacheInfo .get (entry .getValue ()));
137+ }
160138
161- private String buildKey (Map <String , Object > val , List <String > equalFieldList ) {
162- StringBuilder sb = new StringBuilder ("" );
163- for (String equalField : equalFieldList ) {
164- sb .append (val .get (equalField )).append ("_" );
165139 }
166-
167- return sb .toString ();
140+ return row ;
168141 }
169142
170- public abstract Connection getConn (String dbURL , String userName , String password );
171-
143+ /**
144+ * covert flink time attribute.Type information for indicating event or processing time.
145+ * However, it behaves like a regular SQL timestamp but is serialized as Long.
146+ *
147+ * @param entry
148+ * @param obj
149+ * @return
150+ */
151+ protected Object dealTimeAttributeType (Class <? extends TypeInformation > entry , Object obj ) {
152+ boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (entry );
153+ if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ) {
154+ obj = ((Timestamp ) obj ).getTime ();
155+ }
156+ return obj ;
157+ }
172158
173159 private void loadData (Map <String , List <Map <String , Object >>> tmpCache ) throws SQLException {
174160 RdbSideTableInfo tableInfo = (RdbSideTableInfo ) sideInfo .getSideTableInfo ();
@@ -191,40 +177,57 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
191177 LOG .error ("" , e1 );
192178 }
193179 }
194-
195- }
196-
197- //load data from table
198- String sql = sideInfo .getSqlCondition ();
199- Statement statement = connection .createStatement ();
200- statement .setFetchSize (getFetchSize ());
201- ResultSet resultSet = statement .executeQuery (sql );
202- String [] sideFieldNames = StringUtils .split (sideInfo .getSideSelectFields (), "," );
203- String [] fields = sideInfo .getSideTableInfo ().getFieldTypes ();
204- while (resultSet .next ()) {
205- Map <String , Object > oneRow = Maps .newHashMap ();
206- for (String fieldName : sideFieldNames ) {
207- Object object = resultSet .getObject (fieldName .trim ());
208- int fieldIndex = sideInfo .getSideTableInfo ().getFieldList ().indexOf (fieldName .trim ());
209- object = SwitchUtil .getTarget (object , fields [fieldIndex ]);
210- oneRow .put (fieldName .trim (), object );
211- }
212-
213- String cacheKey = buildKey (oneRow , sideInfo .getEqualFieldList ());
214- List <Map <String , Object >> list = tmpCache .computeIfAbsent (cacheKey , key -> Lists .newArrayList ());
215- list .add (oneRow );
216180 }
181+ queryAndFillData (tmpCache , connection );
217182 } catch (Exception e ) {
218183 LOG .error ("" , e );
184+ throw new SQLException (e );
219185 } finally {
220186 if (connection != null ) {
221187 connection .close ();
222188 }
223189 }
224190 }
225191
192+ private void queryAndFillData (Map <String , List <Map <String , Object >>> tmpCache , Connection connection ) throws SQLException {
193+ //load data from table
194+ String sql = sideInfo .getSqlCondition ();
195+ Statement statement = connection .createStatement ();
196+ statement .setFetchSize (getFetchSize ());
197+ ResultSet resultSet = statement .executeQuery (sql );
198+
199+ String [] sideFieldNames = StringUtils .split (sideInfo .getSideSelectFields (), "," );
200+ String [] fields = sideInfo .getSideTableInfo ().getFieldTypes ();
201+ while (resultSet .next ()) {
202+ Map <String , Object > oneRow = Maps .newHashMap ();
203+ for (String fieldName : sideFieldNames ) {
204+ Object object = resultSet .getObject (fieldName .trim ());
205+ int fieldIndex = sideInfo .getSideTableInfo ().getFieldList ().indexOf (fieldName .trim ());
206+ object = SwitchUtil .getTarget (object , fields [fieldIndex ]);
207+ oneRow .put (fieldName .trim (), object );
208+ }
209+
210+ String cacheKey = sideInfo .getEqualFieldList ().stream ()
211+ .map (equalField -> oneRow .get (equalField ))
212+ .map (Object ::toString )
213+ .collect (Collectors .joining ("_" ));
214+
215+ tmpCache .computeIfAbsent (cacheKey , key -> Lists .newArrayList ())
216+ .add (oneRow );
217+ }
218+ }
219+
226220 public int getFetchSize () {
227- return 1000 ;
221+ return DEFAULT_FETCH_SIZE ;
228222 }
229223
224+ /**
225+ * get jdbc connection
226+ * @param dbURL
227+ * @param userName
228+ * @param password
229+ * @return
230+ */
231+ public abstract Connection getConn (String dbURL , String userName , String password );
232+
230233}
0 commit comments