@@ -93,12 +93,12 @@ protected void initCache() throws SQLException {
9393 protected void reloadCache () {
9494 //reload cacheRef and replace to old cacheRef
9595 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
96- cacheRef .set (newCache );
9796 try {
9897 loadData (newCache );
9998 } catch (SQLException e ) {
10099 throw new RuntimeException (e );
101100 }
101+ cacheRef .set (newCache );
102102 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
103103 }
104104
@@ -122,9 +122,9 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
122122 List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
123123 if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
124124 out .collect (new CRow (fillData (value .row (), null ), value .change ()));
125+ } else if (!CollectionUtils .isEmpty (cacheList )) {
126+ cacheList .forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
125127 }
126-
127- cacheList .stream ().forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
128128 }
129129
130130 @ Override
@@ -151,8 +151,8 @@ public Row fillData(Row input, Object sideInput) {
151151 }
152152
153153 /**
154- * covert flink time attribute.Type information for indicating event or processing time.
155- * However, it behaves like a regular SQL timestamp but is serialized as Long.
154+ * covert flink time attribute.Type information for indicating event or processing time.
155+ * However, it behaves like a regular SQL timestamp but is serialized as Long.
156156 *
157157 * @param entry
158158 * @param obj
@@ -232,7 +232,8 @@ public int getFetchSize() {
232232 }
233233
234234 /**
235- * get jdbc connection
235+ * get jdbc connection
236+ *
236237 * @param dbURL
237238 * @param userName
238239 * @param password
0 commit comments