3838import com .google .common .collect .Maps ;
3939import org .apache .flink .configuration .Configuration ;
4040import org .apache .flink .streaming .api .functions .async .ResultFuture ;
41+ import org .apache .flink .table .runtime .types .CRow ;
4142import org .apache .flink .types .Row ;
4243
4344import java .util .Collections ;
@@ -120,14 +121,14 @@ public Row fillData(Row input, Object sideInput) {
120121 }
121122
122123 @ Override
123- public void asyncInvoke (Row input , ResultFuture <Row > resultFuture ) throws Exception {
124- Row inputRow = Row . copy ( input );
124+ public void asyncInvoke (CRow input , ResultFuture <CRow > resultFuture ) throws Exception {
125+ CRow inputCopy = new CRow ( input . row (), input . change () );
125126 Map <String , Object > refData = Maps .newHashMap ();
126127 for (int i = 0 ; i < sideInfo .getEqualValIndex ().size (); i ++) {
127128 Integer conValIndex = sideInfo .getEqualValIndex ().get (i );
128- Object equalObj = inputRow .getField (conValIndex );
129+ Object equalObj = input . row () .getField (conValIndex );
129130 if (equalObj == null ){
130- dealMissKey (inputRow , resultFuture );
131+ dealMissKey (inputCopy , resultFuture );
131132 return ;
132133 }
133134 refData .put (sideInfo .getEqualFieldList ().get (i ), equalObj );
@@ -141,14 +142,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
141142 CacheObj val = getFromCache (key );
142143 if (val != null ){
143144 if (ECacheContentType .MissVal == val .getType ()){
144- dealMissKey (inputRow , resultFuture );
145+ dealMissKey (inputCopy , resultFuture );
145146 return ;
146147 }else if (ECacheContentType .MultiLine == val .getType ()){
147148 try {
148- Row row = fillData (inputRow , val .getContent ());
149- resultFuture .complete (Collections .singleton (row ));
149+ Row row = fillData (input . row () , val .getContent ());
150+ resultFuture .complete (Collections .singleton (new CRow ( row , inputCopy . change ()) ));
150151 } catch (Exception e ) {
151- dealFillDataError (resultFuture , e , inputRow );
152+ dealFillDataError (resultFuture , e , inputCopy );
152153 }
153154 }else {
154155 RuntimeException exception = new RuntimeException ("not support cache obj type " + val .getType ());
@@ -164,14 +165,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
164165 public void accept (Map <String , String > values ) {
165166 if (MapUtils .isNotEmpty (values )) {
166167 try {
167- Row row = fillData (inputRow , values );
168+ Row row = fillData (input . row () , values );
168169 dealCacheData (key ,CacheObj .buildCacheObj (ECacheContentType .MultiLine , values ));
169- resultFuture .complete (Collections .singleton (row ));
170+ resultFuture .complete (Collections .singleton (new CRow ( row , inputCopy . change ()) ));
170171 } catch (Exception e ) {
171- dealFillDataError (resultFuture , e , inputRow );
172+ dealFillDataError (resultFuture , e , inputCopy );
172173 }
173174 } else {
174- dealMissKey (inputRow , resultFuture );
175+ dealMissKey (inputCopy , resultFuture );
175176 dealCacheData (key ,CacheMissVal .getMissKeyObj ());
176177 }
177178 }
0 commit comments