Skip to content

Commit 6aee245

Browse files
author
dapeng
committed
hbase sink by dateType
1 parent 5693ad4 commit 6aee245

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private Put getPutByRow(Row record) {
129129
if (fieldVal == null) {
130130
continue;
131131
}
132-
byte[] val = fieldVal.toString().getBytes();
132+
byte[] val = HbaseUtil.toByte(fieldVal);
133133
byte[] cf = families[i].getBytes();
134134
byte[] qualifier = qualifiers[i].getBytes();
135135

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseUtil.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@
2222

2323
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
2424
import org.apache.flink.api.common.typeinfo.TypeInformation;
25+
import org.apache.hadoop.hbase.util.Bytes;
2526
import org.apache.hadoop.io.ByteWritable;
2627
import org.apache.hadoop.io.IntWritable;
2728
import org.apache.hadoop.io.Text;
29+
import scala.tools.nsc.Global;
30+
31+
import java.math.BigDecimal;
32+
import java.nio.ByteBuffer;
33+
import java.util.Objects;
2834

2935
/**
3036
* Created by softfly on 17/6/30.
@@ -64,9 +70,30 @@ public static TypeInformation columnTypeToTypeInformation(String type) {
6470

6571
}
6672

67-
public static byte[] toByte(Object value){
68-
if()
69-
return new byte[];
73+
public static byte[] toByte(Object value) {
74+
if (Objects.isNull(value)) {
75+
return new byte[]{};
76+
}
77+
if (value instanceof Integer) {
78+
return Bytes.toBytes((Integer) value);
79+
} else if (value instanceof Boolean) {
80+
return Bytes.toBytes((Boolean) value);
81+
} else if (value instanceof ByteBuffer) {
82+
return Bytes.toBytes((ByteBuffer) value);
83+
} else if (value instanceof Double) {
84+
return Bytes.toBytes((Double) value);
85+
} else if (value instanceof Float) {
86+
return Bytes.toBytes((Float) value);
87+
} else if (value instanceof Long) {
88+
return Bytes.toBytes((Long) value);
89+
} else if (value instanceof Short) {
90+
return Bytes.toBytes((Short) value);
91+
} else if (value instanceof String) {
92+
return Bytes.toBytes(String.valueOf(value));
93+
} else if (value instanceof BigDecimal) {
94+
return Bytes.toBytes((BigDecimal) value);
95+
}
96+
throw new RuntimeException("unkown dateType[" + value.toString() + "]");
7097
}
7198

7299
}

0 commit comments

Comments
 (0)