Skip to content

Commit b4c17fc

Browse files
author
sishu@dtstack.com
committed
Merge branch 'v1.4.0'
merge
2 parents 1c12be3 + d2ccc4e commit b4c17fc

File tree

11 files changed

+343
-57
lines changed

11 files changed

+343
-57
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ mvn clean package -Dmaven.test.skip
7878

7979
* **confProp**
8080
* 描述:一些参数设置
81+
* 格式: json
8182
* 必选:否
8283
* 默认值:无
8384
* 可选参数:

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
102102
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
103103
String fieldName = String.join(" ", filedNameArr);
104104
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
105-
//Class fieldClass = ClassUtil.stringConvertClass(filedInfoArr[1].trim());
106105
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
107106

108107
tableInfo.addField(fieldName);

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
5252

5353
String[] fieldNames = typeInfo.getFieldNames();
5454
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
55-
String fields = StringUtils.join(fieldNames, ",");
5655

5756
if(Strings.isNullOrEmpty(eventTimeFieldName)){
5857
return dataStream;

docs/colType.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
| 支持的类型 | java对应类型 |
2+
| ------ | ----- |
3+
| boolean | Boolean |
4+
| int | Integer |
5+
| bigint | Long |
6+
| tinyint | Byte |
7+
| byte | Byte |
8+
| short | Short |
9+
| smallint | Short|
10+
| char | String|
11+
| varchar | String |
12+
| string | String|
13+
| float | Float|
14+
| double | Double|
15+
| date | Date |
16+
| timestamp | Timestamp |

docs/elasticsearchSink.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
bb INT
6+
)WITH(
7+
type ='elasticsearch',
8+
address ='ip:port[,ip:port]',
9+
cluster='clusterName',
10+
estype ='esType',
11+
index ='index',
12+
id ='num[,num]',
13+
parallelism ='1'
14+
)
15+
```
16+
## 2.支持的版本
17+
ES5
18+
19+
## 3.表结构定义
20+
21+
|参数名称|含义|
22+
|----|---|
23+
|tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称|
24+
|colName|列名称|
25+
|colType|列类型 [colType支持的类型](colType.md)|
26+
27+
## 4.参数:
28+
|参数名称|含义|是否必填|默认值|
29+
|----|---|---|----|
30+
|type|表明 输出表类型[mysql\|hbase\|elasticsearch]|||
31+
|address | 连接ES Transport地址(tcp地址)|||
32+
|cluster | ES 集群名称 |||
33+
|index | 选择的ES上的index名称|||
34+
|estype | 选择ES上的type名称|||
35+
|id | 生成id的规则(当前是根据指定的字段pos获取字段信息,拼接生成id)|||
36+
|parallelism | 并行度设置||1|
37+
38+
## 5.样例:
39+
```
40+
CREATE TABLE MyResult(
41+
aa INT,
42+
bb INT
43+
)WITH(
44+
type ='elasticsearch',
45+
address ='172.16.10.47:9500',
46+
cluster='es_47_menghan',
47+
estype ='type1',
48+
index ='xc_es_test',
49+
id ='0,1',
50+
parallelism ='1'
51+
)
52+
```

docs/hbaseSide.md

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,38 @@
1818
partitionedJoin='false'
1919
);
2020
```
21-
22-
## 2.参数
23-
24-
* tableName ==> 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)
25-
* columnFamily:columnName ==> hbase中的列族名称和列名称
26-
* alias ===> hbase 中的列对应到flink中注册的列名称
27-
* PERIOD FOR SYSTEM_TIME ==> 关键字表明该定义的表为维表信息
28-
* PRIMARY KEY(keyInfo) ==> 维表主键定义;hbase 维表为rowkey的构造方式;
29-
可选择的构造包括 md5(alias + alias), '常量',也包括上述方式的自由组合
21+
## 2.支持版本
22+
hbase2.0
3023

