Skip to content

Commit 5ee1773

Browse files
committed
Merge branch 'feat_1.10_4.0.x_kingbaseES' into 'v1.10.0_dev'
Feat 1.10 4.0.x kingbase See merge request dt-insight-engine/flinkStreamSQL!89
2 parents 6a09b64 + b932018 commit 5ee1773

File tree

23 files changed

+839
-15
lines changed

23 files changed

+839
-15
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
317317
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
318318
pluginClassPathSets.add(sinkTablePathUrl);
319319
} else if (tableInfo instanceof AbstractSideTableInfo) {
320-
String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
320+
String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
321321
sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);
322322

323323
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,7 @@ private void parseTree(JsonNode jsonNode, String prefix) {
108108
String nodeKey = getNodeKey(prefix, next);
109109

110110
nodeAndJsonNodeMapping.put(nodeKey, child);
111-
if (child.isArray()) {
112-
parseTree(child, nodeKey);
113-
} else {
114-
parseTree(child, nodeKey);
115-
}
111+
parseTree(child, nodeKey);
116112
}
117113
}
118114

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.table;
2220

2321
import com.google.common.collect.Lists;
@@ -178,9 +176,9 @@ public void setPhysicalFields(Map<String, String> physicalFields) {
178176
}
179177

180178
public void finish(){
181-
this.fields = fieldList.toArray(new String[fieldList.size()]);
182-
this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
183-
this.fieldTypes = fieldTypeList.toArray(new String[fieldTypeList.size()]);
179+
this.fields = fieldList.toArray(new String[0]);
180+
this.fieldClasses = fieldClassList.toArray(new Class[0]);
181+
this.fieldTypes = fieldTypeList.toArray(new String[0]);
184182
}
185183

186184
/**

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,22 @@ public static Date localDateTimetoDate(LocalDateTime localDateTime){
108108
}
109109

110110
public static LocalDateTime dateToLocalDateTime(Date date){
111+
date = transformSqlDateToUtilDate(date);
111112
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
112113
}
113114

115+
/**
116+
* 将java.sql.Date 转化为 java.util.Date
117+
* @param date 不知道是java.sql.Date 还是 java.util.Date
118+
* @return 最后返回 java.util.Date
119+
*/
120+
public static Date transformSqlDateToUtilDate(Date date) {
121+
if (date instanceof java.sql.Date) {
122+
date = new Date(date.getTime());
123+
}
124+
return date;
125+
}
126+
114127
/**
115128
*
116129
*

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/CustomerSinkFunc.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.elasticsearch;
2020

21+
import com.dtstack.flink.sql.util.DateUtil;
2122
import org.apache.flink.api.common.functions.RuntimeContext;
2223
import org.apache.flink.api.java.tuple.Tuple2;
2324
import org.apache.flink.metrics.Counter;
@@ -31,6 +32,7 @@
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

35+
import java.sql.Date;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.stream.Collectors;
@@ -42,7 +44,9 @@
4244
public class CustomerSinkFunc implements ElasticsearchSinkFunction<Tuple2> {
4345

4446
private final Logger logger = LoggerFactory.getLogger(CustomerSinkFunc.class);
45-
/** 用作ID的属性值连接符号 */
47+
/**
48+
* 用作ID的属性值连接符号
49+
*/
4650
private static final String ID_VALUE_SPLIT = "_";
4751

