Skip to content

Commit e51b204

Browse files
author
gituser
committed
Merge branch 'feat_1.10.4.2.x_fileDirtyData' into 1.10_test_4.2.x
2 parents e5a7ccd + 7414cc3 commit e51b204

File tree

1 file changed

+22
-2
lines changed
  • file/file-source/src/main/java/com/dtstack/flink/sql/source/file

1 file changed

+22
-2
lines changed

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.net.URI;
4747
import java.util.Locale;
4848
import java.util.concurrent.atomic.AtomicBoolean;
49+
import java.util.concurrent.atomic.AtomicInteger;
4950

5051
/**
5152
* @author tiezhu
@@ -71,6 +72,10 @@ public class FileSource implements IStreamSourceGener<Table>, SourceFunction<Row
7172

7273
private String charset;
7374

75+
private final AtomicInteger count = new AtomicInteger(0);
76+
77+
private final AtomicInteger errorCount = new AtomicInteger(0);
78+
7479
@Override
7580
public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo,
7681
StreamExecutionEnvironment env,
@@ -168,10 +173,19 @@ public void run(SourceContext<Row> ctx) throws Exception {
168173
inputStream.close();
169174
break;
170175
} else {
171-
Row row = deserializationSchema.deserialize(line.getBytes());
172-
ctx.collect(row);
176+
try {
177+
Row row = deserializationSchema.deserialize(line.getBytes());
178+
if (row == null) {
179+
throw new IOException("Deserialized row is null");
180+
}
181+
ctx.collect(row);
182+
count.incrementAndGet();
183+
} catch (IOException e) {
184+
errorCount.incrementAndGet();
185+
}
173186
}
174187
}
188+
printCount();
175189
}
176190

177191
@Override
@@ -185,5 +199,11 @@ public void cancel() {
185199
LOG.error("File input stream close error!");
186200
}
187201
}
202+
printCount();
203+
}
204+
205+
private void printCount() {
206+
LOG.info("Read count: {}", count.get());
207+
LOG.info("Error count: {}", errorCount.get());
188208
}
189209
}

0 commit comments

Comments
 (0)