Skip to content

Commit c5c7460

Browse files
committed
Merge branch '1.8_release_3.10.x' into feat_1.8_3.10.x_mergedDev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfoParser.java # redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java # redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
2 parents 0ef7954 + 6159bb9 commit c5c7460

File tree

12 files changed

+203
-170
lines changed

12 files changed

+203
-170
lines changed

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ public void open(Configuration parameters) throws Exception {
5151
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
5252
.put("user", rdbSideTableInfo.getUserName())
5353
.put("password", rdbSideTableInfo.getPassword())
54-
.put("provider_class", DT_PROVIDER_CLASS);
54+
.put("provider_class", DT_PROVIDER_CLASS)
55+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
56+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
57+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
58+
5559
System.setProperty("vertx.disableFileCPResolving", "true");
5660
VertxOptions vo = new VertxOptions();
5761
vo.setEventLoopPoolSize(rdbSideTableInfo.getAsyncPoolSize());

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfoParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public AbstractTableInfo parseWithTableType(int tableType, CreateTableParser.Sql
8181
if(absTableParser == null){
8282
String cacheType = MathUtil.getString(props.get(AbstractSideTableInfo.CACHE_KEY));
8383
absTableParser = StreamSideFactory.getSqlParser(type, localPluginRoot, cacheType);
84-
sideTableInfoMap.put(type, absTableParser);
84+
sideTableInfoMap.put(type + cacheType, absTableParser);
8585
}
8686
}
8787

docs/redisSide.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
CREATE TABLE tableName(
55
colName cloType,
66
...
7+
PRIMARY KEY(colName1,colName2) ,
78
PERIOD FOR SYSTEM_TIME
89
)WITH(
910
type ='redis',
@@ -27,9 +28,10 @@
2728
| tableName | 注册到flink的表名称(可选填;不填默认和redis对应的表名称相同)|
2829
| colName | 列名称,维表列名格式 表名:主键名:主键值:列名]|
2930
| colType | 列类型,当前只支持varchar|
31+
| PRIMARY KEY |主键,多个字段做为联合主键时以逗号分隔
3032
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
3133

32-
## 3.参数
34+
## 4.参数
3335

3436
|参数名称|含义|是否必填|默认值|
3537
|----|---|---|----|
@@ -51,7 +53,7 @@
5153
* cacheTTLMs:缓存的过期时间(ms)
5254
* ALL: 缓存全量表数据
5355

54-
## 4.样例
56+
## 5.样例
5557
```
5658
create table sideTable(
5759
channel varchar,
@@ -70,5 +72,10 @@ create table sideTable(
7072
);
7173
7274
```
75+
## 6.缓存redis的存储结构规则
76+
```
77+
redis使用散列类型 hash 数据结构,key=tableName_primaryKey1_primaryKey2,value={column1=value1, column2=value2}
78+
如果以班级class表为例,id和name作为联合主键,那么redis的结构为 <class_1_john ,{id=1, name=john, age=12}>
79+
```
7380

7481

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ public void open(Configuration parameters) throws Exception {
6666
.put("provider_class", DT_PROVIDER_CLASS)
6767
.put("idle_connection_test_period", 300)
6868
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
69-
.put("max_idle_time", 600);
69+
.put("max_idle_time", 600)
70+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
71+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
72+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
7073

7174
System.setProperty("vertx.disableFileCPResolving", "true");
7275

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllSideInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public class MysqlAllSideInfo extends RdbAllSideInfo {
3838
public MysqlAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3939
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4040
}
41+
42+
@Override
43+
public String quoteIdentifier(String identifier) {
44+
return "`" + identifier + "`";
45+
}
4146
}

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncSideInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ public class MysqlAsyncSideInfo extends RdbAsyncSideInfo {
3939
public MysqlAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4040
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4141
}
42+
43+
@Override
44+
public String quoteIdentifier(String identifier) {
45+
return "`" + identifier + "`";
46+
}
4247
}

polardb/polardb-side/polardb-all-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAllSideInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,10 @@ public class PolardbAllSideInfo extends RdbAllSideInfo {
3434
public PolardbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3535
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3636
}
37+
38+
@Override
39+
public String quoteIdentifier(String identifier) {
40+
return "`" + identifier + "`";
41+
}
3742
}
3843

polardb/polardb-side/polardb-async-side/src/main/java/com/dtstack/flink/sql/side/polardb/PolardbAsyncSideInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,9 @@ public class PolardbAsyncSideInfo extends RdbAsyncSideInfo {
3535
public PolardbAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3636
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3737
}
38+
39+
@Override
40+
public String quoteIdentifier(String identifier) {
41+
return "`" + identifier + "`";
42+
}
3843
}

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ public void open(Configuration parameters) throws Exception {
6262
.put("driver_class", POSTGRESQL_DRIVER)
6363
.put("max_pool_size", rdbSideTableInfo.getAsyncPoolSize())
6464
.put("user", rdbSideTableInfo.getUserName())
65-
.put("password", rdbSideTableInfo.getPassword());
65+
.put("password", rdbSideTableInfo.getPassword())
66+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
67+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
68+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
6669

6770
VertxOptions vo = new VertxOptions();
6871
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);

0 commit comments

Comments
 (0)