4852
private String index;
@@ -107,6 +111,10 @@ private IndexRequest createIndexRequest(Row element) {
107111
Map<String, Object> dataMap = Es6Util.rowToJsonMap(element, fieldNames, fieldTypes);
108112
int length = Math.min(element.getArity(), fieldNames.size());
109113
for (int i = 0; i < length; i++) {
114+
if (element.getField(i) instanceof Date) {
115+
dataMap.put(fieldNames.get(i), DateUtil.transformSqlDateToUtilDate((Date) element.getField(i)));
116+
continue;
117+
}
110118
dataMap.put(fieldNames.get(i), element.getField(i));
111119
}
112120

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.kingbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
<name>kingbase-all-side</name>
13+
<packaging>jar</packaging>
14+
<artifactId>sql.side.all.kingbase</artifactId>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>com.dtstack.flink</groupId>
19+
<artifactId>sql.side.kingbase.core</artifactId>
20+
<version>1.0-SNAPSHOT</version>
21+
</dependency>
22+
</dependencies>
23+
24+
<build>
25+
<plugins>
26+
<plugin>
27+
<groupId>org.apache.maven.plugins</groupId>
28+
<artifactId>maven-shade-plugin</artifactId>
29+
<version>3.2.1</version>
30+
<executions>
31+
<execution>
32+
<phase>package</phase>
33+
<goals>
34+
<goal>shade</goal>
35+
</goals>
36+
<configuration>
37+
<createDependencyReducedPom>false</createDependencyReducedPom>
38+
<artifactSet>
39+
<excludes>
40+
41+
</excludes>
42+
</artifactSet>
43+
<filters>
44+
<filter>
45+
<artifact>*:*</artifact>
46+
<excludes>
47+
<exclude>META-INF/*.SF</exclude>
48+
<exclude>META-INF/*.DSA</exclude>
49+
<exclude>META-INF/*.RSA</exclude>
50+
</excludes>
51+
</filter>
52+
</filters>
53+
</configuration>
54+
</execution>
55+
</executions>
56+
</plugin>
57+
58+
<plugin>
59+
<artifactId>maven-antrun-plugin</artifactId>
60+
<version>1.2</version>
61+
<executions>
62+
<execution>
63+
<id>copy-resources</id>
64+
<!-- here the phase you need -->
65+
<phase>package</phase>
66+
<goals>
67+
<goal>run</goal>
68+
</goals>
69+
<configuration>
70+
<tasks>
71+
<copy todir="${basedir}/../../../sqlplugins/kingbaseallside">
72+
<fileset dir="target/">
73+
<include name="${project.artifactId}-${project.version}.jar"/>
74+
</fileset>
75+
</copy>
76+
77+
<move file="${basedir}/../../../sqlplugins/kingbaseallside/${project.artifactId}-${project.version}.jar"
78+
tofile="${basedir}/../../../sqlplugins/kingbaseallside/${project.name}-${git.branch}.jar"/>
79+
</tasks>
80+
</configuration>
81+
</execution>
82+
</executions>
83+
</plugin>
84+
</plugins>
85+
</build>
86+
87+
</project>
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/**
37+
* Date: 2020/9/11
38+
* Company: www.dtstack.com
39+
*
40+
* @author tiezhu
41+
*/
42+
public class KingbaseAllReqRow extends AbstractRdbAllReqRow {
43+
44+
private static final long serialVersionUID = 2021683212163965319L;
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(KingbaseAllReqRow.class);
47+
48+
private static final String KINGBASE_DRIVER = "com.kingbase8.Driver";
49+
50+
public KingbaseAllReqRow(RowTypeInfo rowTypeInfo,
51+
JoinInfo joinInfo,
52+
List<FieldInfo> outFieldInfoList,
53+
AbstractSideTableInfo sideTableInfo) {
54+
super(new KingbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
55+
}
56+
57+
@Override
58+
public Connection getConn(String dbUrl, String userName, String password) {
59+
try {
60+
Class.forName(KINGBASE_DRIVER);
61+
//add param useCursorFetch=true
62+
Map<String, String> addParams = Maps.newHashMap();
63+
addParams.put("useCursorFetch", "true");
64+
String targetDbUrl = DtStringUtil.addJdbcParam(dbUrl, addParams, true);
65+
return DriverManager.getConnection(targetDbUrl, userName, password);
66+
} catch (Exception e) {
67+
LOG.error("kingbase get connection error", e);
68+
throw new RuntimeException("kingbase get connect error", e);
69+
}
70+
}
71+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
29+
import java.util.List;
30+
31+
/**
32+
* Date: 2020/9/11
33+
* Company: www.dtstack.com
34+
*
35+
* @author tiezhu
36+
*/
37+
public class KingbaseAllSideInfo extends RdbAllSideInfo {
38+
39+
private static final long serialVersionUID = 3486920874840522682L;
40+
41+
public KingbaseAllSideInfo(RowTypeInfo rowTypeInfo,
42+
JoinInfo joinInfo,
43+
List<FieldInfo> outFieldInfoList,
44+
AbstractSideTableInfo sideTableInfo) {
45+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
}
47+
48+
@Override
49+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
50+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
51+
}
52+
}

0 commit comments

Comments
 (0)