Skip to content

Commit d0b5ef5

Browse files
committed
Merge branch 'feat_1.10_4.0.x_s3SinkAndSource' into '1.10_test_4.0.x'
[feat] Add S3 result (sink) table. See merge request dt-insight-engine/flinkStreamSQL!194
2 parents ac0979d + 83fc16b commit d0b5ef5

File tree

69 files changed

+2620
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+2620
-299
lines changed

aws/aws-sink/pom.xml

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.aws</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.sink.aws</artifactId>
14+
<packaging>jar</packaging>
15+
16+
<name>aws-sink</name>
17+
<url>http://maven.apache.org</url>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>com.cmcc</groupId>
22+
<artifactId>onest-s3-java-sdk</artifactId>
23+
<version>1.0</version>
24+
</dependency>
25+
</dependencies>
26+
27+
<build>
28+
<plugins>
29+
<plugin>
30+
<groupId>org.apache.maven.plugins</groupId>
31+
<artifactId>maven-shade-plugin</artifactId>
32+
<version>1.4</version>
33+
<executions>
34+
<execution>
35+
<phase>package</phase>
36+
<goals>
37+
<goal>shade</goal>
38+
</goals>
39+
<configuration>
40+
<createDependencyReducedPom>false</createDependencyReducedPom>
41+
<artifactSet>
42+
<excludes>
43+
44+
</excludes>
45+
</artifactSet>
46+
<filters>
47+
<filter>
48+
<artifact>*:*</artifact>
49+
<excludes>
50+
<exclude>META-INF/*.SF</exclude>
51+
<exclude>META-INF/*.DSA</exclude>
52+
<exclude>META-INF/*.RSA</exclude>
53+
</excludes>
54+
</filter>
55+
</filters>
56+
</configuration>
57+
</execution>
58+
</executions>
59+
</plugin>
60+
61+
<plugin>
62+
<artifactId>maven-antrun-plugin</artifactId>
63+
<version>1.2</version>
64+
<executions>
65+
<execution>
66+
<id>copy-resources</id>
67+
<!-- here the phase you need -->
68+
<phase>package</phase>
69+
<goals>
70+
<goal>run</goal>
71+
</goals>
72+
<configuration>
73+
<tasks>
74+
<copy todir="${basedir}/../../sqlplugins/awssink">
75+
<fileset dir="target/">
76+
<include name="${project.artifactId}-${project.version}.jar"/>
77+
</fileset>
78+
</copy>
79+
80+
<move file="${basedir}/../../sqlplugins/awssink/${project.artifactId}-${project.version}.jar"
81+
tofile="${basedir}/../../sqlplugins/awssink/${project.name}-${git.branch}.jar"/>
82+
</tasks>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
89+
</project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.aws;
20+
21+
/**
22+
* @author tiezhu
23+
* date 2020/12/1
24+
* company dtstack
25+
*/
26+
/**
 * Constant keys and values used by the AWS/S3 sink: DDL property names,
 * S3 canned-ACL identifiers, and storage-class identifiers.
 *
 * <p>Declared {@code final} with a private constructor because this is a
 * pure constants holder and must never be instantiated or subclassed.
 *
 * @author tiezhu
 * date 2020/12/1
 * company dtstack
 */
public final class AwsConstantKey {

    /** Constants holder — not meant to be instantiated. */
    private AwsConstantKey() {
        throw new UnsupportedOperationException("AwsConstantKey is a constants holder");
    }

    // --- table DDL property keys ---
    public static final String PARALLELISM_KEY = "parallelism";

    public static final String ACCESS_KEY = "accessKey";
    public static final String SECRET_KEY = "secretKey";
    public static final String BUCKET_KEY = "bucket";
    public static final String HOST_NAME = "hostname";
    public static final String STORAGE_TYPE = "storageType";
    public static final String BUCKET_ACL = "bucketAcl";
    public static final String OBJECT_NAME = "objectName";