31-
* type ==> 表明维表的类型[hbase|mysql]
32-
* zookeeperQuorum ==> hbase 的zk地址;格式ip:port[;ip:port]
33-
* zookeeperParent ==> hbase 的zk parent路径
34-
* tableName ==> hbase 的表名称
35-
* cache ==> 维表缓存策略(NONE/LRU)
24+
## 3.表结构定义
25+
26+
|参数名称|含义|
27+
|----|---|
28+
| tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)|
29+
| columnFamily:columnName | hbase中的列族名称和列名称 |
30+
| alias | hbase 中的列对应到flink中注册的列名称 |
31+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
32+
| PRIMARY KEY(keyInfo) | 维表主键定义;hbase 维表rowkey的构造方式;可选择的构造包括 md5(alias + alias), '常量',也包括上述方式的自由组合 |
33+
34+
## 4.参数
3635

37-
> * NONE: 不做内存缓存
38-
> * LRU:
39-
> > cacheSize ==> 缓存的条目数量
40-
> > cacheTTLMs ==> 缓存的过期时间(ms)
36+
|参数名称|含义|是否必填|默认值|
37+
|----|---|---|----|
38+
| type | 表明维表的类型[hbase\|mysql]|||
39+
| zookeeperQuorum | hbase 的zk地址;格式ip:port[;ip:port]|||
40+
| zookeeperParent | hbase 的zk parent路径|||
41+
| tableName | hbase 的表名称|||
42+
| cache | 维表缓存策略(NONE/LRU)||NONE|
43+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
4144

42-
* partitionedJoin ==> 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)
45+
--------------
46+
> 缓存策略
47+
* NONE: 不做内存缓存
48+
* LRU:
49+
* cacheSize: 缓存的条目数量
50+
* cacheTTLMs:缓存的过期时间(ms)
4351

44-
## 3.样例
52+
## 5.样例
4553
```
4654
CREATE TABLE sideTable(
4755
cf:name String as name,

docs/hbaseSink.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE MyResult(
4+
colFamily:colName colType,
5+
...
6+
)WITH(
7+
type ='hbase',
8+
zookeeperQuorum ='ip:port[,ip:port]',
9+
tableName ='tableName',
10+
rowKey ='colFamily:colName[,colFamily:colName]',
11+
parallelism ='1',
12+
zookeeperParent ='/hbase'
13+
)
14+
15+
16+
```
17+
18+
## 2.支持版本
19+
hbase2.0
20+
21+
## 3.表结构定义
22+
23+
|参数名称|含义|
24+
|----|---|
25+
| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称|
26+
| colFamily:colName | hbase中的列族名称和列名称|
27+
| colType | 列类型 [colType支持的类型](colType.md)|
28+
29+
## 4.参数:
30+
31+
|参数名称|含义|是否必填|默认值|
32+
|----|---|---|-----|
33+
|type | 表明 输出表类型[mysql\|hbase\|elasticsearch]|||
34+
|zookeeperQuorum | hbase zk地址,多个直接用逗号隔开|||
35+
|zookeeperParent | zkParent 路径|||
36+
|tableName | 关联的hbase表名称|||
37+
|rowKey | hbase的rowkey关联的列信息|||
38+
|parallelism | 并行度设置||1|
39+
40+
41+
## 5.样例:
42+
```
43+
CREATE TABLE MyResult(
44+
cf:channel STRING,
45+
cf:pv BIGINT
46+
)WITH(
47+
type ='hbase',
48+
zookeeperQuorum ='rdos1:2181',
49+
tableName ='workerinfo',
50+
rowKey ='cf:channel',
51+
parallelism ='1',
52+
zookeeperParent ='/hbase'
53+
)
54+
55+
```

