@@ -83,12 +83,12 @@ protected void initCache() throws SQLException {
8383 protected void reloadCache () {
8484 //reload cacheRef and replace to old cacheRef
8585 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
86- cacheRef .set (newCache );
8786 try {
8887 loadData (newCache );
8988 } catch (SQLException e ) {
9089 throw new RuntimeException (e );
9190 }
91+ cacheRef .set (newCache );
9292 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
9393 }
9494
@@ -112,9 +112,9 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
112112 List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
113113 if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
114114 out .collect (new CRow (fillData (value .row (), null ), value .change ()));
115+ } else if (!CollectionUtils .isEmpty (cacheList )) {
116+ cacheList .forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
115117 }
116-
117- cacheList .stream ().forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
118118 }
119119
120120 @ Override
@@ -141,8 +141,8 @@ public Row fillData(Row input, Object sideInput) {
141141 }
142142
143143 /**
144- * covert flink time attribute.Type information for indicating event or processing time.
145- * However, it behaves like a regular SQL timestamp but is serialized as Long.
144+ * covert flink time attribute.Type information for indicating event or processing time.
145+ * However, it behaves like a regular SQL timestamp but is serialized as Long.
146146 *
147147 * @param entry
148148 * @param obj
@@ -222,7 +222,8 @@ public int getFetchSize() {
222222 }
223223
224224 /**
225- * get jdbc connection
225+ * get jdbc connection
226+ *
226227 * @param dbURL
227228 * @param userName
228229 * @param password
0 commit comments