@@ -91,12 +91,12 @@ protected void initCache() throws SQLException {
9191 protected void reloadCache () {
9292 //reload cacheRef and replace to old cacheRef
9393 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
94- cacheRef .set (newCache );
9594 try {
9695 loadData (newCache );
9796 } catch (SQLException e ) {
9897 throw new RuntimeException (e );
9998 }
99+ cacheRef .set (newCache );
100100 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
101101 }
102102
@@ -120,9 +120,9 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
120120 List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
121121 if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
122122 out .collect (new CRow (fillData (value .row (), null ), value .change ()));
123+ } else if (!CollectionUtils .isEmpty (cacheList )) {
124+ cacheList .forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
123125 }
124-
125- cacheList .stream ().forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
126126 }
127127
128128 @ Override
@@ -149,8 +149,8 @@ public Row fillData(Row input, Object sideInput) {
149149 }
150150
151151 /**
152- * covert flink time attribute.Type information for indicating event or processing time.
153- * However, it behaves like a regular SQL timestamp but is serialized as Long.
152+ * covert flink time attribute.Type information for indicating event or processing time.
153+ * However, it behaves like a regular SQL timestamp but is serialized as Long.
154154 *
155155 * @param entry
156156 * @param obj
@@ -230,7 +230,8 @@ public int getFetchSize() {
230230 }
231231
232232 /**
233- * get jdbc connection
233+ * get jdbc connection
234+ *
234235 * @param dbURL
235236 * @param userName
236237 * @param password
0 commit comments