Skip to content

Commit 5af9b17

Browse files
committed
fix hbase sink
1 parent 1ccefd1 commit 5af9b17

File tree

4 files changed

+70
-17
lines changed

4 files changed

+70
-17
lines changed

docs/hbaseSink.md

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ CREATE TABLE MyResult(
77
type ='hbase',
88
zookeeperQuorum ='ip:port[,ip:port]',
99
tableName ='tableName',
10-
rowKey ='colFamily:colName[,colFamily:colName]',
10+
rowKey ='colName[,colName]',
1111
parallelism ='1',
1212
zookeeperParent ='/hbase'
1313
)
@@ -40,16 +40,16 @@ hbase2.0
4040

4141
## 5.样例:
4242
```
43-
CREATE TABLE MyResult(
44-
cf:channel varchar,
45-
cf:pv BIGINT
43+
44+
CREATE TABLE MyResult(
45+
cf:info VARCHAR,
46+
cf:name VARCHAR,
47+
cf:channel varchar
4648
)WITH(
4749
type ='hbase',
48-
zookeeperQuorum ='rdos1:2181',
49-
tableName ='workerinfo',
50-
rowKey ='cf:channel',
51-
parallelism ='1',
52-
zookeeperParent ='/hbase'
53-
)
54-
50+
zookeeperQuorum ='xx:2181',
51+
zookeeperParent ='/hbase',
52+
tableName ='workerinfo01',
53+
rowKey ='channel'
54+
);
5555
```

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.text.SimpleDateFormat;
3939
import java.util.ArrayList;
4040
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
4143

4244
/**
4345
* author: jingzhen@dtstack.com
@@ -53,6 +55,7 @@ public class HbaseOutputFormat extends MetricOutputFormat {
5355
private String tableName;
5456
private String[] columnNames;
5557
private String[] columnTypes;
58+
private Map<String,String> columnNameFamily;
5659

5760
private String[] families;
5861
private String[] qualifiers;
@@ -196,6 +199,11 @@ public HbaseOutputFormatBuilder setColumnTypes(String[] columnTypes) {
196199
return this;
197200
}
198201

202+
public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNameFamily) {
203+
format.columnNameFamily = columnNameFamily;
204+
return this;
205+
}
206+
199207
public HbaseOutputFormat finish() {
200208
Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
201209
Preconditions.checkNotNull(format.tableName, "tableName should be specified");
@@ -205,13 +213,16 @@ public HbaseOutputFormat finish() {
205213
String[] families = new String[format.columnNames.length];
206214
String[] qualifiers = new String[format.columnNames.length];
207215

208-
for(int i = 0; i < format.columnNames.length; ++i) {
209-
String col = format.columnNames[i];
210-
String[] part = col.split(":");
211-
families[i] = part[0];
212-
qualifiers[i] = part[1];
216+
if (format.columnNameFamily != null) {
217+
Set<String> keySet = format.columnNameFamily.keySet();
218+
String[] columns = keySet.toArray(new String[keySet.size()]);
219+
for (int i = 0; i < columns.length; ++i) {
220+
String col = columns[i];
221+
String[] part = col.split(":");
222+
families[i] = part[0];
223+
qualifiers[i] = part[1];
224+
}
213225
}
214-
215226
format.families = families;
216227
format.qualifiers = qualifiers;
217228

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.flink.table.sinks.TableSink;
3636
import org.apache.flink.types.Row;
3737

38+
import java.util.Map;
39+
3840
/**
3941
* Date: 2018/09/14
4042
* Company: www.dtstack.com
@@ -43,6 +45,7 @@
4345
public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<HbaseSink> {
4446

4547
protected String[] fieldNames;
48+
protected Map<String, String> columnNameFamily;
4649
TypeInformation<?>[] fieldTypes;
4750
protected String zookeeperQuorum;
4851
protected String port;
@@ -62,6 +65,7 @@ public HbaseSink genStreamSink(TargetTableInfo targetTableInfo) {
6265
this.parent = hbaseTableInfo.getParent();
6366
this.tableName = hbaseTableInfo.getTableName();
6467
this.rowkey = hbaseTableInfo.getRowkey();
68+
this.columnNameFamily = hbaseTableInfo.getColumnNameFamily();
6569
return this;
6670
}
6771

@@ -72,6 +76,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
7276

7377
builder.setRowkey(rowkey);
7478
builder.setColumnNames(fieldNames);
79+
builder.setColumnNameFamily(columnNameFamily);
7580

7681
HbaseOutputFormat outputFormat = builder.finish();
7782
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323

2424
import com.dtstack.flink.sql.table.AbsTableParser;
2525
import com.dtstack.flink.sql.table.TableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2627
import com.dtstack.flink.sql.util.MathUtil;
2728

29+
import java.util.LinkedHashMap;
30+
import java.util.List;
2831
import java.util.Map;
2932

3033
import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
@@ -65,4 +68,38 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6568
hbaseTableInfo.setRowkey(rk.split(","));
6669
return hbaseTableInfo;
6770
}
71+
72+
public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){
73+
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
74+
Map<String, String> columnFamilies = new LinkedHashMap<>();
75+
for(String fieldRow : fieldRows){
76+
fieldRow = fieldRow.trim();
77+
78+
String[] filedInfoArr = fieldRow.split("\\s+");
79+
if(filedInfoArr.length < 2 ){
80+
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
81+
}
82+
83+
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
84+
if(isMatcherKey){
85+
continue;
86+
}
87+
88+
//Compatible situation may arise in space in the fieldName
89+
String[] filedNameArr = new String[filedInfoArr.length - 1];
90+
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
91+
String fieldName = String.join(" ", filedNameArr);
92+
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
93+
Class fieldClass = dbTypeConvertToJavaType(fieldType);
94+
String[] columnFamily = fieldName.trim().split(":");
95+
columnFamilies.put(fieldName.trim(),columnFamily[1]);
96+
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
97+
tableInfo.addField(columnFamily[1]);
98+
tableInfo.addFieldClass(fieldClass);
99+
tableInfo.addFieldType(fieldType);
100+
tableInfo.addFieldExtraInfo(null);
101+
}
102+
tableInfo.setColumnNameFamily(columnFamilies);
103+
tableInfo.finish();
104+
}
68105
}

0 commit comments

Comments
 (0)