Skip to content

Commit da62a3d

Browse files
committed
[fix-33351][core] 修复同一个任务写入多张kudu结果表,当任务结束时,kudu结果表数据不一致问题。
1 parent bd7d652 commit da62a3d

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@
4040
import java.security.PrivilegedAction;
4141
import java.sql.Timestamp;
4242
import java.util.Date;
43+
import java.util.Objects;
4344

4445
/**
45-
* @author gituser
46-
* @modify xiuzhu
46+
* @author gituser
47+
* @modify xiuzhu
4748
*/
4849
public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
4950

@@ -74,6 +75,8 @@ public enum WriteMode {
7475

7576
private KuduTable table;
7677

78+
private AsyncKuduSession session;
79+
7780
private Integer workerCount;
7881

7982
private Integer defaultOperationTimeoutMs;
@@ -117,17 +120,12 @@ private void establishConnection() throws IOException {
117120

118121
if (enableKrb) {
119122
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
120-
principal,
121-
keytab,
122-
krb5conf
123+
principal,
124+
keytab,
125+
krb5conf
123126
);
124127
client = ugi.doAs(
125-
new PrivilegedAction<AsyncKuduClient>() {
126-
@Override
127-
public AsyncKuduClient run() {
128-
return asyncKuduClientBuilder.build();
129-
}
130-
});
128+
(PrivilegedAction<AsyncKuduClient>) asyncKuduClientBuilder::build);
131129
} else {
132130
client = asyncKuduClientBuilder.build();
133131
}
@@ -136,6 +134,7 @@ public AsyncKuduClient run() {
136134
if (syncClient.tableExists(tableName)) {
137135
table = syncClient.openTable(tableName);
138136
}
137+
session = client.newSession();
139138
}
140139

141140
@Override
@@ -147,26 +146,24 @@ public void writeRecord(Tuple2 record) throws IOException {
147146
}
148147
Row row = tupleTrans.getField(1);
149148
if (row.getArity() != fieldNames.length) {
150-
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
149+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
151150
LOG.error("record insert failed ..{}", row.toString());
152151
LOG.error("cause by row.getArity() != fieldNames.length");
153152
}
154153
outDirtyRecords.inc();
155154
return;
156155
}
157156
Operation operation = toOperation(writeMode, row);
158-
AsyncKuduSession session = client.newSession();
159157

160158
try {
161159
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
162160
LOG.info("Receive data : {}", row);
163161
}
164162

165163
session.apply(operation);
166-
session.close();
167164
outRecords.inc();
168165
} catch (KuduException e) {
169-
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
166+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
170167
LOG.error("record insert failed, total dirty record:{} current row:{}", outDirtyRecords.getCount(), row.toString());
171168
LOG.error("", e);
172169
}
@@ -176,9 +173,15 @@ public void writeRecord(Tuple2 record) throws IOException {
176173

177174
@Override
178175
public void close() {
176+
if (Objects.nonNull(session)) {
177+
// 先把未执行完的操作执行掉,防止操作不一致
178+
session.flush();
179+
session.close();
180+
}
181+
179182
if (null != client) {
180183
try {
181-
client.close();
184+
client.shutdown();
182185
} catch (Exception e) {
183186
throw new IllegalArgumentException("[closeKudu]:" + e.getMessage());
184187
}

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduSinkParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public Class dbTypeConvertToJavaType(String fieldType) {
9494
case "date":
9595
return Date.class;
9696
case "unixtime_micros":
97+
case "timestamp":
9798
return Timestamp.class;
9899
case "decimal":
99100
return BigDecimal.class;

0 commit comments

Comments
 (0)