@@ -94,12 +94,12 @@ protected void initCache() throws SQLException {
9494 protected void reloadCache () {
9595 //reload cacheRef and replace to old cacheRef
9696 Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
97- cacheRef .set (newCache );
9897 try {
9998 loadData (newCache );
10099 } catch (SQLException e ) {
101100 throw new RuntimeException (e );
102101 }
102+ cacheRef .set (newCache );
103103 LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
104104 }
105105
@@ -123,9 +123,10 @@ public void flatMap(Tuple2<Boolean,Row> value, Collector<Tuple2<Boolean,Row>> ou
123123 List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
124124 if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
125125 out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , null )));
126+ } else if (!CollectionUtils .isEmpty (cacheList )) {
127+ cacheList .stream ().forEach (one -> out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , one ))));
126128 }
127129
128- cacheList .stream ().forEach (one -> out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , null ))));
129130 }
130131
131132 @ Override
@@ -219,7 +220,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
219220 }
220221
221222 String cacheKey = sideInfo .getEqualFieldList ().stream ()
222- .map (equalField -> oneRow . get ( equalField ) )
223+ .map (oneRow :: get )
223224 .map (Object ::toString )
224225 .collect (Collectors .joining ("_" ));
225226
@@ -233,7 +234,8 @@ public int getFetchSize() {
233234 }
234235
235236 /**
236- * get jdbc connection
237+ * get jdbc connection
238+ *
237239 * @param dbURL
238240 * @param userName
239241 * @param password
0 commit comments