@@ -94,12 +94,12 @@ protected void initCache() throws SQLException {
9494 protected void reloadCache () {
9595 //reload cacheRef and replace to old cacheRef
9696 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
97- cacheRef .set (newCache );
9897 try {
9998 loadData (newCache );
10099 } catch (SQLException e ) {
101100 throw new RuntimeException (e );
102101 }
102+ cacheRef .set (newCache );
103103 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
104104 }
105105
@@ -123,9 +123,9 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
123123 List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
124124 if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
125125 out .collect (new CRow (fillData (value .row (), null ), value .change ()));
126+ } else if (!CollectionUtils .isEmpty (cacheList )) {
127+ cacheList .forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
126128 }
127-
128- cacheList .forEach (one -> out .collect (new CRow (fillData (value .row (), one ), value .change ())));
129129 }
130130
131131 @ Override
@@ -152,8 +152,8 @@ public Row fillData(Row input, Object sideInput) {
152152 }
153153
154154 /**
155- * covert flink time attribute.Type information for indicating event or processing time.
156- * However, it behaves like a regular SQL timestamp but is serialized as Long.
155+ * covert flink time attribute.Type information for indicating event or processing time.
156+ * However, it behaves like a regular SQL timestamp but is serialized as Long.
157157 *
158158 * @param entry
159159 * @param obj
@@ -233,7 +233,8 @@ public int getFetchSize() {
233233 }
234234
235235 /**
236- * get jdbc connection
236+ * get jdbc connection
237+ *
237238 * @param dbURL
238239 * @param userName
239240 * @param password
0 commit comments