Skip to content

Commit a0ad099

Browse files
committed
[feat] add kingbase source
1 parent 7f96408 commit a0ad099

File tree

13 files changed

+376
-17
lines changed

13 files changed

+376
-17
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.kingbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.all.kingbase</artifactId>
13+
14+
15+
</project>
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
25+
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
/**
37+
* Date: 2020/9/11
38+
* Company: www.dtstack.com
39+
*
40+
* @author tiezhu
41+
*/
42+
public class KingbaseAllReqRow extends AbstractRdbAllReqRow {
43+
44+
private static final long serialVersionUID = 2021683212163965319L;
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(KingbaseAllReqRow.class);
47+
48+
private static final String KINGBASE_DRIVER = "com.kingbase8.Driver";
49+
50+
public KingbaseAllReqRow(RowTypeInfo rowTypeInfo,
51+
JoinInfo joinInfo,
52+
List<FieldInfo> outFieldInfoList,
53+
AbstractSideTableInfo sideTableInfo) {
54+
super(new KingbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
55+
}
56+
57+
@Override
58+
public Connection getConn(String dbUrl, String userName, String password) {
59+
try {
60+
Class.forName(KINGBASE_DRIVER);
61+
//add param useCursorFetch=true
62+
Map<String, String> addParams = Maps.newHashMap();
63+
addParams.put("useCursorFetch", "true");
64+
String targetDbUrl = DtStringUtil.addJdbcParam(dbUrl, addParams, true);
65+
return DriverManager.getConnection(targetDbUrl, userName, password);
66+
} catch (Exception e) {
67+
LOG.error("kingbase get connection error", e);
68+
throw new RuntimeException("kingbase get connect error", e);
69+
}
70+
}
71+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
29+
import java.util.List;
30+
31+
/**
32+
* Date: 2020/9/11
33+
* Company: www.dtstack.com
34+
*
35+
* @author tiezhu
36+
*/
37+
public class KingbaseAllSideInfo extends RdbAllSideInfo {
38+
39+
private static final long serialVersionUID = 3486920874840522682L;
40+
41+
public KingbaseAllSideInfo(RowTypeInfo rowTypeInfo,
42+
JoinInfo joinInfo,
43+
List<FieldInfo> outFieldInfoList,
44+
AbstractSideTableInfo sideTableInfo) {
45+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
}
47+
48+
@Override
49+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
50+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
51+
}
52+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.kingbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.async.kingbase</artifactId>
13+
14+
15+
</project>
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import io.vertx.core.Vertx;
27+
import io.vertx.core.VertxOptions;
28+
import io.vertx.core.json.JsonObject;
29+
import io.vertx.ext.jdbc.JDBCClient;
30+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
31+
import org.apache.flink.configuration.Configuration;
32+
33+
import java.util.List;
34+
35+
/**
36+
* Date: 2020/9/11
37+
* Company: www.dtstack.com
38+
*
39+
* @author tiezhu
40+
*/
41+
public class KingbaseAsyncReqRow extends RdbAsyncReqRow {
42+
43+
private static final long serialVersionUID = -4614565210747028814L;
44+
45+
public KingbaseAsyncReqRow(RowTypeInfo rowTypeInfo,
46+
JoinInfo joinInfo,
47+
List<FieldInfo> outFieldInfoList,
48+
AbstractSideTableInfo sideTableInfo) {
49+
super(new KingbaseAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
50+
}
51+
52+
private final static String KINGBASE_DRIVER = "com.kingbase8.Driver";
53+
54+
@Override
55+
public void open(Configuration parameters) throws Exception {
56+
super.open(parameters);
57+
JsonObject kingbaseClient = new JsonObject();
58+
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
59+
kingbaseClient.put("url", rdbSideTableInfo.getUrl())
60+
.put("driver_class", KINGBASE_DRIVER)
61+
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
62+
.put("user", rdbSideTableInfo.getUserName())
63+
.put("password", rdbSideTableInfo.getPassword())
64+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
65+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
66+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
67+
68+
VertxOptions vo = new VertxOptions();
69+
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
70+
vo.setWorkerPoolSize(rdbSideTableInfo.getAsyncPoolSize());
71+
Vertx vertx = Vertx.vertx(vo);
72+
setRdbSqlClient(JDBCClient.createNonShared(vertx, kingbaseClient));
73+
}
74+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.FieldInfo;
23+
import com.dtstack.flink.sql.side.JoinInfo;
24+
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
27+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
29+
import java.util.List;
30+
31+
/**
32+
* Date: 2020/9/11
33+
* Company: www.dtstack.com
34+
*
35+
* @author tiezhu
36+
*/
37+
public class KingbaseAsyncSideInfo extends RdbAsyncSideInfo {
38+
39+
private static final long serialVersionUID = -1893856733189188893L;
40+
41+
public KingbaseAsyncSideInfo(RowTypeInfo rowTypeInfo,
42+
JoinInfo joinInfo,
43+
List<FieldInfo> outFieldInfoList,
44+
AbstractSideTableInfo sideTableInfo) {
45+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
46+
}
47+
48+
@Override
49+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
50+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
51+
}
52+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.kingbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
<version>1.0-SNAPSHOT</version>
12+
<packaging>jar</packaging>
13+
<artifactId>sql.side.kingbase.core</artifactId>
14+
15+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.kingbase.table;
20+
21+
import com.dtstack.flink.sql.side.rdb.table.RdbSideParser;
22+
import com.dtstack.flink.sql.table.AbstractTableInfo;
23+
24+
import java.util.Map;
25+
26+
/**
27+
* Date: 2020/9/11
28+
* Company: www.dtstack.com
29+
*
30+
* @author tiezhu
31+
*/
32+
public class KingbaseSideParser extends RdbSideParser {
33+
private static final String CURRENT_TYPE = "kingbase";
34+
35+
@Override
36+
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
37+
AbstractTableInfo kingbaseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
38+
kingbaseTableInfo.setType(CURRENT_TYPE);
39+
return kingbaseTableInfo;
40+
}
41+
}

kingbase/kingbase-side/pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.kingbase</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.side.kingbase</artifactId>
13+
<packaging>pom</packaging>
14+
<modules>
15+
<module>kingbase-all-side</module>
16+
<module>kingbase-async-side</module>
17+
<module>kingbase-side-core</module>
18+
</modules>
19+
<properties>
20+
<rdb.side.version>1.0-SNAPSHOT</rdb.side.version>
21+
</properties>
22+
23+
<dependencies>
24+
<dependency>
25+
<groupId>com.dtstack.flink</groupId>
26+
<artifactId>sql.side.rdb</artifactId>
27+
<version>${rdb.side.version}</version>
28+
</dependency>
29+
</dependencies>
30+
31+
</project>

0 commit comments

Comments
 (0)