1919
2020package com .dtstack .flink .sql .side .cassandra ;
2121
22- import org .apache .flink .api .java .typeutils .RowTypeInfo ;
23- import org .apache .flink .configuration .Configuration ;
24- import org .apache .flink .streaming .api .functions .async .ResultFuture ;
25- import org .apache .flink .table .runtime .types .CRow ;
26- import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
27- import org .apache .flink .types .Row ;
28-
29- import com .datastax .driver .core .Cluster ;
30- import com .datastax .driver .core .ConsistencyLevel ;
31- import com .datastax .driver .core .HostDistance ;
32- import com .datastax .driver .core .PoolingOptions ;
33- import com .datastax .driver .core .QueryOptions ;
34- import com .datastax .driver .core .ResultSet ;
35- import com .datastax .driver .core .Session ;
36- import com .datastax .driver .core .SocketOptions ;
22+ import com .datastax .driver .core .*;
3723import com .datastax .driver .core .policies .DowngradingConsistencyRetryPolicy ;
3824import com .datastax .driver .core .policies .RetryPolicy ;
3925import com .dtstack .flink .sql .enums .ECacheContentType ;
40- import com .dtstack .flink .sql .side .BaseAsyncReqRow ;
41- import com .dtstack .flink .sql .side .CacheMissVal ;
42- import com .dtstack .flink .sql .side .FieldInfo ;
43- import com .dtstack .flink .sql .side .JoinInfo ;
44- import com .dtstack .flink .sql .side .AbstractSideTableInfo ;
26+ import com .dtstack .flink .sql .side .*;
4527import com .dtstack .flink .sql .side .cache .CacheObj ;
4628import com .dtstack .flink .sql .side .cassandra .table .CassandraSideTableInfo ;
29+ import com .dtstack .flink .sql .util .RowDataComplete ;
4730import com .google .common .base .Function ;
4831import com .google .common .collect .Lists ;
4932import com .google .common .util .concurrent .AsyncFunction ;
5033import com .google .common .util .concurrent .FutureCallback ;
5134import com .google .common .util .concurrent .Futures ;
5235import com .google .common .util .concurrent .ListenableFuture ;
53- import io .vertx .core .json .JsonArray ;
5436import org .apache .commons .lang3 .StringUtils ;
37+ import org .apache .flink .api .java .typeutils .RowTypeInfo ;
38+ import org .apache .flink .configuration .Configuration ;
39+ import org .apache .flink .streaming .api .functions .async .ResultFuture ;
40+ import org .apache .flink .table .dataformat .BaseRow ;
41+ import org .apache .flink .types .Row ;
5542import org .slf4j .Logger ;
5643import org .slf4j .LoggerFactory ;
5744
5845import java .net .InetAddress ;
59- import java .sql .Timestamp ;
6046import java .util .ArrayList ;
47+ import java .util .Collections ;
6148import java .util .List ;
6249import java .util .Map ;
63- import java .util .TimeZone ;
6450
6551/**
6652 * Reason:
@@ -74,8 +60,6 @@ public class CassandraAsyncReqRow extends BaseAsyncReqRow {
7460
7561 private static final Logger LOG = LoggerFactory .getLogger (CassandraAsyncReqRow .class );
7662
77- private static final TimeZone LOCAL_TZ = TimeZone .getDefault ();
78-
7963 private final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 10 ;
8064
8165 private final static int DEFAULT_VERTX_WORKER_POOL_SIZE = 20 ;
@@ -165,7 +149,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
165149 }
166150
167151 @ Override
168- public void handleAsyncInvoke (Map <String , Object > inputParams , CRow input , ResultFuture <CRow > resultFuture ) throws Exception {
152+ public void handleAsyncInvoke (Map <String , Object > inputParams , Row input , ResultFuture <BaseRow > resultFuture ) throws Exception {
169153
170154 String key = buildCacheKey (inputParams );
171155 //connect Cassandra
@@ -196,15 +180,15 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
196180 cluster .closeAsync ();
197181 if (rows .size () > 0 ) {
198182 List <com .datastax .driver .core .Row > cacheContent = Lists .newArrayList ();
199- List <CRow > rowList = Lists .newArrayList ();
183+ List <Row > rowList = Lists .newArrayList ();
200184 for (com .datastax .driver .core .Row line : rows ) {
201- Row row = fillData (input . row () , line );
185+ Row row = fillData (input , line );
202186 if (openCache ()) {
203187 cacheContent .add (line );
204188 }
205- rowList .add (new CRow ( row , input . change ()) );
189+ rowList .add (row );
206190 }
207- resultFuture . complete ( rowList );
191+ RowDataComplete . completeRow ( resultFuture , rowList );
208192 if (openCache ()) {
209193 putCache (key , CacheObj .buildCacheObj (ECacheContentType .MultiLine , cacheContent ));
210194 }
@@ -213,7 +197,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
213197 if (openCache ()) {
214198 putCache (key , CacheMissVal .getMissKeyObj ());
215199 }
216- resultFuture .complete (null );
200+ resultFuture .complete (Collections . EMPTY_LIST );
217201 }
218202 }
219203
@@ -251,13 +235,7 @@ public Row fillData(Row input, Object line) {
251235 Row row = new Row (sideInfo .getOutFieldInfoList ().size ());
252236 for (Map .Entry <Integer , Integer > entry : sideInfo .getInFieldIndex ().entrySet ()) {
253237 Object obj = input .getField (entry .getValue ());
254- boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (entry .getValue ()).getClass ());
255-
256- if (obj instanceof Timestamp && isTimeIndicatorTypeInfo ) {
257- //去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
258- obj = ((Timestamp ) obj ).getTime () + (long )LOCAL_TZ .getOffset (((Timestamp ) obj ).getTime ());
259- }
260-
238+ obj = convertTimeIndictorTypeInfo (entry .getValue (), obj );
261239 row .setField (entry .getKey (), obj );
262240 }
263241
0 commit comments