Skip to content

Commit 7e12bc4

Browse files
author
yinxi
committed
Merge branch 'feat_elasticsearch6-sink_mergeTest' into '1.8_test_3.10.x'
添加elasticsearch6-sink到test分支 See merge request !234
2 parents 69f0d75 + ae504c5 commit 7e12bc4

File tree

37 files changed

+1207
-127
lines changed

37 files changed

+1207
-127
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.table.runtime.types.CRow;
23+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24+
import org.apache.flink.types.Row;
25+
import org.apache.flink.util.Collector;
26+
2127
import com.datastax.driver.core.Cluster;
2228
import com.datastax.driver.core.ConsistencyLevel;
2329
import com.datastax.driver.core.HostDistance;
@@ -33,15 +39,10 @@
3339
import com.dtstack.flink.sql.side.JoinInfo;
3440
import com.dtstack.flink.sql.side.SideTableInfo;
3541
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
36-
import org.apache.calcite.sql.JoinType;
37-
import org.apache.commons.collections.CollectionUtils;
38-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3942
import com.google.common.collect.Lists;
4043
import com.google.common.collect.Maps;
41-
import org.apache.flink.table.runtime.types.CRow;
42-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
43-
import org.apache.flink.types.Row;
44-
import org.apache.flink.util.Collector;
44+
import org.apache.calcite.sql.JoinType;
45+
import org.apache.commons.collections.CollectionUtils;
4546
import org.slf4j.Logger;
4647
import org.slf4j.LoggerFactory;
4748

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23+
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
25+
import org.apache.flink.table.runtime.types.CRow;
26+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
27+
import org.apache.flink.types.Row;
28+
2229
import com.datastax.driver.core.Cluster;
2330
import com.datastax.driver.core.ConsistencyLevel;
2431
import com.datastax.driver.core.HostDistance;
@@ -38,25 +45,18 @@
3845
import com.dtstack.flink.sql.side.cache.CacheObj;
3946
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4047
import com.google.common.base.Function;
48+
import com.google.common.collect.Lists;
4149
import com.google.common.util.concurrent.AsyncFunction;
4250
import com.google.common.util.concurrent.FutureCallback;
4351
import com.google.common.util.concurrent.Futures;
4452
import com.google.common.util.concurrent.ListenableFuture;
4553
import io.vertx.core.json.JsonArray;
46-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
47-
import org.apache.flink.configuration.Configuration;
48-
import com.google.common.collect.Lists;
49-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
50-
import org.apache.flink.table.runtime.types.CRow;
51-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
52-
import org.apache.flink.types.Row;
5354
import org.slf4j.Logger;
5455
import org.slf4j.LoggerFactory;
5556

5657
import java.net.InetAddress;
5758
import java.sql.Timestamp;
5859
import java.util.ArrayList;
59-
import java.util.Collections;
6060
import java.util.List;
6161
import java.util.Map;
6262

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@
3838

3939
package com.dtstack.flink.sql.sink.cassandra;
4040

41+
import org.apache.flink.api.common.typeinfo.TypeInformation;
42+
import org.apache.flink.api.java.tuple.Tuple;
43+
import org.apache.flink.api.java.tuple.Tuple2;
44+
import org.apache.flink.configuration.Configuration;
45+
import org.apache.flink.types.Row;
46+
4147
import com.datastax.driver.core.Cluster;
4248
import com.datastax.driver.core.ConsistencyLevel;
4349
import com.datastax.driver.core.HostDistance;
@@ -49,13 +55,9 @@
4955
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5056
import com.datastax.driver.core.policies.RetryPolicy;
5157
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
52-
import org.apache.flink.api.common.typeinfo.TypeInformation;
53-
import org.apache.flink.api.java.tuple.Tuple;
54-
import org.apache.flink.api.java.tuple.Tuple2;
55-
import org.apache.flink.configuration.Configuration;
56-
import org.apache.flink.types.Row;
5758
import org.slf4j.Logger;
5859
import org.slf4j.LoggerFactory;
60+
5961
import java.io.IOException;
6062
import java.net.InetAddress;
6163
import java.sql.DriverManager;

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@
1818

1919
package com.dtstack.flink.sql.environment;
2020

21-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
22-
import com.dtstack.flink.sql.enums.EStateBackend;
23-
import com.dtstack.flink.sql.util.MathUtil;
24-
import com.dtstack.flink.sql.util.PropertiesUtils;
25-
import org.apache.commons.lang3.BooleanUtils;
26-
import org.apache.commons.lang3.StringUtils;
2721
import org.apache.flink.api.common.ExecutionConfig;
2822
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2923
import org.apache.flink.api.common.time.Time;
@@ -40,6 +34,13 @@
4034
import org.apache.flink.table.api.StreamQueryConfig;
4135
import org.apache.flink.table.api.java.StreamTableEnvironment;
4236

