Skip to content

Commit 5693ad4

Browse files
author
dapeng
committed
结果表格式
1 parent a831177 commit 5693ad4

File tree

2 files changed

+6
-26
lines changed

2 files changed

+6
-26
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,8 @@ public void open(int taskNumber, int numTasks) throws IOException {
9292
@Override
9393
public void writeRecord(Tuple2 tuple2) {
9494
Tuple2<Boolean, Row> tupleTrans = tuple2;
95-
Boolean retract = tupleTrans.f0;
9695
Row row = tupleTrans.f1;
97-
if (retract) {
98-
dealInsert(row);
99-
} else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
100-
dealDelete(row);
101-
}
96+
dealInsert(row);
10297
}
10398

10499
protected void dealInsert(Row record) {
@@ -123,26 +118,6 @@ protected void dealInsert(Row record) {
123118
outRecords.inc();
124119
}
125120

126-
protected void dealDelete(Row record) {
127-
String rowKey = buildRowKey(record);
128-
if (!StringUtils.isEmpty(rowKey)) {
129-
Delete delete = new Delete(Bytes.toBytes(rowKey));
130-
try {
131-
table.delete(delete);
132-
} catch (IOException e) {
133-
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
134-
LOG.error("record insert failed ..{}", record.toString());
135-
LOG.error("", e);
136-
}
137-
outDirtyRecords.inc();
138-
}
139-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
140-
LOG.info(record.toString());
141-
}
142-
outRecords.inc();
143-
}
144-
}
145-
146121
private Put getPutByRow(Row record) {
147122
String rowKey = buildRowKey(record);
148123
if (StringUtils.isEmpty(rowKey)) {

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,9 @@ public static TypeInformation columnTypeToTypeInformation(String type) {
6464

6565
}
6666

67+
public static byte[] toByte(Object value){
68+
if()
69+
return new byte[];
70+
}
71+
6772
}

0 commit comments

Comments
 (0)