2020package com .dtstack .flink .sql .side .rdb .async ;
2121
2222import com .dtstack .flink .sql .enums .ECacheContentType ;
23+ import com .dtstack .flink .sql .metric .MetricConstant ;
2324import com .dtstack .flink .sql .side .BaseAsyncReqRow ;
2425import com .dtstack .flink .sql .side .BaseSideInfo ;
2526import com .dtstack .flink .sql .side .CacheMissVal ;
3637import org .apache .calcite .sql .JoinType ;
3738import org .apache .commons .lang3 .StringUtils ;
3839import org .apache .flink .configuration .Configuration ;
40+ import org .apache .flink .metrics .Counter ;
3941import org .apache .flink .streaming .api .functions .async .ResultFuture ;
4042import org .apache .flink .table .runtime .types .CRow ;
4143import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
@@ -86,10 +88,12 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
8688
8789 private final static AtomicBoolean CONN_STATUS = new AtomicBoolean (true );
8890
89- private final static AtomicLong TIMOUT_NUM = new AtomicLong (0 );
91+ private final static AtomicLong FAIL_NUM = new AtomicLong (0 );
9092
9193 private Logger logger = LoggerFactory .getLogger (getClass ());
9294
95+ private Counter counter = getRuntimeContext ().getMetricGroup ().counter (MetricConstant .DT_NUM_SIDE_PARSE_ERROR_RECORDS );
96+
9397 public RdbAsyncReqRow (BaseSideInfo sideInfo ) {
9498 super (sideInfo );
9599 init (sideInfo );
@@ -102,14 +106,6 @@ protected void init(BaseSideInfo sideInfo) {
102106 rdbSideTableInfo .setAsyncPoolSize (rdbPoolSize );
103107 }
104108
105- @ Override
106- public void open (Configuration parameters ) throws Exception {
107- super .open (parameters );
108- RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo ) sideInfo .getSideTableInfo ();
109- LOG .info ("rdb dim table config info: {} " , rdbSideTableInfo .toString ());
110- }
111-
112-
113109 @ Override
114110 protected void preInvoke (CRow input , ResultFuture <CRow > resultFuture ){
115111
@@ -148,7 +144,12 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
148144 logger .error ("getConnection error" , conn .cause ());
149145 }
150146 if (failCounter .get () >= sideInfo .getSideTableInfo ().getAsyncFailMaxNum (3L )){
151- outByJoinType (resultFuture , conn .cause ());
147+ if (FAIL_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
148+ counter .inc ();
149+ resultFuture .completeExceptionally (conn .cause ());
150+ } else {
151+ dealMissKey (input , resultFuture );
152+ }
152153 finishFlag .set (true );
153154 }
154155 conn .result ().close ();
@@ -160,6 +161,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
160161 handleQuery (conn .result (), inputParams , input , resultFuture );
161162 finishFlag .set (true );
162163 } catch (Exception e ) {
164+ dealFillDataError (resultFuture , e , null );
163165 logger .error ("" , e );
164166 } finally {
165167 latch .countDown ();
@@ -226,12 +228,13 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
226228 JsonArray params = new JsonArray (Lists .newArrayList (inputParams .values ()));
227229 connection .queryWithParams (sideInfo .getSqlCondition (), params , rs -> {
228230 if (rs .failed ()) {
229- if (TIMOUT_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
230- outByJoinType (resultFuture , rs .cause ());
231- return ;
231+ if (FAIL_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
232+ LOG .error ("Cannot retrieve the data from the database" , rs .cause ());
233+ counter .inc ();
234+ resultFuture .completeExceptionally (rs .cause ());
235+ } else {
236+ dealMissKey (input , resultFuture );
232237 }
233- LOG .error ("Cannot retrieve the data from the database" , rs .cause ());
234- resultFuture .complete (null );
235238 return ;
236239 }
237240
@@ -270,14 +273,6 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
270273 });
271274 }
272275
273- private void outByJoinType (ResultFuture <CRow > resultFuture , Throwable e ){
274- if (sideInfo .getJoinType () == JoinType .LEFT ){
275- resultFuture .complete (null );
276- return ;
277- }
278- resultFuture .completeExceptionally (e );
279- }
280-
281276 private Map <String , Object > formatInputParam (Map <String , Object > inputParam ){
282277 Map <String , Object > result = Maps .newHashMap ();
283278 inputParam .forEach ((k ,v ) -> {
0 commit comments