Skip to content

Commit 728f684

Browse files
committed
[feat-32933][core][aws] add aws sink plugins.
1 parent 030d8e1 commit 728f684

File tree

9 files changed

+820
-0
lines changed

9 files changed

+820
-0
lines changed

aws/aws-sink/pom.xml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.aws</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.aws</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <!-- S3-compatible object-store SDK used by the AWS sink. -->
        <dependency>
            <groupId>com.cmcc</groupId>
            <artifactId>onest-s3-java-sdk</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Bundle the sink and its dependencies into one shaded jar. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <!-- Drop jar signatures so the shaded jar is loadable. -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Copy the built jar into the shared plugin directory and
                 rename it to <name>-<branch>.jar, matching the other plugins. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/awssink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/awssink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/awssink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.aws;
20+
21+
/**
 * Constant keys for the AWS (S3-compatible) sink plugin: table-property
 * names read from the DDL, canned bucket ACL values and storage-class
 * values accepted by the store.
 *
 * @author tiezhu
 * date 2020/12/1
 * company dtstack
 */
public class AwsConstantKey {

    /** Non-instantiable constant holder. */
    private AwsConstantKey() {
        throw new UnsupportedOperationException("Constant class, do not instantiate");
    }

    /** Sink parallelism table property. */
    public static final String PARALLELISM_KEY = "parallelism";

    // Connection / target-object table properties.
    public static final String ACCESS_KEY = "accessKey";
    public static final String SECRET_KEY = "secretKey";
    public static final String BUCKET_KEY = "bucket";
    public static final String HOST_NAME = "hostname";
    public static final String STORAGE_TYPE = "storageType";
    public static final String BUCKET_ACL = "bucketAcl";
    public static final String OBJECT_NAME = "objectName";

    // Canned bucket ACL values (S3 canned-ACL identifiers).
    public static final String PUBLIC_READ_WRITE = "public-read-write";
    public static final String PRIVATE = "private";
    public static final String PUBLIC_READ = "public-read";
    public static final String AUTHENTICATED_READ = "authenticated-read";
    // NOTE(review): constant is named *_READ but the canned ACL value is
    // "log-delivery-write" — the value is correct per the S3 ACL list; the
    // name cannot be changed without breaking callers.
    public static final String LOG_DELIVERY_READ = "log-delivery-write";
    public static final String BUCKET_OWNER_READER = "bucket-owner-read";
    public static final String BUCKET_OWNER_FULL_CONTROL = "bucket-owner-full-control";

    // Storage-class values.
    public static final String STANDARD = "standard";
    public static final String STANDARD_IA = "standard_ia";
    public static final String REDUCED_REDUNDANCY = "reduced_redundancy";
    public static final String GLACIER = "glacier";

}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.aws;
20+
21+
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AppendObjectRequest;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.dtstack.flink.sql.sink.aws.util.AwsManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
34+
35+
/**
36+
* @author tiezhu
37+
* date 2020/12/1
38+
* company dtstack
39+
*/
40+
public class AwsOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(AwsOutputFormat.class);
43+
44+
private static final String LINE_BREAK = "\n";
45+
46+
private transient AmazonS3Client client;
47+
48+
private String accessKey;
49+
private String secretKey;
50+
private String bucket;
51+
private String bucketAcl;
52+
private String objectName;
53+
private String hostname;
54+
private InputStream inputStream;
55+
private Long position = 0L;
56+
57+
private AwsOutputFormat() {
58+
}
59+
60+
@Override
61+
public void configure(Configuration parameters) {
62+
LOG.warn("--- configure client ---");
63+
client = AwsManager.initClient(accessKey, secretKey, hostname);
64+
}
65+
66+
@Override
67+
public void open(int taskNumber, int numTasks) throws IOException {
68+
LOG.warn("--- open ---");
69+
initMetric();
70+
position = AwsManager.getObjectPosition(bucket, objectName, client);
71+
}
72+
73+
@Override
74+
public void writeRecord(Tuple2 record) throws IOException {
75+
String recordStr = record.f1.toString() + LINE_BREAK;
76+
inputStream = new ByteArrayInputStream(recordStr.getBytes());
77+
// 追加流式写入,但是这种情况下,可能会出现oom【因为数据都是缓存在内存中】
78+
AppendObjectRequest appendObjectRequest = new AppendObjectRequest(
79+
bucket, objectName, inputStream, null)
80+
.withPosition(position);
81+
82+
client.appendObject(appendObjectRequest);
83+
position += recordStr.getBytes().length;
84+
}
85+
86+
@Override
87+
public void close() throws IOException {
88+
if (Objects.nonNull(inputStream)) {
89+
inputStream.close();
90+
}
91+
92+
if (Objects.nonNull(client)) {
93+
client.shutdown();
94+
}
95+
}
96+
97+
public static AwsOutputFormatBuilder buildS3OutputFormat() {
98+
return new AwsOutputFormatBuilder();
99+
}
100+
101+
public static class AwsOutputFormatBuilder {
102+
private final AwsOutputFormat awsOutputFormat;
103+
104+
private AwsOutputFormatBuilder() {
105+
awsOutputFormat = new AwsOutputFormat();
106+
}
107+
108+
public AwsOutputFormatBuilder setAccessKey(String accessKey) {
109+
awsOutputFormat.accessKey = accessKey;
110+
return this;
111+
}
112+
113+
public AwsOutputFormatBuilder setSecretKey(String secretKey) {
114+
awsOutputFormat.secretKey = secretKey;
115+
return this;
116+
}
117+
118+
public AwsOutputFormatBuilder setBucket(String bucket) {
119+
awsOutputFormat.bucket = bucket;
120+
return this;
121+
}
122+
123+
public AwsOutputFormatBuilder setBucketAcl(String bucketAcl) {
124+
awsOutputFormat.bucketAcl = bucketAcl;
125+
return this;
126+
}
127+
128+
public AwsOutputFormatBuilder setHostname(String hostname) {
129+
awsOutputFormat.hostname = hostname;
130+
return this;
131+
}
132+
133+
public AwsOutputFormatBuilder setObjectName(String objectName) {
134+
awsOutputFormat.objectName = objectName;
135+
return this;
136+
}
137+
138+
public AwsOutputFormat finish() {
139+
return awsOutputFormat;
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)