Skip to content

Commit b10e48b

Browse files
修竹修竹
authored andcommitted
开发impala维表
1 parent 4b2ec5d commit b10e48b

File tree

2 files changed

+114
-2
lines changed

2 files changed

+114
-2
lines changed
Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.side.impala;
220

3-
public class ImpalaAsyncReqRow {
21+
import com.dtstack.flink.sql.side.FieldInfo;
22+
import com.dtstack.flink.sql.side.JoinInfo;
23+
import com.dtstack.flink.sql.side.SideInfo;
24+
import com.dtstack.flink.sql.side.SideTableInfo;
25+
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
26+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
27+
import io.vertx.core.Vertx;
28+
import io.vertx.core.VertxOptions;
29+
import io.vertx.core.json.JsonObject;
30+
import io.vertx.ext.jdbc.JDBCClient;
31+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
32+
import org.apache.flink.configuration.Configuration;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.util.List;
37+
38+
/**
39+
* Date: 2019/11/12
40+
* Company: www.dtstack.com
41+
*
42+
* @author xiuzhu
43+
*/
44+
45+
public class ImpalaAsyncReqRow extends RdbAsyncReqRow {
46+
47+
private static final Logger LOG = LoggerFactory.getLogger(ImpalaAsyncReqRow.class);
48+
49+
private final static String IMPALA_DRIVER = "com.cloudera.impala.jdbc41.Driver";
50+
51+
52+
public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
53+
super(new ImpalaAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
54+
}
55+
56+
@Override
57+
public void open(Configuration parameters) throws Exception {
58+
super.open(parameters);
59+
JsonObject impalaClientConfig = new JsonObject();
60+
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
61+
impalaClientConfig.put("url", rdbSideTableInfo.getUrl())
62+
.put("driver_class", IMPALA_DRIVER)
63+
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
64+
.put("user", rdbSideTableInfo.getUserName())
65+
.put("password", rdbSideTableInfo.getPassword())
66+
.put("provider_class", DT_PROVIDER_CLASS)
67+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
68+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
69+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
70+
71+
System.setProperty("vertx.disableFileCPResolving", "true");
72+
73+
VertxOptions vo = new VertxOptions();
74+
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
75+
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
76+
vo.setFileResolverCachingEnabled(false);
77+
Vertx vertx = Vertx.vertx(vo);
78+
setRdbSQLClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
79+
}
480
}
Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.side.impala;
220

3-
public class ImpalaAsyncSideInfo {
21+
import com.dtstack.flink.sql.side.FieldInfo;
22+
import com.dtstack.flink.sql.side.JoinInfo;
23+
import com.dtstack.flink.sql.side.SideTableInfo;
24+
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
25+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
26+
27+
import java.util.List;
28+
29+
/**
30+
* Date: 2019/11/12
31+
* Company: www.dtstack.com
32+
*
33+
* @author xiuzhu
34+
*/
35+
36+
public class ImpalaAsyncSideInfo extends RdbAsyncSideInfo {
37+
public ImpalaAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
38+
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
39+
}
440
}

0 commit comments

Comments
 (0)