    // --- S3 canned ACL values ---
    public static final String PUBLIC_READ_WRITE = "public-read-write";
    public static final String PRIVATE = "private";
    public static final String PUBLIC_READ = "public-read";
    public static final String AUTHENTICATED_READ = "authenticated-read";
    // NOTE(review): name says READ but the canned ACL is "log-delivery-write";
    // the value is correct, the field name is misleading. Kept for caller compatibility.
    public static final String LOG_DELIVERY_READ = "log-delivery-write";
    // NOTE(review): "READER" vs value "bucket-owner-read" — same caveat as above.
    public static final String BUCKET_OWNER_READER = "bucket-owner-read";
    public static final String BUCKET_OWNER_FULL_CONTROL = "bucket-owner-full-control";

    // --- storage class values ---
    public static final String STANDARD = "standard";
    public static final String STANDARD_IA = "standard_ia";
    public static final String REDUCED_REDUNDANCY = "reduced_redundancy";
    public static final String GLACIER = "glacier";

}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.aws;
20+
21+
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AppendObjectRequest;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.dtstack.flink.sql.sink.aws.util.AwsManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
34+
35+
/**
36+
* @author tiezhu
37+
* date 2020/12/1
38+
* company dtstack
39+
*/
40+
public class AwsOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(AwsOutputFormat.class);
43+
44+
private static final String LINE_BREAK = "\n";
45+
46+
private transient AmazonS3Client client;
47+
48+
private String accessKey;
49+
private String secretKey;
50+
private String bucket;
51+
private String bucketAcl;
52+
private String objectName;
53+
private String hostname;
54+
private InputStream inputStream;
55+
private Long position = 0L;
56+
57+
private AwsOutputFormat() {
58+
}
59+
60+
@Override
61+
public void configure(Configuration parameters) {
62+
LOG.warn("--- configure client ---");
63+
client = AwsManager.initClient(accessKey, secretKey, hostname);
64+
}
65+
66+
@Override
67+
public void open(int taskNumber, int numTasks) throws IOException {
68+
LOG.warn("--- open ---");
69+
initMetric();
70+
position = AwsManager.getObjectPosition(bucket, objectName, client);
71+
}
72+
73+
@Override
74+
public void writeRecord(Tuple2 record) throws IOException {
75+
String recordStr = record.f1.toString() + LINE_BREAK;
76+
inputStream = new ByteArrayInputStream(recordStr.getBytes());
77+
// 追加流式写入,但是这种情况下,可能会出现oom【因为数据都是缓存在内存中】
78+
AppendObjectRequest appendObjectRequest = new AppendObjectRequest(
79+
bucket, objectName, inputStream, null)
80+
.withPosition(position);
81+
82+
client.appendObject(appendObjectRequest);
83+
position += recordStr.getBytes().length;
84+
}
85+
86+
@Override
87+
public void close() throws IOException {
88+
if (Objects.nonNull(inputStream)) {
89+
inputStream.close();
90+
}
91+
92+
if (Objects.nonNull(client)) {
93+
client.shutdown();
94+
}
95+
}
96+
97+
public static AwsOutputFormatBuilder buildS3OutputFormat() {
98+
return new AwsOutputFormatBuilder();
99+
}
100+
101+
public static class AwsOutputFormatBuilder {
102+
private final AwsOutputFormat awsOutputFormat;
103+
104+
private AwsOutputFormatBuilder() {
105+
awsOutputFormat = new AwsOutputFormat();
106+
}
107+
108+
public AwsOutputFormatBuilder setAccessKey(String accessKey) {
109+
awsOutputFormat.accessKey = accessKey;
110+
return this;
111+
}
112+
113+
public AwsOutputFormatBuilder setSecretKey(String secretKey) {
114+
awsOutputFormat.secretKey = secretKey;
115+
return this;
116+
}
117+
118+
public AwsOutputFormatBuilder setBucket(String bucket) {
119+
awsOutputFormat.bucket = bucket;
120+
return this;
121+
}
122+
123+
public AwsOutputFormatBuilder setBucketAcl(String bucketAcl) {
124+
awsOutputFormat.bucketAcl = bucketAcl;
125+
return this;
126+
}
127+
128+
public AwsOutputFormatBuilder setHostname(String hostname) {
129+
awsOutputFormat.hostname = hostname;
130+
return this;
131+
}
132+
133+
public AwsOutputFormatBuilder setObjectName(String objectName) {
134+
awsOutputFormat.objectName = objectName;
135+
return this;
136+
}
137+
138+
public AwsOutputFormat finish() {
139+
return awsOutputFormat;
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)