Commit d5fcf04

Author: toutian
Merge branch 'v1.8.0_dirtydata_manager' into 'v1.8.0_dev'

dirty data manager. See merge request !153

2 parents 0d31954 + 5053fc4

File tree: 32 files changed (+1281 / -90 lines); three files shown below.

32 files changed

+1281
-90
lines changed

core/pom.xml

Lines changed: 29 additions & 0 deletions

@@ -20,6 +20,7 @@
         <calcite.server.version>1.16.0</calcite.server.version>
         <jackson.version>2.7.9</jackson.version>
         <guava.version>19.0</guava.version>
+        <hadoop.version>2.7.3</hadoop.version>
     </properties>
 
     <dependencies>
@@ -46,6 +47,12 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_2.11</artifactId>
             <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-hadoop-fs</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -107,6 +114,12 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-yarn_2.11</artifactId>
             <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-shaded-hadoop2</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
@@ -116,6 +129,22 @@
             <version>${flink.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
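Note: the two exclusions strip Flink's bundled Hadoop filesystem classes (flink-hadoop-fs and flink-shaded-hadoop2) so they cannot clash with the plain Hadoop 2.7.3 client jars added in the last hunk; the dirty-data manager introduced by this merge appears to write through the Hadoop client directly, which is presumably why hadoop-common, hadoop-hdfs, and hadoop-mapreduce-client-core now come in explicitly. Running "mvn dependency:tree -Dincludes=org.apache.flink:flink-shaded-hadoop2" against the core module is one way to confirm the shaded artifact no longer leaks in transitively.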

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 22 additions & 4 deletions
@@ -22,7 +22,9 @@
 
 import com.dtstack.flink.sql.config.CalciteConfig;
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
+import com.dtstack.flink.sql.config.DirtyConfig;
 import com.dtstack.flink.sql.constrant.ConfigConstrant;
+import com.dtstack.flink.sql.dirty.DirtyDataManager;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.enums.EPluginLoadMode;
@@ -48,6 +50,7 @@
 import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -74,6 +77,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -82,6 +86,7 @@
 import java.net.URLDecoder;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
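Review note: of the imports added above, only DirtyConfig and IOException are exercised by the hunks shown below; MapUtils, Optional, and DirtyDataManager do not appear in any visible change, so they presumably serve parts of Main.java (or later revisions) outside this capture.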
@@ -114,6 +119,7 @@ public static void main(String[] args) throws Exception {
         String pluginLoadMode = options.getPluginLoadMode();
         String deployMode = options.getMode();
         String confProp = options.getConfProp();
+        String dirtyProp = options.getDirtyProp();
 
         sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
         SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
@@ -123,13 +129,18 @@ public static void main(String[] args) throws Exception {
             addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
             addJarFileList = objMapper.readValue(addJarListStr, List.class);
         }
-
         confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
+
+        dirtyProp = URLDecoder.decode(dirtyProp, Charsets.UTF_8.toString());
+        // build the DirtyConfig handed to the DirtyDataManager
+        DirtyConfig dirtyConfig = getDirtyDataManagerDirtyConfig(dirtyProp);
+
         StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
         StreamQueryConfig queryConfig = getStreamTableEnvTTL(confProperties, tableEnv);
 
+
         List<URL> jarURList = Lists.newArrayList();
         SqlTree sqlTree = SqlParser.parseSql(sql);
 
@@ -145,7 +156,7 @@ public static void main(String[] args) throws Exception {
         // register udf
         registerUDF(sqlTree, jarURList, tableEnv);
         // register table schema
-        registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
+        registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache, dirtyConfig);
 
         sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache, queryConfig);
 
@@ -156,6 +167,11 @@
         env.execute(name);
     }
 
+    private static DirtyConfig getDirtyDataManagerDirtyConfig(String dirtyProp) throws IOException {
+        Map dirtyConfig = PluginUtil.jsonStrToObject(dirtyProp, Map.class);
+        return dirtyConfig.size() == 0 ? null : new DirtyConfig(dirtyConfig);
+    }
+
     private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
         SideSqlExec sideSqlExec = new SideSqlExec();
         sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
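Because Main URL-decodes dirtyProp before parsing it, whatever submits the job has to URL-encode the dirty-data JSON exactly once. A minimal sketch of that round trip, reusing the project's own PluginUtil and DirtyConfig; the JSON key dirtyDataPath is a made-up placeholder, since DirtyConfig's real schema lives in files outside this diff:

import com.dtstack.flink.sql.config.DirtyConfig;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.io.Charsets;

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Map;

public class DirtyPropRoundTrip {
    public static void main(String[] args) throws Exception {
        // submitter side: encode the JSON before putting it on the command line
        String json = "{\"dirtyDataPath\":\"hdfs://ns1/flink/dirty\"}";   // hypothetical key
        String dirtyProp = URLEncoder.encode(json, Charsets.UTF_8.name());

        // Main.java side: decode, parse, and fall back to null for an empty object
        Map dirtyMap = PluginUtil.jsonStrToObject(
                URLDecoder.decode(dirtyProp, Charsets.UTF_8.name()), Map.class);
        DirtyConfig dirtyConfig = dirtyMap.size() == 0 ? null : new DirtyConfig(dirtyMap);
        // dirtyConfig == null simply leaves dirty-data collection switched off
    }
}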
@@ -235,14 +251,16 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
 
 
     private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                      String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
+                                      String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap,
+                                      Map<String, Table> registerTableCache, DirtyConfig dirtyConfig) throws Exception {
         Set<URL> classPathSet = Sets.newHashSet();
         WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
         for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
 
             if (tableInfo instanceof SourceTableInfo) {
 
                 SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
+                sourceTableInfo.setDirtyConfig(dirtyConfig);
                 Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
                 tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
                 // Note: parameter conversion functions cannot be used inside an aggregation function
@@ -272,7 +290,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env
                 registerTableCache.put(tableInfo.getName(), regTable);
                 classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
             } else if (tableInfo instanceof TargetTableInfo) {
-
+                tableInfo.setDirtyConfig(dirtyConfig);
                 TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
                 TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
                 tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
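Both branches thread the same DirtyConfig through to the table metadata: sources receive it via SourceTableInfo.setDirtyConfig, and sinks via a setter called directly on the TableInfo reference. Neither setter is visible in this capture, so they presumably arrive with the other files of this 32-file merge, alongside DirtyDataManager itself.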
.../com/dtstack/flink/sql/authenticate/KerberosUtil.java (new file)

Lines changed: 213 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.authenticate;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.krb5.Config;
import sun.security.krb5.internal.ktab.KeyTab;
import sun.security.krb5.internal.ktab.KeyTabEntry;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Locale;
import java.util.Map;

/**
 * @author jiangbo
 * @date 2019/8/20
 */
public class KerberosUtil {

    public static Logger LOG = LoggerFactory.getLogger(KerberosUtil.class);

    private static final String PRINCIPAL_SPLIT_REGEX = "/";
    private static final String SP = File.separator;

    private static final String KEY_SFTP_CONF = "sftpConf";
    private static final String KEY_REMOTE_DIR = "remoteDir";
    private static final String KEY_USE_LOCAL_FILE = "useLocalFile";
    private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    private static String LOCAL_DIR;

    static {
        String systemInfo = System.getProperty("os.name");
        if (systemInfo.toLowerCase().startsWith("windows")) {
            LOCAL_DIR = System.getProperty("user.dir");
        } else {
            LOCAL_DIR = "/tmp/flinksql/keytab";
        }
    }

    public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
        if (conf == null) {
            throw new IllegalArgumentException("kerberos conf can not be null");
        }

        if (StringUtils.isEmpty(principal)) {
            throw new IllegalArgumentException("principal can not be null");
        }

        if (StringUtils.isEmpty(keytab)) {
            throw new IllegalArgumentException("keytab can not be null");
        }

        if (StringUtils.isNotEmpty(conf.get(KEY_JAVA_SECURITY_KRB5_CONF))) {
            reloadKrb5Conf(conf);
        }

        conf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(conf);

        LOG.info("login user:{} with keytab:{}", principal, keytab);
        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
    }

    private static void reloadKrb5Conf(Configuration conf) {
        String krb5File = conf.get(KEY_JAVA_SECURITY_KRB5_CONF);
        LOG.info("set krb5 file:{}", krb5File);
        System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5File);

        try {
            // the IBM JDK does not ship sun.security.krb5.Config, so skip the refresh there
            if (!System.getProperty("java.vendor").contains("IBM")) {
                Config.refresh();
            }
        } catch (Exception e) {
            LOG.warn("reload krb5 file:{} error:", krb5File, e);
        }
    }

    public static void loadKrb5Conf(Map<String, Object> kerberosConfig, String jobId, String plugin) {
        String krb5FilePath = MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
        if (StringUtils.isEmpty(krb5FilePath)) {
            LOG.info("krb5 file is empty, will use default file");
            return;
        }

        krb5FilePath = loadFile(kerberosConfig, krb5FilePath, jobId, plugin);
        kerberosConfig.put(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
    }

    public static String loadFile(Map<String, Object> kerberosConfig, String filePath, String jobId, String plugin) {
        boolean useLocalFile = MapUtils.getBooleanValue(kerberosConfig, KEY_USE_LOCAL_FILE);
        if (useLocalFile) {
            LOG.info("will use local file:{}", filePath);
            checkFileExists(filePath);
        } else {
            // keep only the file name; the remote directory comes from the SFTP config
            if (filePath.contains(SP)) {
                filePath = filePath.substring(filePath.lastIndexOf(SP) + 1);
            }

            filePath = loadFromSFTP(kerberosConfig, filePath, jobId, plugin);
        }

        return filePath;
    }

    private static void checkFileExists(String keytab) {
        File file = new File(keytab);
        if (file.exists()) {
            if (file.isDirectory()) {
                throw new RuntimeException("keytab is a directory:" + keytab);
            }
        } else {
            throw new RuntimeException("keytab file not exists:" + keytab);
        }
    }

    private static String loadFromSFTP(Map<String, Object> config, String keytab, String jobId, String plugin) {
        String localDir = createLocalDir(jobId, plugin);
        String localPath = localDir + SP + keytab;

        SFTPHandler handler = null;
        try {
            handler = SFTPHandler.getInstance(MapUtils.getMap(config, KEY_SFTP_CONF));

            String remoteDir = MapUtils.getString(config, KEY_REMOTE_DIR);
            String filePathOnSFTP = remoteDir + "/" + keytab;
            if (handler.isFileExist(filePathOnSFTP)) {
                handler.downloadFile(filePathOnSFTP, localPath);

                LOG.info("download file:{} to local:{}", filePathOnSFTP, localDir);
                return localPath;
            } else {
                // fall back to a host-specific subdirectory on the SFTP server
                String hostname = InetAddress.getLocalHost().getCanonicalHostName().toLowerCase(Locale.US);
                filePathOnSFTP = remoteDir + "/" + hostname + "/" + keytab;
                handler.downloadFile(filePathOnSFTP, localPath);

                LOG.info("download file:{} to local:{}", filePathOnSFTP, localDir);
                return localPath;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (handler != null) {
                handler.close();
            }
        }
    }

    public static String findPrincipalFromKeytab(String principal, String keytabFile) {
        String serverName = principal.split(PRINCIPAL_SPLIT_REGEX)[0];

        KeyTab keyTab = KeyTab.getInstance(keytabFile);
        for (KeyTabEntry entry : keyTab.getEntries()) {
            String princ = entry.getService().getName();
            if (princ.startsWith(serverName)) {
                LOG.info("parse principal:{} from keytab:{}", princ, keytabFile);
                return princ;
            }
        }

        return principal;
    }

    public static void clear(String jobId) {
        File file = new File(LOCAL_DIR + SP + jobId);
        if (file.exists()) {
            boolean result = file.delete();
            if (!result) {
                LOG.warn("Delete file failure:[{}]", LOCAL_DIR + SP + jobId);
            }
        }
    }

    private static String createLocalDir(String jobId, String plugin) {
        String path = LOCAL_DIR + SP + jobId + SP + plugin;
        File file = new File(path);
        if (file.exists()) {
            return path;
        }

        boolean result = file.mkdirs();
        if (!result) {
            LOG.warn("Create dir failure:{}", path);
        }

        LOG.info("create local dir:{}", path);
        return path;
    }
}
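For context, a hypothetical caller of the new utility might look like the following; the principal, keytab path, and krb5.conf location are illustrative placeholders, and SFTPHandler (used by loadFromSFTP above) is a project class that ships in another file of this merge:

import com.dtstack.flink.sql.authenticate.KerberosUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class KerberosLoginExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // point the JVM at a non-default krb5.conf so reloadKrb5Conf picks it up
        conf.set("java.security.krb5.conf", "/etc/krb5.conf");

        UserGroupInformation ugi = KerberosUtil.loginAndReturnUGI(
                conf, "flink/worker01@EXAMPLE.COM", "/tmp/flinksql/keytab/flink.keytab");

        // perform HDFS calls as the authenticated principal
        boolean exists = ugi.doAs(
                (PrivilegedExceptionAction<Boolean>) () -> FileSystem.get(conf).exists(new Path("/tmp")));
        System.out.println("path visible under kerberos login: " + exists);
    }
}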
