Skip to content

Commit d44b9b9

Browse files
committed
Merge branch 'v1.8.0_dev_feature_kudu' into 'v1.8.0_dev'
修改kudu文档说明,去掉代码中无用注释 修改kudu文档说明,去掉代码中无用注释 See merge request !127
2 parents 21edafc + 9e71fa4 commit d44b9b9

File tree

7 files changed

+10
-61
lines changed

7 files changed

+10
-61
lines changed

docs/kuduSide.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,15 @@ kudu 1.9.0+cdh6.2.0
6464

6565
|参数名称|含义|是否必填|默认值|
6666
|----|---|---|-----|
67-
|type | 表明维表的类型[hbase|mysql|kudu]|||
67+
| type | 表明维表的类型[hbase|mysql|kudu]|||
6868
| kuduMasters | kudu master节点的地址;格式ip[ip,ip2]|||
6969
| tableName | kudu 的表名称|||
7070
| workerCount | 工作线程数 |||
71-
| defaultOperationTimeoutMs | 写入操作超时时间 |||
71+
| defaultOperationTimeoutMs | scan操作超时时间 |||
7272
| defaultSocketReadTimeoutMs | socket读取超时时间 |||
7373
| primaryKey | 需要过滤的主键 ALL模式独有 |||
7474
| lowerBoundPrimaryKey | 需要过滤的主键的最小值 ALL模式独有 |||
7575
| upperBoundPrimaryKey | 需要过滤的主键的最大值(不包含) ALL模式独有 |||
76-
| workerCount | 工作线程数 |||
77-
| defaultOperationTimeoutMs | 写入操作超时时间 |||
78-
| defaultSocketReadTimeoutMs | socket读取超时时间 |||
7976
| batchSizeBytes |返回数据的大小 |||
8077
| limitNum |返回数据的条数 |||
8178
| isFaultTolerant |查询是否容错 查询失败是否扫描第二个副本 默认false 容错 |||

docs/kuduSink.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ kudu 1.9.0+cdh6.2.0
3434

3535
|参数名称|含义|是否必填|默认值|
3636
|----|---|---|-----|
37-
|type | 表名 输出表类型[mysq|hbase|elasticsearch|redis|kudu]|||
37+
| type | 表名 输出表类型[mysq|hbase|elasticsearch|redis|kudu]|||
3838
| kuduMasters | kudu master节点的地址;格式ip[ip,ip2]|||
3939
| tableName | kudu 的表名称|||
4040
| writeMode | 写入kudu的模式 insert|update|upsert |否 |upsert
4141
| workerCount | 工作线程数 ||
42-
| defaultOperationTimeoutMs | 写入操作超时时间 ||
42+
| defaultOperationTimeoutMs | 操作超时时间 ||
4343
| defaultSocketReadTimeoutMs | socket读取超时时间 ||
44-
|parallelism | 并行度设置||1|
44+
| parallelism | 并行度设置||1|
4545
4646

4747
## 5.样例:

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected void reloadCache() {
9393
loadData(newCache);
9494

9595
cacheRef.set(newCache);
96-
LOG.info("----- Mongo all cacheRef reload end:{}", Calendar.getInstance());
96+
LOG.info("----- kudu all cacheRef reload end:{}", Calendar.getInstance());
9797
}
9898

9999

@@ -177,15 +177,6 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) {
177177
LOG.error("Error while closing scanner.", e);
178178
}
179179
}
180-
//放置到close中关闭 每次刷新时间较长则可以选择在这里关闭
181-
// if (null != client) {
182-
// try {
183-
// client.close();
184-
// } catch (Exception e) {
185-
// LOG.error("Error while closing client.", e);
186-
// }
187-
// }
188-
189180
}
190181

191182

