Skip to content

Commit 1ccefd1

Browse files
committed
Merge branch 'revert-8fc47a79' into 'v1.8.0_dev'
Revert "Merge branch '1.8_dev_hbase_sink' into 'v1.8.0_dev'" This reverts merge request !154 See merge request !156
2 parents 1813d74 + 29985c2 commit 1ccefd1

File tree

4 files changed

+17
-70
lines changed

4 files changed

+17
-70
lines changed

docs/hbaseSink.md

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ CREATE TABLE MyResult(
77
type ='hbase',
88
zookeeperQuorum ='ip:port[,ip:port]',
99
tableName ='tableName',
10-
rowKey ='colName[,colName]',
10+
rowKey ='colFamily:colName[,colFamily:colName]',
1111
parallelism ='1',
1212
zookeeperParent ='/hbase'
1313
)
@@ -40,16 +40,16 @@ hbase2.0
4040

4141
## 5.样例:
4242
```
43-
44-
CREATE TABLE MyResult(
45-
cf:info VARCHAR,
46-
cf:name VARCHAR,
47-
cf:channel varchar
43+
CREATE TABLE MyResult(
44+
cf:channel varchar,
45+
cf:pv BIGINT
4846
)WITH(
4947
type ='hbase',
50-
zookeeperQuorum ='xx:2181',
51-
zookeeperParent ='/hbase',
52-
tableName ='workerinfo01',
53-
rowKey ='channel'
54-
);
48+
zookeeperQuorum ='rdos1:2181',
49+
tableName ='workerinfo',
50+
rowKey ='cf:channel',
51+
parallelism ='1',
52+
zookeeperParent ='/hbase'
53+
)
54+
5555
```

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838
import java.text.SimpleDateFormat;
3939
import java.util.ArrayList;
4040
import java.util.List;
41-
import java.util.Map;
42-
import java.util.Set;
4341

4442
/**
4543
* author: jingzhen@dtstack.com
@@ -55,7 +53,6 @@ public class HbaseOutputFormat extends MetricOutputFormat {
5553
private String tableName;
5654
private String[] columnNames;
5755
private String[] columnTypes;
58-
private Map<String,String> columnNameFamily;
5956

6057
private String[] families;
6158
private String[] qualifiers;
@@ -199,11 +196,6 @@ public HbaseOutputFormatBuilder setColumnTypes(String[] columnTypes) {
199196
return this;
200197
}
201198

202-
public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNameFamily) {
203-
format.columnNameFamily = columnNameFamily;
204-
return this;
205-
}
206-
207199
public HbaseOutputFormat finish() {
208200
Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
209201
Preconditions.checkNotNull(format.tableName, "tableName should be specified");
@@ -213,16 +205,13 @@ public HbaseOutputFormat finish() {
213205
String[] families = new String[format.columnNames.length];
214206
String[] qualifiers = new String[format.columnNames.length];
215207

216-
if (format.columnNameFamily != null) {
217-
Set<String> keySet = format.columnNameFamily.keySet();
218-
String[] columns = keySet.toArray(new String[keySet.size()]);
219-
for (int i = 0; i < columns.length; ++i) {
220-
String col = columns[i];
221-
String[] part = col.split(":");
222-
families[i] = part[0];
223-
qualifiers[i] = part[1];
224-
}
208+
for(int i = 0; i < format.columnNames.length; ++i) {
209+
String col = format.columnNames[i];
210+
String[] part = col.split(":");
211+
families[i] = part[0];
212+
qualifiers[i] = part[1];
225213
}
214+
226215
format.families = families;
227216
format.qualifiers = qualifiers;
228217

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.apache.flink.table.sinks.TableSink;
3636
import org.apache.flink.types.Row;
3737

38-
import java.util.Map;
39-
4038
/**
4139
* Date: 2018/09/14
4240
* Company: www.dtstack.com
@@ -45,7 +43,6 @@
4543
public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<HbaseSink> {
4644

4745
protected String[] fieldNames;
48-
protected Map<String, String> columnNameFamily;
4946
TypeInformation<?>[] fieldTypes;
5047
protected String zookeeperQuorum;
5148
protected String port;
@@ -65,7 +62,6 @@ public HbaseSink genStreamSink(TargetTableInfo targetTableInfo) {
6562
this.parent = hbaseTableInfo.getParent();
6663
this.tableName = hbaseTableInfo.getTableName();
6764
this.rowkey = hbaseTableInfo.getRowkey();
68-
this.columnNameFamily = hbaseTableInfo.getColumnNameFamily();
6965
return this;
7066
}
7167

@@ -76,7 +72,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
7672

7773
builder.setRowkey(rowkey);
7874
builder.setColumnNames(fieldNames);
79-
builder.setColumnNameFamily(columnNameFamily);
8075

8176
HbaseOutputFormat outputFormat = builder.finish();
8277
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@
2323

2424
import com.dtstack.flink.sql.table.AbsTableParser;
2525
import com.dtstack.flink.sql.table.TableInfo;
26-
import com.dtstack.flink.sql.util.DtStringUtil;
2726
import com.dtstack.flink.sql.util.MathUtil;
2827

29-
import java.util.LinkedHashMap;
30-
import java.util.List;
3128
import java.util.Map;
3229

3330
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
@@ -68,38 +65,4 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6865
hbaseTableInfo.setRowkey(rk.split(","));
6966
return hbaseTableInfo;
7067
}
71-
72-
public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){
73-
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
74-
Map<String, String> columnFamilies = new LinkedHashMap<>();
75-
for(String fieldRow : fieldRows){
76-
fieldRow = fieldRow.trim();
77-
78-
String[] filedInfoArr = fieldRow.split("\\s+");
79-
if(filedInfoArr.length < 2 ){
80-
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
81-
}
82-
83-
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
84-
if(isMatcherKey){
85-
continue;
86-
}
87-
88-
//Compatible situation may arise in space in the fieldName
89-
String[] filedNameArr = new String[filedInfoArr.length - 1];
90-
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
91-
String fieldName = String.join(" ", filedNameArr);
92-
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
93-
Class fieldClass = dbTypeConvertToJavaType(fieldType);
94-
String[] columnFamily = fieldName.trim().split(":");
95-
columnFamilies.put(fieldName.trim(),columnFamily[1]);
96-
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
97-
tableInfo.addField(columnFamily[1]);
98-
tableInfo.addFieldClass(fieldClass);
99-
tableInfo.addFieldType(fieldType);
100-
tableInfo.addFieldExtraInfo(null);
101-
}
102-
tableInfo.setColumnNameFamily(columnFamilies);
103-
tableInfo.finish();
104-
}
10568
}

0 commit comments

Comments
 (0)