Commit 10c01d1

Merge branch '1.10_release_4.0.x' into 1.10_release
# Conflicts:
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
2 parents 90b0b2f + afcf1b3

File tree

56 files changed: +2137 −285 lines


aws/aws-sink/pom.xml

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.aws</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.aws</artifactId>
    <packaging>jar</packaging>

    <name>aws-sink</name>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>com.cmcc</groupId>
            <artifactId>onest-s3-java-sdk</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/awssink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/awssink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/awssink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.aws;

/**
 * @author tiezhu
 * date 2020/12/1
 * company dtstack
 */
public class AwsConstantKey {

    public static final String PARALLELISM_KEY = "parallelism";

    public static final String ACCESS_KEY = "accessKey";
    public static final String SECRET_KEY = "secretKey";
    public static final String BUCKET_KEY = "bucket";
    public static final String HOST_NAME = "hostname";
    public static final String STORAGE_TYPE = "storageType";
    public static final String BUCKET_ACL = "bucketAcl";
    public static final String OBJECT_NAME = "objectName";

    public static final String PUBLIC_READ_WRITE = "public-read-write";
    public static final String PRIVATE = "private";
    public static final String PUBLIC_READ = "public-read";
    public static final String AUTHENTICATED_READ = "authenticated-read";
    public static final String LOG_DELIVERY_READ = "log-delivery-write";
    public static final String BUCKET_OWNER_READER = "bucket-owner-read";
    public static final String BUCKET_OWNER_FULL_CONTROL = "bucket-owner-full-control";

    public static final String STANDARD = "standard";
    public static final String STANDARD_IA = "standard_ia";
    public static final String REDUCED_REDUNDANCY = "reduced_redundancy";
    public static final String GLACIER = "glacier";

}
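For context only (not part of this commit): a minimal sketch of how these keys might be looked up from a sink's table-properties map. The props map, the placeholder values, and the fallback to the "private" ACL are assumptions for illustration, not the sink's documented behavior.

import com.dtstack.flink.sql.sink.aws.AwsConstantKey;

import java.util.HashMap;
import java.util.Map;

public class AwsConstantKeyUsageSketch {
    public static void main(String[] args) {
        // Hypothetical properties map; in the real sink these values would come from the table DDL.
        Map<String, Object> props = new HashMap<>();
        props.put(AwsConstantKey.ACCESS_KEY, "yourAccessKey");
        props.put(AwsConstantKey.BUCKET_KEY, "yourBucket");

        String accessKey = (String) props.get(AwsConstantKey.ACCESS_KEY);
        // Fall back to "private" when no ACL is configured (an assumption for this sketch).
        String bucketAcl = (String) props.getOrDefault(AwsConstantKey.BUCKET_ACL, AwsConstantKey.PRIVATE);
        System.out.println(accessKey + " -> " + bucketAcl);
    }
}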
Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.aws;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AppendObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.dtstack.flink.sql.sink.aws.util.AwsManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
 * @author tiezhu
 * date 2020/12/1
 * company dtstack
 */
public class AwsOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {

    private static final Logger LOG = LoggerFactory.getLogger(AwsOutputFormat.class);

    private static final String LINE_BREAK = "\n";

    private transient AmazonS3Client client;

    private String accessKey;
    private String secretKey;
    private String bucket;
    private String bucketAcl;
    private String objectName;
    private String hostname;
    private InputStream inputStream;
    private Long position = 0L;

    private AwsOutputFormat() {
    }

    @Override
    public void configure(Configuration parameters) {
        LOG.warn("--- configure client ---");
        client = AwsManager.initClient(accessKey, secretKey, hostname);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        LOG.warn("--- open ---");
        initMetric();
        position = AwsManager.getObjectPosition(bucket, objectName, client);
    }

    @Override
    public void writeRecord(Tuple2 record) throws IOException {
        String recordStr = record.f1.toString() + LINE_BREAK;
        int length = recordStr.getBytes().length;
        inputStream = new ByteArrayInputStream(recordStr.getBytes());
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(length);
        // Append-style streaming write; note that this may cause an OOM, because the data is buffered in memory.
        AppendObjectRequest appendObjectRequest = new AppendObjectRequest(
                bucket, objectName, inputStream, metadata)
                .withPosition(position);

        client.appendObject(appendObjectRequest);
        position += length;
        outRecords.inc();
    }

    @Override
    public void close() throws IOException {
        if (Objects.nonNull(inputStream)) {
            inputStream.close();
        }

        if (Objects.nonNull(client)) {
            client.shutdown();
        }
    }

    public static AwsOutputFormatBuilder buildS3OutputFormat() {
        return new AwsOutputFormatBuilder();
    }

    public static class AwsOutputFormatBuilder {
        private final AwsOutputFormat awsOutputFormat;

        private AwsOutputFormatBuilder() {
            awsOutputFormat = new AwsOutputFormat();
        }

        public AwsOutputFormatBuilder setAccessKey(String accessKey) {
            awsOutputFormat.accessKey = accessKey;
            return this;
        }

        public AwsOutputFormatBuilder setSecretKey(String secretKey) {
            awsOutputFormat.secretKey = secretKey;
            return this;
        }

        public AwsOutputFormatBuilder setBucket(String bucket) {
            awsOutputFormat.bucket = bucket;
            return this;
        }

        public AwsOutputFormatBuilder setBucketAcl(String bucketAcl) {
            awsOutputFormat.bucketAcl = bucketAcl;
            return this;
        }

        public AwsOutputFormatBuilder setHostname(String hostname) {
            awsOutputFormat.hostname = hostname;
            return this;
        }

        public AwsOutputFormatBuilder setObjectName(String objectName) {
            awsOutputFormat.objectName = objectName;
            return this;
        }

        public AwsOutputFormat finish() {
            return awsOutputFormat;
        }
    }
}
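A hypothetical usage sketch (not part of the diff) wiring the builder exposed above. All credential, endpoint, bucket, and object values are placeholders; in practice the format is handed to the Flink runtime, which calls configure(), open(), and writeRecord() rather than application code invoking them directly.

import com.dtstack.flink.sql.sink.aws.AwsOutputFormat;

public class AwsOutputFormatUsageSketch {
    public static void main(String[] args) {
        // Fluent builder chain; each setter returns the builder, finish() yields the format.
        AwsOutputFormat outputFormat = AwsOutputFormat.buildS3OutputFormat()
                .setAccessKey("yourAccessKey")
                .setSecretKey("yourSecretKey")
                .setBucket("yourBucket")
                .setBucketAcl("public-read-write")
                .setHostname("s3.example-host.com")
                .setObjectName("flink/sink/output.txt")
                .finish();

        // configure() would create the S3 client and open() would resolve the current
        // append position before the runtime starts calling writeRecord().
        System.out.println(outputFormat);
    }
}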
