Skip to content

Commit 3e611ca

Browse files
committed
编辑readme.md
1 parent b5db1ed commit 3e611ca

File tree

8 files changed

+191
-6
lines changed

8 files changed

+191
-6
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ mvn clean package -Dmaven.test.skip
7171

7272
* **confProp**
7373
* 描述:一些参数设置
74+
* 格式: json
7475
* 必选:否
7576
* 默认值:无
7677
* 可选参数:

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
102102
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
103103
String fieldName = String.join(" ", filedNameArr);
104104
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
105-
//Class fieldClass = ClassUtil.stringConvertClass(filedInfoArr[1].trim());
106105
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
107106

108107
tableInfo.addField(fieldName);

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
5252

5353
String[] fieldNames = typeInfo.getFieldNames();
5454
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
55-
String fields = StringUtils.join(fieldNames, ",");
5655

5756
if(Strings.isNullOrEmpty(eventTimeFieldName)){
5857
return dataStream;

docs/colType.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
| 支持的类型 | java对应类型 |
2+
| ------ | ----- |
3+
| boolean | Boolean |
4+
| int | Integer |
5+
| bigint | Long |
6+
| tinyint | Byte |
7+
| byte | Byte |
8+
| short | Short |
9+
| smallint | Short|
10+
| char | String|
11+
| varchar | String |
12+
| string | String|
13+
| float | Float|
14+
| double | Double|
15+
| date | Date |
16+
| timestamp | Timestamp |

docs/hbaseSink.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE MyResult(
4+
colFamily:colName colType,
5+
...
6+
)WITH(
7+
type ='hbase',
8+
zookeeperQuorum ='ip:port[,ip:port]',
9+
tableName ='tableName',
10+
rowKey ='colFamily:colName[,colFamily:colName]',
11+
parallelism ='1',
12+
zookeeperParent ='/hbase'
13+
)
14+
15+
16+
```
17+
18+
## 2.参数:
19+
* tableName ==> 在 sql 中使用的名称;即注册到flink-table-env上的名称
20+
* colFamily:colName ==> hbase中的列族名称和列名称
21+
* colType ==> 列类型 [colType支持的类型](colType.md)
22+
23+
* type ==> 表明 输出表类型[mysql|hbase|elasticsearch]
24+
* zookeeperQuorum ==> hbase zk地址,多个直接用逗号隔开
25+
* zookeeperParent ==> zkParent 路径
26+
* tableName ==> 关联的hbase表名称
27+
* rowKey ==> hbase的rowkey关联的列信息
28+
* parallelism ==> 并行度设置
29+
30+
31+
## 3.样例:
32+
```
33+
CREATE TABLE MyResult(
34+
cf:channel STRING,
35+
cf:pv BIGINT
36+
)WITH(
37+
type ='hbase',
38+
zookeeperQuorum ='rdos1:2181',
39+
tableName ='workerinfo',
40+
rowKey ='cf:channel',
41+
parallelism ='1',
42+
zookeeperParent ='/hbase'
43+
)
44+
45+
```

docs/kafka09source.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,33 @@
33
CREATE TABLE tableName(
44
colName colType,
55
...
6-
function(channel) AS alias
6+
function(colNameX) AS aliasName,
7+
WATERMARK FOR colName AS withOffset( colName , delayTime )
78
)WITH(
89
type ='kafka09',
910
bootstrapServers ='ip:port,ip:port...',
1011
zookeeperQuorum ='ip:port,ip:port/zkparent',
1112
offsetReset ='latest',
12-
topic ='nbTest1',
13-
parallelism ='1'
13+
topic ='topicName',
14+
parallelism ='parllNum'
1415
);
1516
```
1617
## 2.参数:
18+
* tableName ==> 在 sql 中使用的名称;即注册到flink-table-env上的名称
19+
* colName ==> 列名称
20+
* colType ==> 列类型 [colType支持的类型](colType.md)
21+
* function(colNameX) as aliasName ==> 支持在定义列信息的时候根据已有列类型生成新的列(函数可以使用系统函数和已经注册的UDF)
22+
* WATERMARK FOR colName AS withOffset( colName , delayTime ) ==> 标识输入流生的watermake生成规则,根据指定的colName(当前支持列的类型为Long|Timestamp)
23+
和delayTime生成waterMark 同时会在注册表的使用附带上rowtime字段(如果未指定则默认添加proctime字段);
24+
注意:添加该标识的使用必须设置系统参数 time.characteristic:EventTime; delayTime: 数据最大延迟时间(ms)
25+
1726
* type ==> kafka09
18-
* bootstrapServers
27+
* bootstrapServers ==> kafka bootstrap-server 地址信息(多个用逗号隔开)
28+
* zookeeperQuorum ==> kafka zk地址信息(多个之间用逗号分隔)
29+
* topic ==> 需要读取的 topic 名称
30+
* offsetReset ==> 读取的topic 的offset初始位置[latest|earliest]
31+
* parallelism ==> 并行度设置
32+
1933
## 3.样例:
2034
```
2135
CREATE TABLE MyTable(

docs/mysqlSide.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
2+
## 1.格式:
3+
```
4+
CREATE TABLE tableName(
5+
colName cloType,
6+
...
7+
PRIMARY KEY(keyInfo),
8+
PERIOD FOR SYSTEM_TIME
9+
)WITH(
10+
type='mysql',
11+
url='jdbcUrl',
12+
userName='dbUserName',
13+
password='dbPwd',
14+
tableName='tableName',
15+
cache ='LRU',
16+
cacheSize ='10000',
17+
cacheTTLMs ='60000',
18+
parallelism ='1',
19+
partitionedJoin='false'
20+
);
21+
```
22+
23+
## 2.参数
24+
25+
* tableName ==> 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)
26+
* colName ==> 列名称
27+
* colType ==> 列类型 [colType支持的类型](colType.md)
28+
* PERIOD FOR SYSTEM_TIME ==> 关键字表明该定义的表为维表信息
29+
* PRIMARY KEY(keyInfo) ==> 维表主键定义;多个列之间用逗号隔开
30+
* url ==> 连接mysql数据库 jdbcUrl
31+
* userName ==> mysql连接用户名
32+
* password ==> mysql连接密码
33+
* tableName ==> mysql表名称
34+
* type ==> 表明维表的类型[hbase|mysql]
35+
36+
* tableName ==> mysql 的表名称
37+
* cache ==> 维表缓存策略(NONE/LRU)
38+
39+
> * NONE: 不做内存缓存
40+
> * LRU:
41+
> > cacheSize ==> 缓存的条目数量
42+
> > cacheTTLMs ==> 缓存的过期时间(ms)
43+
44+
* partitionedJoin ==> 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)
45+
46+
## 3.样例
47+
```
48+
create table sideTable(
49+
channel String,
50+
xccount int,
51+
PRIMARY KEY(channel),
52+
PERIOD FOR SYSTEM_TIME
53+
)WITH(
54+
type='mysql',
55+
url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
56+
userName='dtstack',
57+
password='abc123',
58+
tableName='sidetest',
59+
cache ='LRU',
60+
cacheSize ='10000',
61+
cacheTTLMs ='60000',
62+
parallelism ='1',
63+
partitionedJoin='false'
64+
);
65+
66+
67+
```
68+
69+

docs/mysqlSink.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
colNameX colType
7+
)WITH(
8+
type ='mysql',
9+
url ='jdbcUrl',
10+
userName ='userName',
11+
password ='pwd',
12+
tableName ='tableName',
13+
parallelism ='parllNum'
14+
);
15+
16+
```
17+
18+
## 2.参数:
19+
* tableName ==> 在 sql 中使用的名称;即注册到flink-table-env上的名称
20+
* colName ==> 列名称
21+
* colType ==> 列类型 [colType支持的类型](colType.md)
22+
* type ==> 表明 输出表类型[mysql|hbase|elasticsearch]
23+
* url ==> 连接mysql数据库 jdbcUrl
24+
* userName ==> mysql连接用户名
25+
* password ==> mysql连接密码
26+
* tableName ==> mysql表名称
27+
* parallelism ==> 并行度设置
28+
29+
## 3.样例:
30+
```
31+
CREATE TABLE MyResult(
32+
channel VARCHAR,
33+
pv VARCHAR
34+
)WITH(
35+
type ='mysql',
36+
url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
37+
userName ='dtstack',
38+
password ='abc123',
39+
tableName ='pv2',
40+
parallelism ='1'
41+
)
42+
```

0 commit comments

Comments
 (0)