docs/kafka09Source.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
function(colNameX) AS aliasName,
7+
WATERMARK FOR colName AS withOffset( colName , delayTime )
8+
)WITH(
9+
type ='kafka09',
10+
bootstrapServers ='ip:port,ip:port...',
11+
zookeeperQuorum ='ip:port,ip:port/zkparent',
12+
offsetReset ='latest',
13+
topic ='topicName',
14+
parallelism ='parllNum'
15+
);
16+
```
17+
18+
## 2.支持的版本
19+
kafka09
20+
21+
## 3.表结构定义
22+
23+
|参数名称|含义|
24+
|----|---|
25+
| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称|
26+
| colName | 列名称|
27+
| colType | 列类型 [colType支持的类型](colType.md)|
28+
| function(colNameX) as aliasName | 支持在定义列信息的时候根据已有列类型生成新的列(函数可以使用系统函数和已经注册的UDF)|
29+
| WATERMARK FOR colName AS withOffset( colName , delayTime ) | 标识输入流的watermark生成规则,根据指定的colName(当前支持列的类型为Long \| Timestamp) 和delayTime生成waterMark 同时会在注册表的使用附带上rowtime字段(如果未指定则默认添加proctime字段);注意:添加该标识的使用必须设置系统参数 time.characteristic:EventTime; delayTime: 数据最大延迟时间(ms)|
30+
31+
## 4.参数:
32+
33+
|参数名称|含义|是否必填|默认值|
34+
|----|---|---|---|
35+
|type | kafka09 |||
36+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
37+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
38+
|topic | 需要读取的 topic 名称|||
39+
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest]||latest|
40+
|parallelism | 并行度设置||1|
41+
42+
## 5.样例:
43+
```
44+
CREATE TABLE MyTable(
45+
name string,
46+
channel STRING,
47+
pv INT,
48+
xctime bigint,
49+
CHARACTER_LENGTH(channel) AS timeLeng
50+
)WITH(
51+
type ='kafka09',
52+
bootstrapServers ='172.16.8.198:9092',
53+
zookeeperQuorum ='172.16.8.198:2181/kafka',
54+
offsetReset ='latest',
55+
topic ='nbTest1',
56+
parallelism ='1'
57+
);
58+
```

docs/kafka09source.md

Lines changed: 0 additions & 35 deletions
This file was deleted.

docs/mysqlSide.md

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
2+
## 1.格式:
3+
```
4+
CREATE TABLE tableName(
5+
colName colType,
6+
...
7+
PRIMARY KEY(keyInfo),
8+
PERIOD FOR SYSTEM_TIME
9+
)WITH(
10+
type='mysql',
11+
url='jdbcUrl',
12+
userName='dbUserName',
13+
password='dbPwd',
14+
tableName='tableName',
15+
cache ='LRU',
16+
cacheSize ='10000',
17+
cacheTTLMs ='60000',
18+
parallelism ='1',
19+
partitionedJoin='false'
20+
);
21+
```
22+
23+
## 2.支持版本
24+
mysql-5.6.35
25+
26+
## 3.表结构定义
27+
28+
|参数名称|含义|
29+
|----|---|
30+
| tableName | 注册到flink的表名称(可选填;不填默认和mysql对应的表名称相同)|
31+
| colName | 列名称|
32+
| colType | 列类型 [colType支持的类型](colType.md)|
33+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
34+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
35+
36+
## 4.参数
37+
38+
|参数名称|含义|是否必填|默认值|
39+
|----|---|---|----|
40+
| type | 表明维表的类型[hbase\|mysql] |||
41+
| url | 连接mysql数据库 jdbcUrl |||
42+
| userName | mysql连接用户名 |||
43+
| password | mysql连接密码|||
44+
| tableName | mysql表名称|||
45+
| tableName | mysql 的表名称|||
46+
| cache | 维表缓存策略(NONE/LRU)||NONE|
47+
| partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)||false|
48+
49+
----------
50+
> 缓存策略
51+
* NONE: 不做内存缓存
52+
* LRU:
53+
* cacheSize: 缓存的条目数量
54+
* cacheTTLMs:缓存的过期时间(ms)
55+
56+
57+
## 5.样例
58+
```
59+
create table sideTable(
60+
channel String,
61+
xccount int,
62+
PRIMARY KEY(channel),
63+
PERIOD FOR SYSTEM_TIME
64+
)WITH(
65+
type='mysql',
66+
url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
67+
userName='dtstack',
68+
password='abc123',
69+
tableName='sidetest',
70+
cache ='LRU',
71+
cacheSize ='10000',
72+
cacheTTLMs ='60000',
73+
parallelism ='1',
74+
partitionedJoin='false'
75+
);
76+
77+
78+
```
79+
80+

0 commit comments

Comments
 (0)