
Commit e0dd1b5

gituser committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents: b19b859 + 00a2d36

File tree: 10 files changed, +128 additions, -54 deletions

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
 
     public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
 
-        List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
+        List<String> fieldRows = DtStringUtil.splitField(fieldsInfo);
 
         for (String fieldRow : fieldRows) {
            fieldRow = fieldRow.trim();
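
Why this one-line change matters: a DDL field list can itself contain commas, e.g. inside composite column types, so splitting on every ',' would break such declarations apart. A minimal sketch of the intent with hypothetical column definitions; splitField's full body is only partly visible in the DtStringUtil diff below, so the exact output is an assumption:

import com.dtstack.flink.sql.util.DtStringUtil;
import java.util.List;

public class SplitFieldDemo {
    public static void main(String[] args) {
        // Hypothetical field list: a plain split on ',' would cut the ROW(...) type in half.
        String fieldsInfo = "id INT, name VARCHAR, address ROW(city VARCHAR, zip VARCHAR)";
        List<String> fieldRows = DtStringUtil.splitField(fieldsInfo);
        // Expected (assuming quote/bracket-aware splitting): three entries, with
        // "address ROW(city VARCHAR, zip VARCHAR)" kept intact.
        fieldRows.forEach(System.out::println);
    }
}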

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 49 additions & 3 deletions
@@ -46,14 +46,61 @@ public class DtStringUtil {
 
     private static ObjectMapper objectMapper = new ObjectMapper();
 
-
     /**
      * Split the specified string delimiter --- ignored quotes delimiter
      * @param str
      * @param delimiter
      * @return
      */
-    public static List<String> splitIgnoreQuota(String str, char delimiter){
+    public static List<String> splitIgnoreQuota(String str, char delimiter) {
+        List<String> tokensList = new ArrayList<>();
+        boolean inQuotes = false;
+        boolean inSingleQuotes = false;
+        int bracketLeftNum = 0;
+        StringBuilder b = new StringBuilder();
+        char[] chars = str.toCharArray();
+        int idx = 0;
+        for (char c : chars) {
+            char flag = 0;
+            if (idx > 0) {
+                flag = chars[idx - 1];
+            }
+            if (c == delimiter) {
+                if (inQuotes) {
+                    b.append(c);
+                } else if (inSingleQuotes) {
+                    b.append(c);
+                } else if (bracketLeftNum > 0) {
+                    b.append(c);
+                } else {
+                    tokensList.add(b.toString());
+                    b = new StringBuilder();
+                }
+            } else if (c == '\"' && '\\' != flag && !inSingleQuotes) {
+                inQuotes = !inQuotes;
+                b.append(c);
+            } else if (c == '\'' && '\\' != flag && !inQuotes) {
+                inSingleQuotes = !inSingleQuotes;
+                b.append(c);
+            } else if (c == '(' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum++;
+                b.append(c);
+            } else if (c == ')' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum--;
+                b.append(c);
+            } else {
+                b.append(c);
+            }
+            idx++;
+        }
+
+        tokensList.add(b.toString());
+
+        return tokensList;
+    }
+
+    public static List<String> splitField(String str) {
+        final char delimiter = ',';
         List<String> tokensList = new ArrayList<>();
         boolean inQuotes = false;
         boolean inSingleQuotes = false;

@@ -106,7 +153,6 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
         return tokensList;
     }
 
-
     public static String replaceIgnoreQuota(String str, String oriStr, String replaceStr){
         String splitPatternStr = oriStr + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)(?=(?:[^']*'[^']*')*[^']*$)";
         return str.replaceAll(splitPatternStr, replaceStr);
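
The new splitIgnoreQuota body above treats a delimiter as a split point only when it sits outside double quotes, single quotes, and unclosed parentheses. A small usage sketch with illustrative input (not from this commit):

import com.dtstack.flink.sql.util.DtStringUtil;
import java.util.List;

public class SplitIgnoreQuotaDemo {
    public static void main(String[] args) {
        // Commas inside quotes or brackets survive; only top-level commas split.
        List<String> parts = DtStringUtil.splitIgnoreQuota("a=1,b='x,y',c=(1,2)", ',');
        System.out.println(parts); // [a=1, b='x,y', c=(1,2)]
    }
}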

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 6 additions & 14 deletions
@@ -26,32 +26,25 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
 import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
+import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
+import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.collect.Maps;
 import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.sql.SQLException;

@@ -177,6 +170,7 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
 
     private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLException {
         AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
+        Map<String, String> colRefType = ((HbaseAllSideInfo)sideInfo).getColRefType();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
         boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable();
         int loadDataCount = 0;

@@ -213,14 +207,12 @@ public Connection run() {
                 resultScanner = table.getScanner(new Scan());
                 for (Result r : resultScanner) {
                     Map<String, Object> kv = new HashedMap();
-                    for (Cell cell : r.listCells())
-                    {
+                    for (Cell cell : r.listCells()) {
                         String family = Bytes.toString(CellUtil.cloneFamily(cell));
                         String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
-                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                         StringBuilder key = new StringBuilder();
                         key.append(family).append(":").append(qualifier);
-
+                        Object value = HbaseUtils.convertByte(CellUtil.cloneValue(cell), colRefType.get(key.toString()));
                         kv.put(aliasNameInversion.get(key.toString()), value);
                     }
                     loadDataCount++;
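
HbaseUtils.convertByte is called here, but its implementation is not part of this diff. A minimal sketch of what such a decoder could look like, assuming it pairs the declared column type with the matching org.apache.hadoop.hbase.util.Bytes decoder; the type names and fallback are guesses, not the project's actual code:

import org.apache.hadoop.hbase.util.Bytes;

public class ConvertByteSketch {
    // Sketch only: the real HbaseUtils.convertByte may support more types
    // and handle nulls differently.
    public static Object convertByte(byte[] value, String type) {
        if (value == null || type == null) {
            return null;
        }
        switch (type.toLowerCase()) {
            case "int":
            case "integer": return Bytes.toInt(value);
            case "bigint":
            case "long":    return Bytes.toLong(value);
            case "float":   return Bytes.toFloat(value);
            case "double":  return Bytes.toDouble(value);
            case "boolean": return Bytes.toBoolean(value);
            case "decimal": return Bytes.toBigDecimal(value);
            default:        return Bytes.toString(value);
        }
    }
}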

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 17 additions & 0 deletions
@@ -24,17 +24,22 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.BaseSideInfo;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
 import com.dtstack.flink.sql.util.ParseUtils;
+import com.google.common.collect.Maps;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.Map;
 
 public class HbaseAllSideInfo extends BaseSideInfo {
 
     private RowKeyBuilder rowKeyBuilder;
 
+    private Map<String, String> colRefType;
+
     public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }

@@ -48,6 +53,14 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
 
         rowKeyBuilder.init(sideTableInfo.getPrimaryKeys().get(0));
 
+        HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
+        colRefType = Maps.newHashMap();
+        for(int i=0; i<hbaseSideTableInfo.getColumnRealNames().length; i++){
+            String realColName = hbaseSideTableInfo.getColumnRealNames()[i];
+            String colType = hbaseSideTableInfo.getFieldTypes()[i];
+            colRefType.put(realColName, colType);
+        }
+
         String sideTableName = joinInfo.getSideTableName();
         SqlNode conditionNode = joinInfo.getCondition();

@@ -67,4 +80,8 @@ public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
         this.rowKeyBuilder = rowKeyBuilder;
     }
 
+    public Map<String, String> getColRefType() {
+        return colRefType;
+    }
+
 }
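
The loop in buildEqualInfo records, for every real HBase column, the type declared in the table DDL. Illustratively, with hypothetical columns not taken from this commit, the map pairs "family:qualifier" keys with the type names that loadData later feeds to the byte decoder:

import com.google.common.collect.Maps;
import java.util.Map;

public class ColRefTypeExample {
    public static void main(String[] args) {
        // Hypothetical result for a side table declared with cf:age INT, cf:name VARCHAR:
        Map<String, String> colRefType = Maps.newHashMap();
        colRefType.put("cf:age", "int");
        colRefType.put("cf:name", "varchar");
        System.out.println(colRefType); // e.g. {cf:age=int, cf:name=varchar}
    }
}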

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java

Lines changed: 6 additions & 1 deletion
@@ -19,14 +19,15 @@
 package com.dtstack.flink.sql.sink.hbase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.security.krb5.Config;
 import sun.security.krb5.KrbException;
 
+import java.io.File;
 import java.io.IOException;
 
 /**

@@ -77,6 +78,10 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String
             throw new IllegalArgumentException("keytab can not be null");
         }
 
+        if (!new File(keytab).exists()){
+            throw new IllegalArgumentIOException("keytab ["+ keytab + "] not exist");
+        }
+
         conf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, "Kerberos");
         //conf.set("hadoop.security.auth_to_local", "DEFAULT");
         conf.set(KEY_HADOOP_SECURITY_AUTH_TO_LOCAL, "RULE:[1:$1] RULE:[2:$1]");

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 2 additions & 30 deletions
@@ -20,7 +20,6 @@
 
 package com.dtstack.flink.sql.sink.hbase;
 
-import com.dtstack.flink.sql.enums.EUpdateMode;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;

@@ -31,10 +30,8 @@
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -153,13 +150,8 @@ public Connection run() {
     @Override
     public void writeRecord(Tuple2 tuple2) {
         Tuple2<Boolean, Row> tupleTrans = tuple2;
-        Boolean retract = tupleTrans.f0;
         Row row = tupleTrans.f1;
-        if (retract) {
-            dealInsert(row);
-        } else if (!retract && StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
-            dealDelete(row);
-        }
+        dealInsert(row);
     }
 
     protected void dealInsert(Row record) {

@@ -185,26 +177,6 @@ protected void dealInsert(Row record) {
         outRecords.inc();
     }
 
-    protected void dealDelete(Row record) {
-        String rowKey = buildRowKey(record);
-        if (!StringUtils.isEmpty(rowKey)) {
-            Delete delete = new Delete(Bytes.toBytes(rowKey));
-            try {
-                table.delete(delete);
-            } catch (IOException e) {
-                if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
-                    LOG.error("record insert failed ..{}", record.toString());
-                    LOG.error("", e);
-                }
-                outDirtyRecords.inc();
-            }
-            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
-                LOG.info(record.toString());
-            }
-            outRecords.inc();
-        }
-    }
-
     private Put getPutByRow(Row record) {
         String rowKey = buildRowKey(record);
         if (StringUtils.isEmpty(rowKey)) {

@@ -215,7 +187,7 @@ private Put getPutByRow(Row record) {
             Object fieldVal = record.getField(i);
             byte[] val = null;
             if (fieldVal != null) {
-                val = fieldVal.toString().getBytes();
+                val = HbaseUtil.toByte(fieldVal);
             }
             byte[] cf = families[i].getBytes();
             byte[] qualifier = qualifiers[i].getBytes();
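
The getPutByRow change swaps toString()-based serialization for type-aware encoding via HbaseUtil.toByte (added in the next file). The difference is visible with any numeric field; the values below are illustrative:

import org.apache.hadoop.hbase.util.Bytes;

public class EncodingDemo {
    public static void main(String[] args) {
        byte[] asString = String.valueOf(42L).getBytes(); // 2 bytes: '4', '2'
        byte[] asLong = Bytes.toBytes(42L);               // 8 bytes, big-endian
        System.out.println(asString.length + " vs " + asLong.length); // 2 vs 8
        // A reader that calls Bytes.toLong(...) on the string form throws
        // (wrong length), so writer and reader must agree on the encoding.
    }
}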

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseUtil.java

Lines changed: 32 additions & 0 deletions
@@ -22,9 +22,15 @@
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import scala.tools.nsc.Global;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /**
  * Created by softfly on 17/6/30.

@@ -64,4 +70,30 @@ public static TypeInformation columnTypeToTypeInformation(String type) {
 
     }
 
+    public static byte[] toByte(Object value) {
+        if (Objects.isNull(value)) {
+            return new byte[]{};
+        }
+        if (value instanceof Integer) {
+            return Bytes.toBytes((Integer) value);
+        } else if (value instanceof Boolean) {
+            return Bytes.toBytes((Boolean) value);
+        } else if (value instanceof ByteBuffer) {
+            return Bytes.toBytes((ByteBuffer) value);
+        } else if (value instanceof Double) {
+            return Bytes.toBytes((Double) value);
+        } else if (value instanceof Float) {
+            return Bytes.toBytes((Float) value);
+        } else if (value instanceof Long) {
+            return Bytes.toBytes((Long) value);
+        } else if (value instanceof Short) {
+            return Bytes.toBytes((Short) value);
+        } else if (value instanceof String) {
+            return Bytes.toBytes(String.valueOf(value));
+        } else if (value instanceof BigDecimal) {
+            return Bytes.toBytes((BigDecimal) value);
+        }
+        throw new RuntimeException("unkown dateType[" + value.toString() + "]");
+    }
+
 }
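
A quick round trip with the new helper, pairing toByte with the corresponding Bytes decoders (illustrative values):

import org.apache.hadoop.hbase.util.Bytes;

public class ToByteDemo {
    public static void main(String[] args) {
        byte[] d = HbaseUtil.toByte(3.14d);
        System.out.println(Bytes.toDouble(d)); // 3.14
        byte[] s = HbaseUtil.toByte("hello");
        System.out.println(Bytes.toString(s)); // hello
        // Anything outside the supported list (e.g. a java.util.Date) falls
        // through to the RuntimeException branch.
    }
}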

postgresql/postgresql-side/postgresql-all-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAllSideInfo.java

Lines changed: 7 additions & 0 deletions
@@ -22,6 +22,8 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
+import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.List;

@@ -38,4 +40,9 @@ public class PostgresqlAllSideInfo extends RdbAllSideInfo {
     public PostgresqlAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }
+
+    @Override
+    public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
+        return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
+    }
 }
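
DtStringUtil.getTableFullPath is not shown in this commit. A plausible sketch, offered as an assumption rather than the project's actual implementation, is a null-safe schema-qualified name; the async side info below applies the same override:

public class TableFullPathSketch {
    // Assumption: qualify the table with its schema when one is configured.
    public static String getTableFullPath(String schema, String tableName) {
        if (schema == null || schema.trim().isEmpty()) {
            return tableName;
        }
        return schema + "." + tableName;
    }
}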

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncReqRow.java

Lines changed: 1 addition & 5 deletions
@@ -19,10 +19,9 @@
 
 package com.dtstack.flink.sql.side.postgresql;
 
-import com.dtstack.flink.sql.factory.DTThreadFactory;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
 import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
 import io.vertx.core.Vertx;

@@ -35,9 +34,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Date: 2019-08-11

postgresql/postgresql-side/postgresql-async-side/src/main/java/com/dtstack/flink/sql/side/postgresql/PostgresqlAsyncSideInfo.java

Lines changed: 7 additions & 0 deletions
@@ -22,6 +22,8 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
 import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
+import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.List;

@@ -39,4 +41,9 @@ public class PostgresqlAsyncSideInfo extends RdbAsyncSideInfo {
     public PostgresqlAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
    }
+
+    @Override
+    public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
+        return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
+    }
 }
