Skip to content

Commit 46b7f16

Browse files
committed
[fix] add metric inc and meta data.
1 parent 83fc16b commit 46b7f16

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

aws/aws-sink/src/main/java/com/dtstack/flink/sql/sink/aws/AwsOutputFormat.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.amazonaws.services.s3.AmazonS3Client;
2222
import com.amazonaws.services.s3.model.AppendObjectRequest;
23+
import com.amazonaws.services.s3.model.ObjectMetadata;
2324
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2425
import com.dtstack.flink.sql.sink.aws.util.AwsManager;
2526
import org.apache.flink.api.java.tuple.Tuple2;
@@ -73,14 +74,18 @@ public void open(int taskNumber, int numTasks) throws IOException {
7374
@Override
7475
public void writeRecord(Tuple2 record) throws IOException {
7576
String recordStr = record.f1.toString() + LINE_BREAK;
77+
int length = recordStr.getBytes().length;
7678
inputStream = new ByteArrayInputStream(recordStr.getBytes());
79+
ObjectMetadata metadata = new ObjectMetadata();
80+
metadata.setContentLength(length);
7781
// 追加流式写入,但是这种情况下,可能会出现oom【因为数据都是缓存在内存中】
7882
AppendObjectRequest appendObjectRequest = new AppendObjectRequest(
79-
bucket, objectName, inputStream, null)
83+
bucket, objectName, inputStream, metadata)
8084
.withPosition(position);
8185

8286
client.appendObject(appendObjectRequest);
83-
position += recordStr.getBytes().length;
87+
position += length;
88+
outRecords.inc();
8489
}
8590

8691
@Override

0 commit comments

Comments
 (0)