Skip to content

Commit adcf847

Browse files
committed
oceanbase dimension
1 parent 201dd74 commit adcf847

File tree

19 files changed

+570
-32
lines changed

19 files changed

+570
-32
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,9 @@ public class ExecuteProcessHelper {
9494

9595

9696
public static ParamsInfo parseParams(String[] args) throws Exception {
97-
LOG.info("------------program params-------------------------");
9897
System.out.println("------------program params-------------------------");
9998
Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
10099
Arrays.stream(args).forEach(System.out::println);
101-
LOG.info("-------------------------------------------");
102100
System.out.println("----------------------------------------");
103101

104102
OptionParser optionParser = new OptionParser(args);
@@ -288,9 +286,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
288286

289287
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
290288
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
291-
.map((Tuple2<Boolean, Row> f0) -> {
292-
return f0.f1;
293-
})
289+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
294290
.returns(typeInfo);
295291

296292
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void exec(String sql, Map<String, AbstractSideTableInfo> sideTableMap, St
121121
SideSQLParser sideSQLParser = new SideSQLParser();
122122
sideSQLParser.setLocalTableCache(localTableCache);
123123
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
124-
Object pollObj = null;
124+
Object pollObj;
125125

126126
//need clean
127127
boolean preIsSideJoin = false;

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,7 @@
4141
public class WaterMarkerAssigner {
4242

4343
public boolean checkNeedAssignWaterMarker(AbstractSourceTableInfo tableInfo){
44-
if(Strings.isNullOrEmpty(tableInfo.getEventTimeField())){
45-
return false;
46-
}
47-
48-
return true;
44+
return !Strings.isNullOrEmpty(tableInfo.getEventTimeField());
4945
}
5046

5147
public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo typeInfo, AbstractSourceTableInfo sourceTableInfo){

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.io.IOException;
4949
import java.io.InputStreamReader;
5050
import java.net.URLDecoder;
51+
import java.nio.charset.StandardCharsets;
5152
import java.util.LinkedList;
5253
import java.util.List;
5354
import java.util.Map;
@@ -67,8 +68,7 @@ public class LauncherMain {
6768

6869
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
6970
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
70-
String corePath = localSqlRootJar + SP + jarPath;
71-
return corePath;
71+
return localSqlRootJar + SP + jarPath;
7272
}
7373

7474
public static void main(String[] args) throws Exception {
@@ -85,14 +85,14 @@ public static void main(String[] args) throws Exception {
8585
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
8686

8787
if(mode.equals(ClusterMode.local.name())) {
88-
String[] localArgs = argList.toArray(new String[argList.size()]);
88+
String[] localArgs = argList.toArray(new String[0]);
8989
Main.main(localArgs);
9090
return;
9191
}
9292

9393
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
9494
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
95-
String[] remoteArgs = argList.toArray(new String[argList.size()]);
95+
String[] remoteArgs = argList.toArray(new String[0]);
9696
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
9797

9898
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
@@ -116,14 +116,14 @@ public static void main(String[] args) throws Exception {
116116

117117
private static String[] parseJson(String[] args) {
118118
BufferedReader reader = null;
119-
String lastStr = "";
119+
StringBuilder lastStr = new StringBuilder();
120120
try{
121121
FileInputStream fileInputStream = new FileInputStream(args[0]);
122-
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
122+
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, StandardCharsets.UTF_8);
123123
reader = new BufferedReader(inputStreamReader);
124124
String tempString = null;
125125
while((tempString = reader.readLine()) != null){
126-
lastStr += tempString;
126+
lastStr.append(tempString);
127127
}
128128
reader.close();
129129
}catch(IOException e){
@@ -137,14 +137,13 @@ private static String[] parseJson(String[] args) {
137137
}
138138
}
139139
}
140-
Map<String, Object> map = JSON.parseObject(lastStr, new TypeReference<Map<String, Object>>(){} );
140+
Map<String, Object> map = JSON.parseObject(lastStr.toString(), new TypeReference<Map<String, Object>>(){} );
141141
List<String> list = new LinkedList<>();
142142

143143
for (Map.Entry<String, Object> entry : map.entrySet()) {
144144
list.add("-" + entry.getKey());
145145
list.add(entry.getValue().toString());
146146
}
147-
String[] array = list.toArray(new String[list.size()]);
148-
return array;
147+
return list.toArray(new String[0]);
149148
}
150149
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.oceanbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.side.all.oceanbase</artifactId>
14+
<name>oceanbase-all-side</name>
15+
<packaging>jar</packaging>
16+
<version>1.0-SNAPSHOT</version>
17+
18+
<properties>
19+
<sql.side.oceanbase.core.version>1.0-SNAPSHOT</sql.side.oceanbase.core.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.dtstack.flink</groupId>
25+
<artifactId>sql.side.oceanbase.core</artifactId>
26+
<version>${sql.side.oceanbase.core.version}</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>1.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
44+
<artifactSet>
45+
<excludes>
46+
47+
</excludes>
48+
</artifactSet>
49+
<filters>
50+
<filter>
51+
<artifact>*:*</artifact>
52+
<excludes>
53+
<exclude>META-INF/*.SF</exclude>
54+
<exclude>META-INF/*.DSA</exclude>
55+
<exclude>META-INF/*.RSA</exclude>
56+
</excludes>
57+
</filter>
58+
</filters>
59+
</configuration>
60+
</execution>
61+
</executions>
62+
</plugin>
63+
64+
<plugin>
65+
<artifactId>maven-antrun-plugin</artifactId>
66+
<version>1.2</version>
67+
<executions>
68+
<execution>
69+
<id>copy-resources</id>
70+
<!-- here the phase you need -->
71+
<phase>package</phase>
72+
<goals>
73+
<goal>run</goal>
74+
</goals>
75+
<configuration>
76+
<tasks>
77+
<copy todir="${basedir}/../../../plugins/oceanbaseallside">
78+
<fileset dir="target/">
79+
<include name="${project.artifactId}-${project.version}.jar"/>
80+
</fileset>
81+
</copy>
82+
83+
<move file="${basedir}/../../../plugins/oceanbaseallside/${project.artifactId}-${project.version}.jar"
84+
tofile="${basedir}/../../../plugins/oceanbaseallside/${project.name}-${git.branch}.jar"/>
85+
</tasks>
86+
</configuration>
87+
</execution>
88+
</executions>
89+
</plugin>
90+
</plugins>
91+
</build>
92+
93+
94+
</project>
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstatck.flink.sql.side.oceanbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import com.google.common.collect.Maps;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/**
37+
* @author : tiezhu
38+
* @date : 2020/3/26
39+
*/
40+
public class OceanbaseAllReqRow extends AbstractRdbAllReqRow {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(OceanbaseAllReqRow.class);
43+
44+
private static final String OCEAN_BASE_DRIVER = "com.mysql.jdbc.Driver";
45+
46+
public OceanbaseAllReqRow(RowTypeInfo rowTypeInfo,
47+
JoinInfo joinInfo,
48+
List<FieldInfo> outFieldInfoList,
49+
AbstractSideTableInfo sideTableInfo) {
50+
super(new OceanbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
51+
}
52+
53+
@Override
54+
public Connection getConn(String dbUrl, String userName, String password) {
55+
try {
56+
Class.forName(OCEAN_BASE_DRIVER);
57+
Map<String, String> addParams = Maps.newHashMap();
58+
addParams.put("useCursorFetch", "true");
59+
String targetDbUrl = DtStringUtil.addJdbcParam(dbUrl, addParams, true);
60+
return DriverManager.getConnection(targetDbUrl, userName, password);
61+
} catch (Exception e) {
62+
LOG.error("oceanbase get connect error", e);
63+
throw new RuntimeException(e);
64+
}
65+
}
66+
67+
@Override
68+
public int getFetchSize() {
69+
return Integer.MIN_VALUE;
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstatck.flink.sql.side.oceanbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
26+
27+
import java.util.List;
28+
29+
/**
30+
* @author : tiezhu
31+
* @date : 2020/3/26
32+
*/
33+
public class OceanbaseAllSideInfo extends RdbAllSideInfo {
34+
public OceanbaseAllSideInfo(RowTypeInfo rowTypeInfo,
35+
JoinInfo joinInfo,
36+
List<FieldInfo> outFieldInfoList,
37+
AbstractSideTableInfo sideTableInfo) {
38+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
39+
}
40+
}

0 commit comments

Comments
 (0)