37+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
38+
import com.dtstack.flink.sql.enums.EStateBackend;
39+
import com.dtstack.flink.sql.util.MathUtil;
40+
import com.dtstack.flink.sql.util.PropertiesUtils;
41+
import org.apache.commons.lang3.BooleanUtils;
42+
import org.apache.commons.lang3.StringUtils;
43+
4344
import java.io.IOException;
4445
import java.lang.reflect.InvocationTargetException;
4546
import java.lang.reflect.Method;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23+
import org.apache.flink.api.common.typeinfo.TypeInformation;
24+
import org.apache.flink.api.java.tuple.Tuple2;
25+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
26+
import org.apache.flink.streaming.api.datastream.DataStream;
27+
import org.apache.flink.table.api.StreamQueryConfig;
28+
import org.apache.flink.table.api.Table;
29+
import org.apache.flink.table.api.TableSchema;
30+
import org.apache.flink.table.api.java.StreamTableEnvironment;
31+
import org.apache.flink.table.runtime.CRowKeySelector;
32+
import org.apache.flink.table.runtime.types.CRow;
33+
import org.apache.flink.table.runtime.types.CRowTypeInfo;
34+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
35+
import org.apache.flink.types.Row;
36+
2337
import com.dtstack.flink.sql.enums.ECacheType;
2438
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2539
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
@@ -49,19 +63,6 @@
4963
import org.apache.calcite.sql.parser.SqlParseException;
5064
import org.apache.calcite.sql.parser.SqlParserPos;
5165
import org.apache.commons.collections.CollectionUtils;
52-
import org.apache.flink.api.common.typeinfo.TypeInformation;
53-
import org.apache.flink.api.java.tuple.Tuple2;
54-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
55-
import org.apache.flink.streaming.api.datastream.DataStream;
56-
import org.apache.flink.table.api.StreamQueryConfig;
57-
import org.apache.flink.table.api.Table;
58-
import org.apache.flink.table.api.TableSchema;
59-
import org.apache.flink.table.api.java.StreamTableEnvironment;
60-
import org.apache.flink.table.runtime.CRowKeySelector;
61-
import org.apache.flink.table.runtime.types.CRow;
62-
import org.apache.flink.table.runtime.types.CRowTypeInfo;
63-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
64-
import org.apache.flink.types.Row;
6566
import org.slf4j.Logger;
6667
import org.slf4j.LoggerFactory;
6768

