Skip to content

Commit 3ebff3c

Browse files
committed
[fix] time transform error in view
1 parent d03cf56 commit 3ebff3c

File tree

4 files changed

+10
-7
lines changed

4 files changed

+10
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,9 @@ private void joinFun(Object pollObj,
409409

410410
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
411411

412-
DataStream adaptStream = tableEnv.toRetractStream(targetTable, Row.class)
412+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
413413
.filter(f -> f.f0)
414-
.map(f -> f.f1)
415-
.returns(Row.class);
414+
.map(f -> f.f1);
416415

417416
//join side table before keyby ===> Reducing the size of each dimension table cache of async
418417
if (sideTableInfo.isPartitionedJoin()) {

localTest/src/main/java/com/dtstack/flink/sql/localTest/LocalTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static void main(String[] args) throws Exception {
5050
setLogLevel("INFO");
5151

5252
List<String> propertiesList = new ArrayList<>();
53-
String sqlPath = "/Users/wtz4680/Desktop/flinkStreamSQL/sql/rowtimeprint.sql";
53+
String sqlPath = "/Users/wtz/dtstack/job/flinkStreamSQL/temp2.sql";
5454
Map<String, Object> conf = new HashMap<>();
5555
JSONObject properties = new JSONObject();
5656

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<module>polardb</module>
3737
<module>oceanbase</module>
3838
<module>tidb</module>
39-
<!-- <module>localTest</module>-->
39+
<module>localTest</module>
4040

4141
</modules>
4242

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@
4848
import java.time.Instant;
4949
import java.util.List;
5050
import java.util.Map;
51-
import java.util.concurrent.*;
51+
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.LinkedBlockingQueue;
53+
import java.util.concurrent.ThreadPoolExecutor;
54+
import java.util.concurrent.TimeUnit;
5255
import java.util.concurrent.atomic.AtomicBoolean;
5356
import java.util.concurrent.atomic.AtomicLong;
5457

@@ -117,7 +120,8 @@ protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture) {
117120
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
118121

119122
AtomicLong networkLogCounter = new AtomicLong(0L);
120-
while (!connectionStatus.get()) {//network is unhealth
123+
//network is unhealthy
124+
while (!connectionStatus.get()) {
121125
if (networkLogCounter.getAndIncrement() % 1000 == 0) {
122126
LOG.info("network unhealth to block task");
123127
}

0 commit comments

Comments
 (0)