kudu/kudu-side/kudu-side-core/src/main/java/com/dtstack/flink/sql/side/kudu/table/KuduSideTableInfo.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,13 @@ public void setPrimaryKey(String primaryKey) {
142142

143143
@Override
144144
public boolean check() {
145-
Preconditions.checkNotNull(kuduMasters, "Cassandra field of kuduMasters is required");
146-
Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required");
145+
Preconditions.checkNotNull(kuduMasters, "kudu field of kuduMasters is required");
146+
Preconditions.checkNotNull(tableName, "kudu field of tableName is required");
147147
return true;
148148
}
149149

150150
@Override
151151
public String getType() {
152-
// return super.getType().toLowerCase() + TARGET_SUFFIX;
153152
return super.getType().toLowerCase();
154153
}
155154
}

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ public class KuduOutputFormat extends MetricOutputFormat {
3434

3535
public enum WriteMode {INSERT, UPDATE, UPSERT}
3636

37-
// public enum Consistency {EVENTUAL, STRONG}
38-
3937
private String kuduMasters;
4038

4139
private String tableName;
@@ -46,15 +44,10 @@ public enum WriteMode {INSERT, UPDATE, UPSERT}
4644

4745
TypeInformation<?>[] fieldTypes;
4846

49-
// protected List<String> primaryKeys;
50-
51-
// private Consistency consistency = Consistency.STRONG;
52-
5347
private AsyncKuduClient client;
5448

5549
private KuduTable table;
5650

57-
5851
private Integer workerCount;
5952

6053
private Integer defaultOperationTimeoutMs;
@@ -158,11 +151,6 @@ public KuduOutputFormatBuilder setFieldTypes(TypeInformation<?>[] fieldTypes) {
158151
kuduOutputFormat.fieldTypes = fieldTypes;
159152
return this;
160153
}
161-
//
162-
// public KuduOutputFormatBuilder setPrimaryKeys(List<String> primaryKeys) {
163-
// kuduOutputFormat.primaryKeys = primaryKeys;
164-
// return this;
165-
// }
166154

167155
public KuduOutputFormatBuilder setWriteMode(WriteMode writeMode) {
168156
if (null == writeMode) {
@@ -177,21 +165,6 @@ public KuduOutputFormatBuilder setWorkerCount(Integer workerCount) {
177165
return this;
178166
}
179167

180-
// public KuduOutputFormatBuilder setConsistency(String consistency) {
181-
// switch (consistency) {
182-
// case "EVENTUAL":
183-
// kuduOutputFormat.consistency = Consistency.EVENTUAL;
184-
// break;
185-
// case "STRONG":
186-
// kuduOutputFormat.consistency = Consistency.STRONG;
187-
// break;
188-
// default:
189-
// kuduOutputFormat.consistency = Consistency.STRONG;
190-
// }
191-
// return this;
192-
// }
193-
194-
195168
public KuduOutputFormatBuilder setDefaultOperationTimeoutMs(Integer defaultOperationTimeoutMs) {
196169
kuduOutputFormat.defaultOperationTimeoutMs = defaultOperationTimeoutMs;
197170
return this;

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@ public class KuduSink implements RetractStreamTableSink<Row>, Serializable, IStr
2929

3030
TypeInformation<?>[] fieldTypes;
3131

32-
// protected List<String> primaryKeys;
33-
34-
// private KuduOutputFormat.Consistency consistency = KuduOutputFormat.Consistency.STRONG;
35-
36-
3732
private Integer workerCount;
3833

3934
private Integer defaultOperationTimeoutMs;
@@ -78,7 +73,6 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
7873
return this;
7974
}
8075

81-
8276
@Override
8377
public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
8478
return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduTableInfo.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ public class KuduTableInfo extends TargetTableInfo {
1414

1515
private KuduOutputFormat.WriteMode writeMode;
1616

17-
18-
// private KuduOutputFormat.Consistency consistency = KuduOutputFormat.Consistency.STRONG;
19-
20-
2117
private Integer workerCount;
2218

2319
private Integer defaultOperationTimeoutMs;
@@ -79,14 +75,13 @@ public void setDefaultSocketReadTimeoutMs(Integer defaultSocketReadTimeoutMs) {
7975

8076
@Override
8177
public boolean check() {
82-
Preconditions.checkNotNull(kuduMasters, "Cassandra field of kuduMasters is required");
83-
Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required");
78+
Preconditions.checkNotNull(kuduMasters, "kudu field of kuduMasters is required");
79+
Preconditions.checkNotNull(tableName, "kudu field of tableName is required");
8480
return true;
8581
}
8682

8783
@Override
8884
public String getType() {
89-
// return super.getType().toLowerCase() + TARGET_SUFFIX;
9085
return super.getType().toLowerCase();
9186
}
9287
}

0 commit comments

Comments
 (0)