docs/elasticsearch6Sink.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
bb INT
6+
)WITH(
7+
type ='elasticsearch6',
8+
address ='ip:port[,ip:port]',
9+
cluster='clusterName',
10+
esType ='esType',
11+
index ='index',
12+
id ='num[,num]',
13+
authMesh = 'true',
14+
userName = 'userName',
15+
password = 'password',
16+
parallelism ='1'
17+
)
18+
```
19+
## 2.支持的版本
20+
elasticsearch 6.8.6
21+
22+
## 3.表结构定义
23+
24+
|参数名称|含义|
25+
|----|---|
26+
|tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称|
27+
|colName|列名称|
28+
|colType|列类型 [colType支持的类型](colType.md)|
29+
30+
## 4.参数:
31+
|参数名称|含义|是否必填|默认值|
32+
|----|---|---|----|
33+
|type|表明 输出表类型[elasticsearch6]|||
34+
|address | 连接ES Transport地址(tcp地址)|||
35+
|cluster | ES 集群名称 |||
36+
|index | 选择的ES上的index名称|||
37+
|esType | 选择ES上的type名称|||
38+
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id;若id为空字符串或索引都超出范围,则随机生成id值)|||
40+
|authMesh | 是否进行用户名密码认证 || false|
41+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
42+
|password | 密码 | 否,authMesh='true'时为必填 ||
43+
|parallelism | 并行度设置||1|
44+
45+
## 5.样例:
46+
```
47+
CREATE TABLE MyResult(
48+
aa INT,
49+
bb INT
50+
)WITH(
51+
type ='elasticsearch6',
52+
address ='172.16.10.47:9500',
53+
cluster='es_47_menghan',
54+
esType ='type1',
55+
index ='xc_es_test',
56+
authMesh = 'true',
57+
userName = 'elastic',
58+
password = 'abc123',
59+
id ='0,1',
60+
parallelism ='1'
61+
)
62+
```

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

23-
import com.dtstack.flink.sql.sink.IStreamSinkGener;
24-
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
25-
import com.dtstack.flink.sql.table.TargetTableInfo;
26-
import org.apache.commons.lang3.StringUtils;
2723
import org.apache.flink.api.common.typeinfo.TypeInformation;
2824
import org.apache.flink.api.java.tuple.Tuple2;
2925
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -34,8 +30,14 @@
3430
import org.apache.flink.table.sinks.RetractStreamTableSink;
3531
import org.apache.flink.table.sinks.TableSink;
3632
import org.apache.flink.types.Row;
33+
34+
import com.dtstack.flink.sql.sink.IStreamSinkGener;
35+
import com.dtstack.flink.sql.sink.elasticsearch.table.ElasticsearchTableInfo;
36+
import com.dtstack.flink.sql.table.TargetTableInfo;
37+
import org.apache.commons.lang3.StringUtils;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
40+
3941
import java.net.InetAddress;
4042
import java.net.InetSocketAddress;
4143
import java.util.ArrayList;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/EsUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21-
import com.dtstack.flink.sql.util.DtStringUtil;
2221
import org.apache.flink.types.Row;
2322
import org.apache.flink.util.Preconditions;
2423

24+
import com.dtstack.flink.sql.util.DtStringUtil;
25+
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.elasticsearch6</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.sink.elasticsearch6</artifactId>
13+
<name>elasticsearch6-sink</name>
14+
15+
<properties>
16+
<elasticsearch.version>6.8.6</elasticsearch.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.flink</groupId>
22+
<artifactId>flink-streaming-java_2.11</artifactId>
23+
<version>${flink.version}</version>
24+
<scope>provided</scope>
25+
</dependency>
26+
27+
<dependency>
28+
<groupId>org.elasticsearch.client</groupId>
29+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
30+
<version>${elasticsearch.version}</version>
31+
</dependency>
32+
33+
<dependency>
34+
<groupId>ch.qos.logback</groupId>
35+
<artifactId>logback-core</artifactId>
36+
<version>1.1.7</version>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>ch.qos.logback</groupId>
41+
<artifactId>logback-classic</artifactId>
42+
<version>1.1.7</version>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.apache.logging.log4j</groupId>
47+
<artifactId>log4j-to-slf4j</artifactId>
48+
<version>2.7</version>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
54+
<version>${flink.version}</version>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
60+
<version>${flink.version}</version>
61+
</dependency>
62+
</dependencies>
63+
64+
<build>
65+
<plugins>
66+
<plugin>
67+
<groupId>org.apache.maven.plugins</groupId>
68+
<artifactId>maven-shade-plugin</artifactId>
69+
<version>1.4</version>
70+
<executions>
71+
<execution>
72+
<phase>package</phase>
73+
<goals>
74+
<goal>shade</goal>
75+
</goals>
76+
<configuration>
77+
<artifactSet>
78+
<excludes>
79+
<!--<exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>-->
80+
</excludes>
81+
</artifactSet>
82+
<filters>
83+
<filter>
84+
<artifact>*:*</artifact>
85+
<excludes>
86+
<exclude>META-INF/*.SF</exclude>
87+
<exclude>META-INF/*.DSA</exclude>
88+
<exclude>META-INF/*.RSA</exclude>
89+
</excludes>
90+
</filter>
91+
</filters>
92+
</configuration>
93+
</execution>
94+
</executions>
95+
</plugin>
96+
97+
<plugin>
98+
<artifactId>maven-antrun-plugin</artifactId>
99+
<version>1.2</version>
100+
<executions>
101+
<execution>
102+
<id>copy-resources</id>
103+
<!-- here the phase you need -->
104+
<phase>package</phase>
105+
<goals>
106+
<goal>run</goal>
107+
</goals>
108+
<configuration>
109+
<tasks>
110+
<copy todir="${basedir}/../../plugins/elasticsearch6sink">
111+
<fileset dir="target/">
112+
<include name="${project.artifactId}-${project.version}.jar"/>
113+
</fileset>
114+
</copy>
115+
116+
<move file="${basedir}/../../plugins/elasticsearch6sink/${project.artifactId}-${project.version}.jar"
117+
tofile="${basedir}/../../plugins/elasticsearch6sink/${project.name}-${git.branch}.jar"/>
118+
</tasks>
119+
</configuration>
120+
</execution>
121+
</executions>
122+
</plugin>
123+
</plugins>
124+
</build>
125+
126+
</project>

0 commit comments

Comments
 (0)