Commit c0f3b0e

Author: dapeng

Merge remote-tracking branch 'origin/1.10_release_4.0.x' into hotfix_1.10_4.0.x_31763

# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

2 parents: 4d087dc + 1e56647

25 files changed: +303, -83 lines

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 0 deletions

@@ -23,6 +23,8 @@
 import com.dtstack.flink.sql.exec.ParamsInfo;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Gets the execution plan of a SQL job in local mode
@@ -32,15 +34,19 @@
  */
 public class GetPlan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
+
     public static String getExecutionPlan(String[] args) {
         try {
             long start = System.currentTimeMillis();
             ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+            paramsInfo.setGetPlan(true);
             StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
             String executionPlan = env.getExecutionPlan();
             long end = System.currentTimeMillis();
             return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
         } catch (Exception e) {
+            LOG.error("Get plan error", e);
             return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
         }
     }
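The plan-only flag set here threads through to UDF registration in ExecuteProcessHelper (next file). A minimal caller sketch, assuming only the public method shown above; the argument format is whatever ExecuteProcessHelper.parseParams already accepts:

    import com.dtstack.flink.sql.GetPlan;

    public class GetPlanDriver {
        public static void main(String[] args) {
            // Returns a JSON string: the execution plan plus elapsed time on success,
            // or a full stack trace wrapped by ApiResult on failure.
            String resultJson = GetPlan.getExecutionPlan(args);
            System.out.println(resultJson);
        }
    }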

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 28 additions & 21 deletions

@@ -18,22 +18,6 @@
 
 package com.dtstack.flink.sql.exec;
 
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.FlinkPlanner;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
-import com.dtstack.flink.sql.util.TypeInfoDataTypeConverter;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.sinks.TableSink;
-
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
@@ -43,15 +27,22 @@
 import com.dtstack.flink.sql.function.FunctionManager;
 import com.dtstack.flink.sql.option.OptionParser;
 import com.dtstack.flink.sql.option.Options;
-import com.dtstack.flink.sql.side.SideSqlExec;
+import com.dtstack.flink.sql.parser.CreateFuncParser;
+import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.FlinkPlanner;
+import com.dtstack.flink.sql.parser.InsertSqlParser;
+import com.dtstack.flink.sql.parser.SqlParser;
+import com.dtstack.flink.sql.parser.SqlTree;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.sink.StreamSinkFactory;
 import com.dtstack.flink.sql.source.StreamSourceFactory;
 import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
 import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
 import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.PluginUtil;
+import com.dtstack.flink.sql.util.TypeInfoDataTypeConverter;
 import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -63,8 +54,18 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,13 +75,13 @@
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.ArrayList;
 import java.util.stream.Stream;
 
 /**
@@ -162,7 +163,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
         Map<String, Table> registerTableCache = Maps.newHashMap();
 
         //register udf
-        ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
+        ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
         //register table schema
         Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
                 paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -247,13 +248,19 @@ private static void sqlTranslation(String localSqlPluginPath,
         }
     }
 
-    public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
+    public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
             throws IllegalAccessException, InvocationTargetException {
         // The UDFs and the tableEnv must be loaded by the same classloader
         ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
         URLClassLoader classLoader = null;
        List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
        for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
+            // When only building the plan, the UDFs and the tableEnv do not need to share a classloader
+            if (getPlan) {
+                URL[] urls = jarUrlList.toArray(new URL[0]);
+                classLoader = URLClassLoader.newInstance(urls);
+            }
+
             //classloader
             if (classLoader == null) {
                 classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
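The new getPlan branch trades classloader hygiene for isolation: for execution, UDF jars must be attached to the tableEnv's own loader (via ClassLoaderManager.loadExtraJar), but for plan building a throwaway loader suffices and avoids polluting the shared hierarchy. A JDK-only sketch of the distinction, with a hypothetical jar path:

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.Collections;
    import java.util.List;

    public class ClassLoaderIsolationSketch {
        public static void main(String[] args) throws Exception {
            List<URL> jarUrlList = Collections.singletonList(new URL("file:///tmp/udf.jar")); // hypothetical

            // Plan-only path: a throwaway loader with the default (system) parent.
            // The classes stay invisible to the tableEnv's loader, which is fine
            // because the job is never executed with them.
            URLClassLoader planOnlyLoader = URLClassLoader.newInstance(jarUrlList.toArray(new URL[0]));

            // Execution path (what ClassLoaderManager.loadExtraJar effectively achieves):
            // the UDF classes must resolve within the tableEnv's loader hierarchy.
            ClassLoader tableEnvLoader = ClassLoaderIsolationSketch.class.getClassLoader();
            URLClassLoader executionLoader =
                    URLClassLoader.newInstance(jarUrlList.toArray(new URL[0]), tableEnvLoader);

            System.out.println(planOnlyLoader.getParent());  // default parent (system loader)
            System.out.println(executionLoader.getParent()); // explicitly the tableEnv's loader
        }
    }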

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 10 additions & 0 deletions

@@ -21,6 +21,7 @@
 
 import java.net.URL;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -39,6 +40,7 @@ public class ParamsInfo {
     private String pluginLoadMode;
     private String deployMode;
     private Properties confProp;
+    private boolean getPlan = false;
 
     public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
                       String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
         this.confProp = confProp;
     }
 
+    public boolean isGetPlan() {
+        return getPlan;
+    }
+
+    public void setGetPlan(boolean getPlan) {
+        this.getPlan = getPlan;
+    }
+
     public String getSql() {
         return sql;
     }

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion

@@ -194,7 +194,7 @@ public void asyncInvoke(Row row, ResultFuture<BaseRow> resultFuture) throws Exce
     }
 
     private Map<String, Object> parseInputParam(Row input) {
-        Map<String, Object> inputParams = Maps.newHashMap();
+        Map<String, Object> inputParams = Maps.newLinkedHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
             Object equalObj = input.getField(conValIndex);
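Maps.newLinkedHashMap makes inputParams iterate in insertion order, i.e. the order of sideInfo.getEqualValIndex(), whereas HashMap's iteration order depends on key hashing. Presumably anything built by walking this map, such as a composed lookup key, now comes out deterministic. A JDK-only illustration:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MapOrderSketch {
        public static void main(String[] args) {
            Map<String, Object> hash = new HashMap<>();
            Map<String, Object> linked = new LinkedHashMap<>();
            for (String key : new String[]{"user_id", "city", "age"}) { // hypothetical join keys
                hash.put(key, 1);
                linked.put(key, 1);
            }
            System.out.println(hash.keySet());   // order depends on hashing, not insertion
            System.out.println(linked.keySet()); // insertion order: [user_id, city, age]
        }
    }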

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 13 additions & 1 deletion

@@ -44,6 +44,7 @@
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,9 +55,11 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -387,6 +390,11 @@ private void joinFun(Object pollObj,
                     ((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
                 logicalTypes[i] = new DecimalType(38, 18);
             }
+
+            if (logicalTypes[i] instanceof LegacyTypeInformationType &&
+                    (((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(LegacyLocalDateTimeTypeInfo.class))) {
+                logicalTypes[i] = new TimestampType(TimestampType.MAX_PRECISION);
+            }
         }
 
         BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -425,11 +433,15 @@
             targetTable = localTableCache.get(joinInfo.getLeftTableName());
         }
 
-        TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
+        TypeInformation<?>[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
         for (int i = 0; i < fieldDataTypes.length; i++) {
             if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
                 fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
             }
+
+            if (fieldDataTypes[i].getClass().equals(LegacyLocalDateTimeTypeInfo.class)) {
+                fieldDataTypes[i] = LocalTimeTypeInfo.LOCAL_DATE_TIME;
+            }
         }
 
         RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
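Both hunks apply the same pattern as the existing BigDecimal handling: detect a legacy wrapper type left over from the old type system and substitute the blink planner's native equivalent. Condensed, the mapping this diff establishes (a sketch using only the types named above, assuming Flink 1.10 on the classpath):

    import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
    import org.apache.flink.table.types.logical.LegacyTypeInformationType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimestampType;

    public final class LegacyTypeMapping {
        // In LogicalType position: legacy LocalDateTime info becomes TIMESTAMP(9).
        static LogicalType mapLogical(LogicalType t) {
            if (t instanceof LegacyTypeInformationType
                    && ((LegacyTypeInformationType<?>) t).getTypeInformation() instanceof LegacyLocalDateTimeTypeInfo) {
                return new TimestampType(TimestampType.MAX_PRECISION);
            }
            return t;
        }

        // In TypeInformation position: it becomes LocalTimeTypeInfo.LOCAL_DATE_TIME.
        static TypeInformation<?> mapTypeInfo(TypeInformation<?> t) {
            return t instanceof LegacyLocalDateTimeTypeInfo ? LocalTimeTypeInfo.LOCAL_DATE_TIME : t;
        }

        private LegacyTypeMapping() {
        }
    }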

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

Lines changed: 11 additions & 1 deletion

@@ -20,8 +20,11 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.security.krb5.Config;
+import sun.security.krb5.KrbException;
 
 import java.io.IOException;
 
@@ -40,9 +43,16 @@ public class KrbUtils {
 //    public static final String FALSE_STR = "false";
 //    public static final String SUBJECT_ONLY_KEY = "javax.security.auth.useSubjectCredsOnly";
 
-    public static UserGroupInformation getUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
+    public static UserGroupInformation loginAndReturnUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
         LOG.info("Kerberos login with principal: {} and keytab: {}", principal, keytabPath);
         System.setProperty(KRB5_CONF_KEY, krb5confPath);
+        // Without a refresh, the previously cached config (/etc/krb5.conf) would still be read
+        try {
+            Config.refresh();
+            KerberosName.resetDefaultRealm();
+        } catch (KrbException e) {
+            LOG.warn("resetting default realm failed, current default realm will still be used.", e);
+        }
         // TODO The purpose of this option has not been explored yet; enable it only once it is understood
         // System.setProperty(SUBJECT_ONLY_KEY, FALSE_STR);
         Configuration configuration = new Configuration();
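The fix addresses a caching pitfall: the JDK Kerberos layer loads its configuration once, so setting java.security.krb5.conf after first use has no effect until Config.refresh() is called, and Hadoop's KerberosName caches the default realm similarly. A standalone sketch of the pitfall, using the same JDK-internal API the patch relies on (paths hypothetical):

    import sun.security.krb5.Config;
    import sun.security.krb5.KrbException;

    public class KrbConfigCacheSketch {
        public static void main(String[] args) throws KrbException {
            // First access loads and caches the config (typically /etc/krb5.conf).
            System.out.println(Config.getInstance().getDefaultRealm());

            // Pointing at a job-specific config is NOT picked up yet...
            System.setProperty("java.security.krb5.conf", "/opt/job/krb5.conf");
            System.out.println(Config.getInstance().getDefaultRealm()); // still the old realm

            // ...until the cached config is dropped, as loginAndReturnUgi now does.
            Config.refresh();
            System.out.println(Config.getInstance().getDefaultRealm()); // realm from /opt/job/krb5.conf
        }
    }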

core/src/main/java/com/dtstack/flink/sql/util/RowDataConvert.java

Lines changed: 6 additions & 1 deletion

@@ -31,6 +31,7 @@
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 
 /**
@@ -45,11 +46,15 @@ public static BaseRow convertToBaseRow(Row row) {
         int length = row.getArity();
         GenericRow genericRow = new GenericRow(length);
         for (int i = 0; i < length; i++) {
-            if (row.getField(i) instanceof String) {
+            if (row.getField(i) == null) {
+                genericRow.setField(i, row.getField(i));
+            } else if (row.getField(i) instanceof String) {
                 genericRow.setField(i, BinaryString.fromString((String) row.getField(i)));
             } else if (row.getField(i) instanceof Timestamp) {
                 SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i)));
                 genericRow.setField(i, newTimestamp);
+            } else if (row.getField(i) instanceof LocalDateTime) {
+                genericRow.setField(i, SqlTimestamp.fromLocalDateTime((LocalDateTime) row.getField(i)));
             } else if (row.getField(i) instanceof Time) {
                 genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i)));
             } else if (row.getField(i) instanceof Double || row.getField(i).getClass().equals(double.class)) {
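The new branches let convertToBaseRow pass null fields through untouched and map LocalDateTime values via SqlTimestamp.fromLocalDateTime, mirroring the existing java.sql.Timestamp branch. A small exercise of those cases, assuming Flink 1.10's blink dataformat classes on the classpath:

    import java.time.LocalDateTime;

    import org.apache.flink.table.dataformat.BaseRow;
    import org.apache.flink.types.Row;

    import com.dtstack.flink.sql.util.RowDataConvert;

    public class RowDataConvertSketch {
        public static void main(String[] args) {
            // One row covering the cases this patch adds: a null field and a LocalDateTime.
            Row row = Row.of(null, "hello", LocalDateTime.now());
            BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
            System.out.println(baseRow.isNullAt(0)); // true: null passed through
            System.out.println(baseRow.isNullAt(2)); // false: stored as SqlTimestamp
        }
    }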

core/src/test/java/com/dtstack/flink/sql/util/KrbUtilsTest.java

Lines changed: 2 additions & 2 deletions

@@ -30,12 +30,12 @@
  **/
 public class KrbUtilsTest {
     @Test
-    public void testGetUgi() throws IOException {
+    public void testLoginAndReturnUgi() throws IOException {
         String principal = "";
         String keytabPath = "";
         String krb5confPath = "";
         try {
-            KrbUtils.getUgi(principal, keytabPath, krb5confPath);
+            KrbUtils.loginAndReturnUgi(principal, keytabPath, krb5confPath);
         } catch (IllegalArgumentException e) {
             Assert.assertEquals(e.getMessage(), "Can't get Kerberos realm");
         }

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 0 additions & 3 deletions

@@ -94,9 +94,6 @@ protected Row fillData(Row input, Object sideInput){
         Row row = new Row(outFieldInfoList.size());
         for(Map.Entry<Integer, Integer> entry : inFieldIndex.entrySet()){
             Object obj = input.getField(entry.getValue());
-            if(obj instanceof Timestamp){
-                obj = ((Timestamp)obj).getTime();
-            }
             row.setField(entry.getKey(), obj);
         }
 

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 1 addition & 1 deletion

@@ -73,7 +73,7 @@ public Connection getConn(String dbUrl, String userName, String password) {
         String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
         String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
         String principal = impalaSideTableInfo.getPrincipal();
-        UserGroupInformation ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
+        UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
         connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
             @Override
             public Connection run() throws SQLException {
