
Commit 40a86e7

Merge branch '1.10_release_4.0.x' into 'hotfix_1.10_4.0.x_31804'
# Conflicts:
#   localTest/pom.xml
2 parents: f219400 + c3069ae

11 files changed (+43, -23 lines)

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 11 additions & 1 deletion
@@ -20,8 +20,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.security.krb5.Config;
+import sun.security.krb5.KrbException;

 import java.io.IOException;

@@ -40,9 +43,16 @@ public class KrbUtils {
     // public static final String FALSE_STR = "false";
     // public static final String SUBJECT_ONLY_KEY = "javax.security.auth.useSubjectCredsOnly";

-    public static UserGroupInformation getUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
+    public static UserGroupInformation loginAndReturnUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
         LOG.info("Kerberos login with principal: {} and keytab: {}", principal, keytabPath);
         System.setProperty(KRB5_CONF_KEY, krb5confPath);
+        // without a refresh, the JVM keeps reading /etc/krb5.conf
+        try {
+            Config.refresh();
+            KerberosName.resetDefaultRealm();
+        } catch (KrbException e) {
+            LOG.warn("resetting default realm failed, current default realm will still be used.", e);
+        }
         // TODO the purpose of this option is still unclear; enable it only once it is understood
         // System.setProperty(SUBJECT_ONLY_KEY, FALSE_STR);
         Configuration configuration = new Configuration();
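
Every call site touched by this commit follows the same pattern: log in once through the renamed helper, then run the client code inside ugi.doAs. A minimal, self-contained sketch of that pattern, assuming an illustrative Impala JDBC URL and a plain DriverManager connection (neither is part of this commit):

import com.dtstack.flink.sql.util.KrbUtils;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;

public class KerberosJdbcSketch {

    // Log in via the renamed utility, then open the JDBC connection inside the
    // authenticated context. The URL is a placeholder, not taken from this commit.
    public static Connection connect(String principal, String keytab, String krb5conf) throws Exception {
        UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keytab, krb5conf);
        return ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
                DriverManager.getConnection("jdbc:impala://host:21050/default;AuthMech=1"));
    }
}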

core/src/test/java/com/dtstack/flink/sql/util/KrbUtilsTest.java

Lines changed: 2 additions & 2 deletions
@@ -30,12 +30,12 @@
  **/
 public class KrbUtilsTest {
     @Test
-    public void testGetUgi() throws IOException {
+    public void testLoginAndReturnUgi() throws IOException {
         String principal = "";
         String keytabPath = "";
         String krb5confPath = "";
         try {
-            KrbUtils.getUgi(principal, keytabPath, krb5confPath);
+            KrbUtils.loginAndReturnUgi(principal, keytabPath, krb5confPath);
         } catch (IllegalArgumentException e) {
             Assert.assertEquals(e.getMessage(), "Can't get Kerberos realm");
         }

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ public Connection getConn(String dbUrl, String userName, String password) {
         String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
         String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
         String principal = impalaSideTableInfo.getPrincipal();
-        UserGroupInformation ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
+        UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
         connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
             @Override
             public Connection run() throws SQLException {

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ public void open(Configuration parameters) throws Exception {
         String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
         String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
         String principal = impalaSideTableInfo.getPrincipal();
-        ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
+        ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
         openJdbc(parameters);
     } else {
         openJdbc(parameters);

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 18 additions & 11 deletions
@@ -52,6 +52,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;

@@ -76,7 +77,7 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     // cast(value as string) -> cast('value' as string) cast(value as timestamp) -> cast('value' as timestamp)
     private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
     //specific type which values need to be quoted
-    private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp"};
+    private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp", "varchar"};

     private static final Integer DEFAULT_CONN_TIME_OUT = 60;
     private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000;

@@ -85,8 +86,6 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
     private static final String KUDU_TYPE = "kudu";
     private static final String UPDATE_MODE = "update";
     private static final String PARTITION_CONSTANT = "PARTITION";
-    private static final String STRING_TYPE = "STRING";
-    private static final String TIMESTAMP_TYPE = "TIMESTAMP";
     private static final String DRIVER_NAME = "com.cloudera.impala.jdbc41.Driver";

     private static final String VALUES_CONDITION = "${valuesCondition}";

@@ -208,7 +207,7 @@ private void initScheduledTask(Long batchWaitInterval) {

     private void openConnect() throws IOException {
         if (authMech == 1) {
-            UserGroupInformation ugi = KrbUtils.getUgi(principal, keytabPath, krb5confPath);
+            UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keytabPath, krb5confPath);
             try {
                 ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                     openJdbc();

@@ -363,24 +362,28 @@ private List<String> rebuildFieldNameListAndTypeList(List<String> fieldNames, Li
      * @return quoted condition
      */
     private String valueConditionAddQuotation(String valueCondition) {
-        final String[] valueConditionCopy = {valueCondition};
         String[] temps = valueCondition.split(",");
+        List<String> replacedItem = new ArrayList<>();
         Arrays.stream(temps).forEach(
             item -> {
                 Matcher matcher = TYPE_PATTERN.matcher(item);
                 while (matcher.find()) {
                     String value = matcher.group(1);
                     String type = matcher.group(2);

-                    if (Arrays.asList(NEED_QUOTE_TYPE).contains(type)) {
-                        if (!"null".equals(value)) {
-                            valueConditionCopy[0] = valueConditionCopy[0].replace(value, "'" + value + "'");
+                    for (String needQuoteType : NEED_QUOTE_TYPE) {
+                        if (type.contains(needQuoteType)) {
+                            if (!"null".equals(value)) {
+                                item = item.replace(value, "'" + value + "'");
+                            }
                         }
                     }
                 }
+                replacedItem.add(item);
             }
         );
-        return "(" + valueConditionCopy[0] + ")";
+
+        return "(" + String.join(", ", replacedItem) + ")";
     }

@@ -580,8 +583,10 @@ private String buildTableFieldsCondition(List<String> fieldNames, String partiti
     private String buildValuesCondition(List<String> fieldTypes, Row row) {
         String valuesCondition = fieldTypes.stream().map(
             f -> {
-                if (Arrays.asList(NEED_QUOTE_TYPE).contains(f.toLowerCase())) {
-                    return String.format("cast(? as %s)", f.toLowerCase());
+                for (String item : NEED_QUOTE_TYPE) {
+                    if (f.toLowerCase().contains(item)) {
+                        return String.format("cast(? as %s)", f.toLowerCase());
+                    }
                 }
                 return "?";
             }).collect(Collectors.joining(", "));

@@ -745,6 +750,8 @@ public ImpalaOutputFormat build() {
             checkNotNull(format.password, "password is required!");
         }

+        checkNotNull(format.storeType, "storeType is required!");
+
         return format;
     }
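
The move from exact type matching to substring matching is what makes parameterized types such as varchar(255) get quoted; previously only a literal "string" or "timestamp" entry matched. A self-contained sketch of the new quoting rule (class name and sample values are illustrative, not from the commit):

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QuoteRuleSketch {

    private static final Pattern TYPE_PATTERN = Pattern.compile("cast\\((.*) as (.*)\\)");
    private static final String[] NEED_QUOTE_TYPE = {"string", "timestamp", "varchar"};

    // Quote the value of a single cast(value as type) expression when the type needs quoting.
    static String quote(String castExpr) {
        Matcher matcher = TYPE_PATTERN.matcher(castExpr);
        if (matcher.find()) {
            String value = matcher.group(1);
            String type = matcher.group(2);
            // substring match, so "varchar(255)" is handled like "varchar"
            boolean needQuote = Arrays.stream(NEED_QUOTE_TYPE).anyMatch(type::contains);
            if (needQuote && !"null".equals(value)) {
                return castExpr.replace(value, "'" + value + "'");
            }
        }
        return castExpr;
    }

    public static void main(String[] args) {
        System.out.println(quote("cast(abc as varchar(255))")); // cast('abc' as varchar(255))
        System.out.println(quote("cast(42 as int)"));           // unchanged
    }
}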

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 0 additions & 1 deletion
@@ -46,7 +46,6 @@
 public class ImpalaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<ImpalaSink> {

     private static final String DEFAULT_STORE_TYPE = "kudu";
-    private static final String DEFAULT_PARTITION_MODE = "dynamic";

     protected String[] fieldNames;
     TypeInformation<?>[] fieldTypes;

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaSinkParser.java

Lines changed: 5 additions & 2 deletions
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;

 /**
  * Date: 2020/10/14

@@ -76,6 +77,8 @@ public class ImpalaSinkParser extends AbstractTableParser {

     private static final String KUDU_TYPE = "kudu";

+    private static final String DEFAULT_STORE_TYPE = "kudu";
+
     private static final String STORE_TYPE_KEY = "storeType";

     private static final String KRB_DEFAULT_REALM = "HADOOP.COM";

@@ -123,13 +126,13 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
         }

         String storeType = MathUtil.getString(props.get(STORE_TYPE_KEY.toLowerCase()));
-        impalaTableInfo.setStoreType(storeType);
+        impalaTableInfo.setStoreType(Objects.isNull(storeType) ? DEFAULT_STORE_TYPE : storeType);

         String enablePartitionStr = (String) props.get(ENABLE_PARTITION_KEY.toLowerCase());
         boolean enablePartition = MathUtil.getBoolean(enablePartitionStr == null ? "false" : enablePartitionStr);
         impalaTableInfo.setEnablePartition(enablePartition);

-        if (!storeType.equalsIgnoreCase(KUDU_TYPE) && enablePartition) {
+        if (!impalaTableInfo.getStoreType().equalsIgnoreCase(KUDU_TYPE) && enablePartition) {
             String partitionFields = MathUtil.getString(props.get(PARTITION_FIELDS_KEY.toLowerCase()));
             impalaTableInfo.setPartitionFields(partitionFields);
         }
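
A compact sketch of the defaulting behavior this hunk introduces: with no storeType in the table properties, the old code dereferenced null in the equalsIgnoreCase check, while the new code falls back to kudu first (the property map and lower-cased key below are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class StoreTypeDefaultSketch {

    private static final String DEFAULT_STORE_TYPE = "kudu";

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();          // table DDL properties with "storetype" omitted
        String storeType = (String) props.get("storetype");   // null when the user did not set it
        String effective = Objects.isNull(storeType) ? DEFAULT_STORE_TYPE : storeType;
        System.out.println("storeType = " + effective);       // storeType = kudu
    }
}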

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 2 additions & 1 deletion
@@ -26,6 +26,7 @@
 import com.dtstack.flink.sql.util.MathUtil;

 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;

 /**

@@ -46,7 +47,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
         kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase())));
         kafkaSourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
         kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
-        kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
+        kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase(), UUID.randomUUID().toString().replace("-", ""))));
         kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
         kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
         kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
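
With getOrDefault, a table definition that omits groupId now gets a freshly generated consumer group each time the table is parsed; since a brand-new group has no committed offsets, the offsetReset property a few lines below decides where consumption starts. A minimal sketch of the defaulting rule (the map and key are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class GroupIdDefaultSketch {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();  // table properties without a "groupid" entry
        Object groupId = props.getOrDefault("groupid", UUID.randomUUID().toString().replace("-", ""));
        System.out.println("group.id = " + groupId);  // e.g. a 32-character hex string
    }
}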

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -227,7 +227,7 @@ private KuduClient getClient(KuduSideTableInfo tableInfo) throws IOException {
         }

         if (tableInfo.isEnableKrb()) {
-            UserGroupInformation ugi = KrbUtils.getUgi(tableInfo.getPrincipal(), tableInfo.getKeytab(), tableInfo.getKrb5conf());
+            UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(tableInfo.getPrincipal(), tableInfo.getKeytab(), tableInfo.getKrb5conf());
             return ugi.doAs(new PrivilegedAction<KuduClient>() {
                 @Override
                 public KuduClient run() {

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -132,7 +132,7 @@ private AsyncKuduClient getClient() throws IOException {
         }

         if (kuduSideTableInfo.isEnableKrb()) {
-            UserGroupInformation ugi = KrbUtils.getUgi(
+            UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
                 kuduSideTableInfo.getPrincipal(),
                 kuduSideTableInfo.getKeytab(),
                 kuduSideTableInfo.getKrb5conf()
