Skip to content

Commit 01ff006

Browse files
增加elasticsearch6-side功能(未完成)
1 parent 43aaad0 commit 01ff006

File tree

7 files changed

+701
-0
lines changed

7 files changed

+701
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.elasticsearch6</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.all.elasticsearch6</artifactId>
13+
<name>elasticsearch6-all-side</name>
14+
<packaging>jar</packaging>
15+
16+
<properties>
17+
<sql.side.elasticsearch6.core.version>1.0-SNAPSHOT</sql.side.elasticsearch6.core.version>
18+
<elasticsearch.version>6.8.6</elasticsearch.version>
19+
</properties>
20+
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.elasticsearch.client</groupId>
25+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
26+
<version>${elasticsearch.version}</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>com.dtstack.flink</groupId>
31+
<artifactId>sql.side.elasticsearch6.core</artifactId>
32+
<version>${sql.side.elasticsearch6.core.version}</version>
33+
</dependency>
34+
</dependencies>
35+
36+
<build>
37+
<plugins>
38+
<plugin>
39+
<groupId>org.apache.maven.plugins</groupId>
40+
<artifactId>maven-shade-plugin</artifactId>
41+
<version>1.4</version>
42+
<executions>
43+
<execution>
44+
<phase>package</phase>
45+
<goals>
46+
<goal>shade</goal>
47+
</goals>
48+
<configuration>
49+
<artifactSet>
50+
<excludes>
51+
52+
</excludes>
53+
</artifactSet>
54+
<filters>
55+
<filter>
56+
<artifact>*:*</artifact>
57+
<excludes>
58+
<exclude>META-INF/*.SF</exclude>
59+
<exclude>META-INF/*.DSA</exclude>
60+
<exclude>META-INF/*.RSA</exclude>
61+
</excludes>
62+
</filter>
63+
</filters>
64+
</configuration>
65+
</execution>
66+
</executions>
67+
</plugin>
68+
69+
<plugin>
70+
<artifactId>maven-antrun-plugin</artifactId>
71+
<version>1.2</version>
72+
<executions>
73+
<execution>
74+
<id>copy-resources</id>
75+
<!-- here the phase you need -->
76+
<phase>package</phase>
77+
<goals>
78+
<goal>run</goal>
79+
</goals>
80+
<configuration>
81+
<tasks>
82+
<copy todir="${basedir}/../../../plugins/elasticsearch6allside">
83+
<fileset dir="target/">
84+
<include name="${project.artifactId}-${project.version}.jar"/>
85+
</fileset>
86+
</copy>
87+
88+
<move file="${basedir}/../../../plugins/elasticsearch6allside/${project.artifactId}-${project.version}.jar"
89+
tofile="${basedir}/../../../plugins/elasticsearch6allside/${project.name}-${git.branch}.jar"/>
90+
</tasks>
91+
</configuration>
92+
</execution>
93+
</executions>
94+
</plugin>
95+
</plugins>
96+
</build>
97+
98+
99+
100+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package com.dtstack.flink.sql.side.elasticsearch6;
2+
3+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4+
5+
import com.dtstack.flink.sql.side.*;
6+
import com.dtstack.flink.sql.side.elasticsearch6.table.Elasticsearch6SideTableInfo;
7+
import com.dtstack.flink.sql.util.ParseUtils;
8+
import com.google.common.collect.Lists;
9+
import org.apache.calcite.sql.SqlNode;
10+
import org.apache.commons.collections.CollectionUtils;
11+
import org.apache.commons.lang3.StringUtils;
12+
import org.elasticsearch.search.builder.SearchSourceBuilder;
13+
14+
import java.util.Arrays;
15+
import java.util.List;
16+
import java.util.stream.Collectors;
17+
18+
/**
19+
* @author yinxi
20+
* @date 2020/1/13 - 1:01
21+
*/
22+
public class Elasticsearch6AllSideInfo extends SideInfo {
23+
24+
public Elasticsearch6AllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
25+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
26+
}
27+
28+
@Override
29+
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
30+
Elasticsearch6SideTableInfo es6SideTableInfo = (Elasticsearch6SideTableInfo) sideTableInfo;
31+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
32+
33+
34+
sqlCondition = getSelectFromStatement(getEstype(es6SideTableInfo), Arrays.asList(sideSelectFields.split(",")), sideTableInfo.getPredicateInfoes());
35+
System.out.println("-------- all side sql query-------\n" + sqlCondition);
36+
}
37+
38+
//基于rdb开发side,但是那些between,in,not in之类的不知道怎么处理
39+
40+
public String getAdditionalWhereClause() {
41+
return "";
42+
}
43+
44+
private String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
45+
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
46+
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
47+
String whereClause = buildWhereClause(predicateClause);
48+
String sql = "SELECT " + fromClause + " FROM " + tableName + whereClause;
49+
return sql;
50+
}
51+
52+
private String buildWhereClause(String predicateClause) {
53+
String additionalWhereClause = getAdditionalWhereClause();
54+
String whereClause = (!StringUtils.isEmpty(predicateClause) || !StringUtils.isEmpty(additionalWhereClause) ? " WHERE " + predicateClause : "");
55+
whereClause += (StringUtils.isEmpty(predicateClause)) ? additionalWhereClause.replaceFirst("AND", "") : additionalWhereClause;
56+
return whereClause;
57+
}
58+
59+
@Override
60+
public void parseSelectFields(JoinInfo joinInfo) {
61+
String sideTableName = joinInfo.getSideTableName();
62+
String nonSideTableName = joinInfo.getNonSideTable();
63+
List<String> fields = Lists.newArrayList();
64+
65+
int sideIndex = 0;
66+
for (int i = 0; i < outFieldInfoList.size(); i++) {
67+
FieldInfo fieldInfo = outFieldInfoList.get(i);
68+
if (fieldInfo.getTable().equalsIgnoreCase(sideTableName)) {
69+
fields.add(fieldInfo.getFieldName());
70+
sideFieldIndex.put(i, sideIndex);
71+
sideFieldNameIndex.put(i, fieldInfo.getFieldName());
72+
sideIndex++;
73+
} else if (fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)) {
74+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
75+
inFieldIndex.put(i, nonSideIndex);
76+
} else {
77+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
78+
}
79+
}
80+
81+
if (fields.size() == 0) {
82+
throw new RuntimeException("select non field from table " + sideTableName);
83+
}
84+
85+
//add join on condition field to select fields
86+
SqlNode conditionNode = joinInfo.getCondition();
87+
88+
List<SqlNode> sqlNodeList = Lists.newArrayList();
89+
90+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
91+
92+
for (SqlNode sqlNode : sqlNodeList) {
93+
dealOneEqualCon(sqlNode, sideTableName);
94+
}
95+
96+
if (CollectionUtils.isEmpty(equalFieldList)) {
97+
throw new RuntimeException("no join condition found after table " + joinInfo.getLeftTableName());
98+
}
99+
100+
for (String equalField : equalFieldList) {
101+
if (fields.contains(equalField)) {
102+
continue;
103+
}
104+
105+
fields.add(equalField);
106+
}
107+
108+
sideSelectFields = String.join(",", fields);
109+
}
110+
111+
public String buildFilterCondition(PredicateInfo info) {
112+
switch (info.getOperatorKind()) {
113+
case "IN":
114+
case "NOT_IN":
115+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " ( " + info.getCondition() + " )";
116+
case "NOT_EQUALS":
117+
return quoteIdentifier(info.getFieldName()) + " != " + info.getCondition();
118+
case "BETWEEN":
119+
return quoteIdentifier(info.getFieldName()) + " BETWEEN " + info.getCondition();
120+
case "IS_NOT_NULL":
121+
case "IS_NULL":
122+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName();
123+
default:
124+
return quoteIdentifier(info.getFieldName()) + " " + info.getOperatorName() + " " + info.getCondition();
125+
}
126+
}
127+
128+
public String getEstype(Elasticsearch6SideTableInfo es6SdideTableInfo) {
129+
return es6SdideTableInfo.getEsType();
130+
}
131+
132+
public String quoteIdentifier(String identifier) {
133+
return " " + identifier + " ";
134+
}
135+
}

0 commit comments

Comments
 (0)