Skip to content

Commit 00a032a

Browse files
committed
Merge branch 'feat_1.8_elasticsearch6-side' into 'v1.8.0_dev'
Feat_1.8_elasticsearch6-side elasticsearch6的维表部分 See merge request !222
2 parents dd35dda + db77f9a commit 00a032a

File tree

18 files changed

+2133
-2
lines changed

18 files changed

+2133
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ApiResult.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66

7-
import java.util.UUID;
8-
97
/**
108
* API调用结果返回
119
* Date: 2020/2/24

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
24+
import org.apache.calcite.sql.JoinType;
2425
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.table.runtime.types.CRow;
28+
import org.apache.flink.types.Row;
29+
import org.apache.flink.util.Collector;
2730

2831
import java.sql.SQLException;
2932
import java.util.concurrent.Executors;
@@ -64,4 +67,13 @@ public void open(Configuration parameters) throws Exception {
6467
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6568
}
6669

70+
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out){
71+
if(sideInput == null && sideInfo.getJoinType() != JoinType.LEFT){
72+
return;
73+
}
74+
75+
Row row = fillData(value.row(), sideInput);
76+
out.collect(new CRow(row, value.change()));
77+
}
78+
6779
}

docs/elasticsearch6Side

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
2+
## 1.格式:
3+
```
4+
CREATE TABLE tableName(
5+
colName cloType,
6+
...
7+
PRIMARY KEY(keyInfo),
8+
PERIOD FOR SYSTEM_TIME
9+
)WITH(
10+
type='elasticsearch6',
11+
address ='ip:port[,ip:port]',
12+
cluster='clusterName',
13+
estype ='esType',
14+
index ='index',
15+
authMesh='true',
16+
userName='dbUserName',
17+
password='dbPwd',
18+
cache ='LRU',
19+
cacheSize ='10000',
20+
cacheTTLMs ='60000',
21+
parallelism ='1',
22+
partitionedJoin='false'
23+
);
24+
```
25+
26+
# 2.支持版本
27+
mysql-5.6.35
28+
29+
## 3.表结构定义
30+
31+
|参数名称|含义|
32+
|----|---|
33+
| tableName | mysql表名称|
34+
| colName | 列名称|
35+
| colType | 列类型 [colType支持的类型](colType.md)|
36+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
37+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
38+
39+
## 4.参数
40+
41+
|参数名称|含义|是否必填|默认值|
42+
|----|---|---|----|
43+
type|表明 输出表类型[elasticsearch6]|是||
44+
|address | 连接ES Transport地址(tcp地址)|是||
45+
|cluster | ES 集群名称 |是||
46+
|index | 选择的ES上的index名称|否||
47+
|esType | 选择ES上的type名称|否||
48+
|authMesh | 是否进行用户名密码认证 | 否 | false|
49+
|userName | 用户名 | 否,authMesh='true'时为必填 ||
50+
|password | 密码 | 否,authMesh='true'时为必填 ||
51+
| cache | 维表缓存策略(NONE/LRU)|否|NONE|
52+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)|否|false|
53+
|parallelism | 并行度设置|否|1|
54+
55+
----------
56+
> 缓存策略
57+
* NONE: 不做内存缓存
58+
* LRU:
59+
* cacheSize: 缓存的条目数量
60+
* cacheTTLMs:缓存的过期时间(ms)
61+
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
62+
* asyncCapacity:异步请求容量,默认1000
63+
* asyncTimeout:异步请求超时时间,默认10000毫秒
64+
65+
## 5.样例
66+
```
67+
create table sideTable(
68+
channel varchar,
69+
xccount int,
70+
PRIMARY KEY(channel),
71+
PERIOD FOR SYSTEM_TIME
72+
)WITH(
73+
type ='elasticsearch6',
74+
address ='172.16.10.47:9500',
75+
cluster='es_47_menghan',
76+
estype ='type1',
77+
index ='xc_es_test',
78+
authMesh='true',
79+
userName='dtstack',
80+
password='abc123',
81+
cache ='LRU',
82+
cacheSize ='10000',
83+
cacheTTLMs ='60000',
84+
cacheMode='unordered',
85+
asyncCapacity='1000',
86+
asyncTimeout='10000'
87+
parallelism ='1',
88+
partitionedJoin='false'
89+
);
90+
91+
92+
```
93+
94+
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.elasticsearch6</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.all.elasticsearch6</artifactId>
13+
<name>elasticsearch6-all-side</name>
14+
<packaging>jar</packaging>
15+
16+
<properties>
17+
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18+
<elasticsearch.version>6.8.6</elasticsearch.version>
19+
</properties>
20+
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.elasticsearch.client</groupId>
25+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
26+
<version>${elasticsearch.version}</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>com.dtstack.flink</groupId>
31+
<artifactId>sql.side.elasticsearch6.core</artifactId>
32+
<version>${sql.side.elasticsearch6.core.version}</version>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>junit</groupId>
37+
<artifactId>junit</artifactId>
38+
<version>4.12</version>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
</dependencies>
43+
44+
<build>
45+
<plugins>
46+
<plugin>
47+
<groupId>org.apache.maven.plugins</groupId>
48+
<artifactId>maven-shade-plugin</artifactId>
49+
<version>1.4</version>
50+
<executions>
51+
<execution>
52+
<phase>package</phase>
53+
<goals>
54+
<goal>shade</goal>
55+
</goals>
56+
<configuration>
57+
<artifactSet>
58+
<excludes>
59+
60+
</excludes>
61+
</artifactSet>
62+
<filters>
63+
<filter>
64+
<artifact>*:*</artifact>
65+
<excludes>
66+
<exclude>META-INF/*.SF</exclude>
67+
<exclude>META-INF/*.DSA</exclude>
68+
<exclude>META-INF/*.RSA</exclude>
69+
</excludes>
70+
</filter>
71+
</filters>
72+
</configuration>
73+
</execution>
74+
</executions>
75+
</plugin>
76+
77+
<plugin>
78+
<artifactId>maven-antrun-plugin</artifactId>
79+
<version>1.2</version>
80+
<executions>
81+
<execution>
82+
<id>copy-resources</id>
83+
<!-- here the phase you need -->
84+
<phase>package</phase>
85+
<goals>
86+
<goal>run</goal>
87+
</goals>
88+
<configuration>
89+
<tasks>
90+
<copy todir="${basedir}/../../../plugins/elasticsearch6allside">
91+
<fileset dir="target/">
92+
<include name="${project.artifactId}-${project.version}.jar"/>
93+
</fileset>
94+
</copy>
95+
96+
<move file="${basedir}/../../../plugins/elasticsearch6allside/${project.artifactId}-${project.version}.jar"
97+
tofile="${basedir}/../../../plugins/elasticsearch6allside/${project.name}-${git.branch}.jar"/>
98+
</tasks>
99+
</configuration>
100+
</execution>
101+
</executions>
102+
</plugin>
103+
</plugins>
104+
</build>
105+
106+
107+
108+
</project>

0 commit comments

Comments
 (0)