Skip to content

Commit eb23e15

Browse files
committed
[fix] 修复postgre 做维表时schema不生效
1 parent b64dd57 commit eb23e15

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

postgresql/postgresql-side/postgresql-all-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAllSideInfo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2527
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2628

2729
import java.util.List;
@@ -38,4 +40,9 @@ public class PostgresqlAllSideInfo extends RdbAllSideInfo {
3840
public PostgresqlAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3941
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4042
}
43+
44+
@Override
45+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
46+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
47+
}
4148
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
package com.dtstack.flink.sql.side.postgresql;
2121

22-
import com.dtstack.flink.sql.factory.DTThreadFactory;
22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.JoinInfo;
25-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2625
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
2726
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2827
import io.vertx.core.Vertx;
@@ -35,9 +34,6 @@
3534
import org.slf4j.LoggerFactory;
3635

3736
import java.util.List;
38-
import java.util.concurrent.LinkedBlockingQueue;
39-
import java.util.concurrent.ThreadPoolExecutor;
40-
import java.util.concurrent.TimeUnit;
4137

4238
/**
4339
* Date: 2019-08-11

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncSideInfo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2527
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2628

2729
import java.util.List;
@@ -39,4 +41,9 @@ public class PostgresqlAsyncSideInfo extends RdbAsyncSideInfo {
3941
public PostgresqlAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4042
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4143
}
44+
45+
@Override
46+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
47+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
48+
}
4249
}

0 commit comments

Comments
 (0)