
Commit 20e23ca
Author: dapeng
2 parents: 7a705eb + 8cb6d5a

Merge branch '1.8_release_3.10.x' into 1.10_test_4.0.x

# Conflicts:
#	hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

5 files changed (+60 −34 lines)

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
Lines changed: 4 additions & 4 deletions

@@ -26,6 +26,7 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
 import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
+import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
 import com.dtstack.flink.sql.util.RowDataComplete;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.map.HashedMap;
@@ -175,6 +176,7 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {

     private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLException {
         AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
+        Map<String, String> colRefType = ((HbaseAllSideInfo)sideInfo).getColRefType();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
         boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable();
         int loadDataCount = 0;
@@ -211,14 +213,12 @@ public Connection run() {
             resultScanner = table.getScanner(new Scan());
             for (Result r : resultScanner) {
                 Map<String, Object> kv = new HashedMap();
-                for (Cell cell : r.listCells())
-                {
+                for (Cell cell : r.listCells()) {
                     String family = Bytes.toString(CellUtil.cloneFamily(cell));
                     String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
-                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                     StringBuilder key = new StringBuilder();
                     key.append(family).append(":").append(qualifier);
-
+                    Object value = HbaseUtils.convertByte(CellUtil.cloneValue(cell), colRefType.get(key.toString()));
                     kv.put(aliasNameInversion.get(key.toString()), value);
                 }
                 loadDataCount++;
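
The HbaseUtils.convertByte helper this diff starts calling is not itself part of the commit. A minimal sketch of what such a helper could look like, assuming the type strings come from the side-table DDL and cell values were written with the matching Bytes.toBytes encodings; the class name and supported type list are assumptions, not the project's actual code:

    import org.apache.hadoop.hbase.util.Bytes;

    public class HbaseUtilsSketch {
        // Decode an HBase cell value using the SQL type declared for its column.
        // Unknown or missing types fall back to a UTF-8 string, matching the
        // old Bytes.toString behavior.
        public static Object convertByte(byte[] value, String type) {
            if (value == null || value.length == 0) {
                return null;
            }
            if (type == null) {
                return Bytes.toString(value);
            }
            switch (type.toLowerCase()) {
                case "int":
                case "integer": return Bytes.toInt(value);
                case "bigint":  return Bytes.toLong(value);
                case "double":  return Bytes.toDouble(value);
                case "float":   return Bytes.toFloat(value);
                case "boolean": return Bytes.toBoolean(value);
                case "decimal": return Bytes.toBigDecimal(value);
                default:        return Bytes.toString(value);
            }
        }
    }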

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java
Lines changed: 17 additions & 0 deletions

@@ -24,17 +24,22 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.BaseSideInfo;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
+import com.google.common.collect.Maps;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;

 import java.util.List;
+import java.util.Map;

 public class HbaseAllSideInfo extends BaseSideInfo {

     private RowKeyBuilder rowKeyBuilder;

+    private Map<String, String> colRefType;
+
     public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
@@ -48,6 +53,14 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf

         rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));

+        HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
+        colRefType = Maps.newHashMap();
+        for(int i=0; i<hbaseSideTableInfo.getColumnRealNames().length; i++){
+            String realColName = hbaseSideTableInfo.getColumnRealNames()[i];
+            String colType = hbaseSideTableInfo.getFieldTypes()[i];
+            colRefType.put(realColName, colType);
+        }
+
         String sideTableName = joinInfo.getSideTableName();
         SqlNode conditionNode = joinInfo.getCondition();

@@ -67,4 +80,8 @@ public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
         this.rowKeyBuilder = rowKeyBuilder;
     }

+    public Map<String, String> getColRefType() {
+        return colRefType;
+    }
+
 }
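
The effect of the new buildEqualInfo block is easiest to see with a concrete example. A hypothetical side table declared with real column names "cf:name" (varchar) and "cf:age" (int) would produce the following map, which HbaseAllReqRow.loadData then consults per scanned cell:

    import com.google.common.collect.Maps;
    import java.util.Map;

    public class ColRefTypeExample {
        public static void main(String[] args) {
            // What the buildEqualInfo loop builds for the hypothetical table above.
            Map<String, String> colRefType = Maps.newHashMap();
            colRefType.put("cf:name", "varchar");
            colRefType.put("cf:age", "int");

            // loadData looks up a cell's declared SQL type by "family:qualifier"
            // before deciding how to decode its bytes.
            System.out.println(colRefType.get("cf" + ":" + "age")); // prints "int"
        }
    }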

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java
Lines changed: 5 additions & 0 deletions

@@ -26,6 +26,7 @@
 import sun.security.krb5.Config;
 import sun.security.krb5.KrbException;

+import java.io.File;
 import java.io.IOException;

 /**
@@ -76,6 +77,10 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String
             throw new IllegalArgumentException("keytab can not be null");
         }

+        if (!new File(keytab).exists()){
+            throw new IllegalArgumentIOException("keytab ["+ keytab + "] not exist");
+        }
+
         conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, "Kerberos");
         //conf.set("hadoop.security.auth_to_local", "DEFAULT");
         conf.set(KEY_HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[1:$1] RULE:[2:$1]");
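
The added guard simply fails fast: a bad keytab path now surfaces immediately, with the path in the message, instead of failing later inside the UserGroupInformation login. A standalone sketch of the same pattern (class and method names here are illustrative, not part of the commit):

    import java.io.File;

    public class KeytabGuard {
        // Reject a missing keytab before any Kerberos machinery runs.
        static void requireKeytab(String keytab) {
            if (keytab == null) {
                throw new IllegalArgumentException("keytab can not be null");
            }
            if (!new File(keytab).exists()) {
                throw new IllegalArgumentException("keytab [" + keytab + "] not exist");
            }
        }
    }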

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java
Lines changed: 2 additions & 30 deletions

@@ -20,7 +20,6 @@

 package com.dtstack.flink.sql.sink.hbase;

-import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
@@ -31,10 +30,8 @@
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,13 +150,8 @@ public Connection run() {
     @Override
     public void writeRecord(Tuple2 tuple2) {
         Tuple2<Boolean, Row> tupleTrans = tuple2;
-        Boolean retract = tupleTrans.f0;
         Row row = tupleTrans.f1;
-        if (retract) {
-            dealInsert(row);
-        } else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
-            dealDelete(row);
-        }
+        dealInsert(row);
     }

     protected void dealInsert(Row record) {
@@ -185,26 +177,6 @@ protected void dealInsert(Row record) {
         outRecords.inc();
     }

-    protected void dealDelete(Row record) {
-        String rowKey = buildRowKey(record);
-        if (!StringUtils.isEmpty(rowKey)) {
-            Delete delete = new Delete(Bytes.toBytes(rowKey));
-            try {
-                table.delete(delete);
-            } catch (IOException e) {
-                if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
-                    LOG.error("record insert failed ..{}", record.toString());
-                    LOG.error("", e);
-                }
-                outDirtyRecords.inc();
-            }
-            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
-                LOG.info(record.toString());
-            }
-            outRecords.inc();
-        }
-    }
-
     private Put getPutByRow(Row record) {
         String rowKey = buildRowKey(record);
         if (StringUtils.isEmpty(rowKey)) {
@@ -215,7 +187,7 @@ private Put getPutByRow(Row record) {
             Object fieldVal = record.getField(i);
             byte[] val = null;
             if (fieldVal != null) {
-                val = fieldVal.toString().getBytes();
+                val = HbaseUtil.toByte(fieldVal);
             }
             byte[] cf = families[i].getBytes();
             byte[] qualifier = qualifiers[i].getBytes();
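
With dealDelete gone, every record is written as a Put, and getPutByRow now encodes values with HbaseUtil.toByte rather than fieldVal.toString().getBytes(). The practical difference is the on-disk cell encoding; a small illustration (row key and column names are made up):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutEncodingExample {
        public static void main(String[] args) {
            Put put = new Put(Bytes.toBytes("rowKey-001"));
            // The int 30 is now stored as a 4-byte big-endian value,
            // not as the UTF-8 string "30".
            put.addColumn("cf".getBytes(), "age".getBytes(), HbaseUtil.toByte(30));
        }
    }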

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseUtil.java
Lines changed: 32 additions & 0 deletions

@@ -22,9 +22,15 @@

 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import scala.tools.nsc.Global;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Objects;

 /**
  * Created by softfly on 17/6/30.
@@ -64,4 +70,30 @@ public static TypeInformation columnTypeToTypeInformation(String type) {

     }

+    public static byte[] toByte(Object value) {
+        if (Objects.isNull(value)) {
+            return new byte[]{};
+        }
+        if (value instanceof Integer) {
+            return Bytes.toBytes((Integer) value);
+        } else if (value instanceof Boolean) {
+            return Bytes.toBytes((Boolean) value);
+        } else if (value instanceof ByteBuffer) {
+            return Bytes.toBytes((ByteBuffer) value);
+        } else if (value instanceof Double) {
+            return Bytes.toBytes((Double) value);
+        } else if (value instanceof Float) {
+            return Bytes.toBytes((Float) value);
+        } else if (value instanceof Long) {
+            return Bytes.toBytes((Long) value);
+        } else if (value instanceof Short) {
+            return Bytes.toBytes((Short) value);
+        } else if (value instanceof String) {
+            return Bytes.toBytes(String.valueOf(value));
+        } else if (value instanceof BigDecimal) {
+            return Bytes.toBytes((BigDecimal) value);
+        }
+        throw new RuntimeException("unkown dateType[" + value.toString() + "]");
+    }
+
 }
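
toByte on the sink side pairs with the byte-aware read path added to the hbase-all side table in this commit: a value written with a Bytes encoding can be recovered with the matching Bytes decoder. A quick round-trip check:

    import org.apache.hadoop.hbase.util.Bytes;

    public class RoundTripExample {
        public static void main(String[] args) {
            byte[] encoded = HbaseUtil.toByte(42); // 4-byte big-endian int
            int decoded = Bytes.toInt(encoded);    // recovers the original value
            System.out.println(decoded);           // 42
        }
    }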
