Skip to content

Commit 54dec63

Browse files
committed
temp save
1 parent 1af2223 commit 54dec63

File tree

4 files changed

+9
-8
lines changed

4 files changed

+9
-8
lines changed

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.concurrent.ScheduledFuture;
4949
import java.util.concurrent.ScheduledThreadPoolExecutor;
5050
import java.util.concurrent.TimeUnit;
51-
import java.util.concurrent.atomic.AtomicInteger;
5251
import java.util.regex.Matcher;
5352
import java.util.regex.Pattern;
5453
import java.util.stream.Collectors;
@@ -84,7 +83,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
8483
protected transient PreparedStatement statement;
8584

8685
private transient volatile boolean closed = false;
87-
private final AtomicInteger batchCount = new AtomicInteger(0);
86+
private int batchCount = 0;
8887

8988
protected String keytabPath;
9089
protected String krb5confPath;
@@ -216,7 +215,8 @@ private void openJdbc() {
216215
private void flush() throws SQLException {
217216
if (Objects.nonNull(statement)) {
218217
statement.executeBatch();
219-
batchCount.set(0);
218+
batchCount = 0;
219+
statement.clearBatch();
220220
}
221221
}
222222

@@ -271,9 +271,10 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
271271
setRowToStatement(statement, fieldTypeList, rowValue, null);
272272
}
273273

274+
batchCount++;
274275
statement.addBatch();
275276

276-
if (batchCount.incrementAndGet() >= batchSize) {
277+
if (batchCount > batchSize) {
277278
flush();
278279
}
279280
} catch (Exception e) {
@@ -296,7 +297,7 @@ public void close() throws IOException {
296297
this.scheduler.shutdown();
297298
}
298299
// 将还未执行的SQL flush
299-
if (batchCount.get() > 0) {
300+
if (batchCount > 0) {
300301
try {
301302
flush();
302303
} catch (Exception e) {

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class ImpalaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
6060
protected String principal;
6161

6262
protected int batchSize = 100;
63-
protected long batchWaitInterval = 10000;
63+
protected long batchWaitInterval = 60 * 1000L;
6464
protected String tableName;
6565
protected String registerTabName;
6666
protected String storeType;

localTest/src/main/java/com/dtstack/flink/sql/localTest/LocalTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static void main(String[] args) throws Exception {
5050
setLogLevel("INFO");
5151

5252
List<String> propertiesList = new ArrayList<>();
53-
String sqlPath = "//Users/wtz/dtstack/job/flinkStreamSQL/sql/impalaDemoThree.sql";
53+
String sqlPath = "/Users/wtz/dtstack/job/flinkStreamSQL/sql/impalaDemoThree.sql";
5454
Map<String, Object> conf = new HashMap<>();
5555
JSONObject properties = new JSONObject();
5656

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<module>oceanbase</module>
3838
<module>tidb</module>
3939
<module>kingbase</module>
40-
<!-- <module>localTest</module>-->
40+
<module>localTest</module>
4141

4242
</modules>
4343

0 commit comments

Comments
 (0)