Skip to content

Commit f6ff456

Browse files
committed
[rdbsink表名与注册表名称一致][flinkSQL任务,设置结果表,提交运行中,停止该任务修改结果表,再次提交运行中,数据曲线变成两条,且新的结果表数据不正确][18063]
1 parent 78f5131 commit f6ff456

File tree

1 file changed

+5
-1
lines changed
  • rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb

1 file changed

+5
-1
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
6969

7070
protected String tableName;
7171

72+
protected String registerTabName;
73+
7274
protected String sql;
7375

7476
protected List<String> primaryKeys;
@@ -112,6 +114,7 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
112114
String tmpUserName = rdbTableInfo.getUserName();
113115
String tmpPassword = rdbTableInfo.getPassword();
114116
String tmpTableName = rdbTableInfo.getTableName();
117+
String tmpRegisterName = rdbTableInfo.getName();
115118

116119
Integer tmpSqlBatchSize = rdbTableInfo.getBatchSize();
117120
if (tmpSqlBatchSize != null) {
@@ -136,6 +139,7 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
136139
this.userName = tmpUserName;
137140
this.password = tmpPassword;
138141
this.tableName = tmpTableName;
142+
this.registerTabName = tmpRegisterName;
139143
this.primaryKeys = rdbTableInfo.getPrimaryKeys();
140144
this.dbType = rdbTableInfo.getType();
141145

@@ -193,7 +197,7 @@ protected void buildSqlTypes(List<Class> fieldTypeArray) {
193197
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
194198
RichSinkFunction richSinkFunction = createJdbcSinkFunc();
195199
DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
196-
streamSink.name(tableName);
200+
streamSink.name(registerTabName);
197201
if (parallelism > 0) {
198202
streamSink.setParallelism(parallelism);
199203
}

0 commit comments

Comments
 (0)