/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.exec;

import com.dtstack.flink.sql.classloader.ClassLoaderManager;
import com.dtstack.flink.sql.config.CalciteConfig;
import com.dtstack.flink.sql.enums.ClusterMode;
import com.dtstack.flink.sql.enums.ECacheType;
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
import com.dtstack.flink.sql.function.FunctionManager;
import com.dtstack.flink.sql.option.OptionParser;
import com.dtstack.flink.sql.option.Options;
import com.dtstack.flink.sql.parser.CreateFuncParser;
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
import com.dtstack.flink.sql.parser.InsertSqlParser;
import com.dtstack.flink.sql.parser.SqlParser;
import com.dtstack.flink.sql.parser.SqlTree;
import com.dtstack.flink.sql.side.SideSqlExec;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.sink.StreamSinkFactory;
import com.dtstack.flink.sql.source.StreamSourceFactory;
import com.dtstack.flink.sql.table.SourceTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import com.dtstack.flink.sql.util.PluginUtil;
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.Charsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLDecoder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Common workflow methods shared by task execution.
 * Date: 2020/2/17
 * Company: www.dtstack.com
 * @author maqi
 */
public class BuildProcess {

    private static final String CLASS_FILE_NAME_FMT = "class_path_%d";
    private static final Logger LOG = LoggerFactory.getLogger(BuildProcess.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();


    public static ParamsInfo parseParams(String[] args) throws Exception {
        LOG.info("------------program params-------------------------");
        System.out.println("------------program params-------------------------");
        Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
        Arrays.stream(args).forEach(System.out::println);
        LOG.info("-------------------------------------------");
        System.out.println("-------------------------------------------");

        OptionParser optionParser = new OptionParser(args);
        Options options = optionParser.getOptions();

        String sql = URLDecoder.decode(options.getSql(), Charsets.UTF_8.name());
        String name = options.getName();
        String localSqlPluginPath = options.getLocalSqlPluginPath();
        String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
        String pluginLoadMode = options.getPluginLoadMode();
        String deployMode = options.getMode();

        String confProp = URLDecoder.decode(options.getConfProp(), Charsets.UTF_8.name());
        Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);

        List<URL> jarUrlList = getExternalJarUrls(options.getAddjar());

        return ParamsInfo.builder()
                .setSql(sql)
                .setName(name)
                .setLocalSqlPluginPath(localSqlPluginPath)
                .setRemoteSqlPluginPath(remoteSqlPluginPath)
                .setPluginLoadMode(pluginLoadMode)
                .setDeployMode(deployMode)
                .setConfProp(confProperties)
                .setJarUrlList(jarUrlList)
                .build();
    }
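
    /*
     * Example launch arguments consumed by parseParams above. This is an
     * illustrative sketch only: the option names are inferred from the
     * Options getters and may not match the exact flags OptionParser defines.
     *
     *   -name myJob -mode local -pluginLoadMode shipfile \
     *   -localSqlPluginPath /opt/dtstack/sqlplugins \
     *   -sql <URL-encoded SQL text> \
     *   -confProp <URL-encoded JSON properties>
     */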

    public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
        StreamExecutionEnvironment env = BuildProcess.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, paramsInfo.getConfProp());

        SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
        SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());

        Map<String, SideTableInfo> sideTableMap = Maps.newHashMap();
        Map<String, Table> registerTableCache = Maps.newHashMap();

        // register UDFs declared in the SQL
        BuildProcess.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
        // register source/sink/side table schemas and collect their plugin jar URLs
        Set<URL> classPathSets = BuildProcess.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
                paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
        // cache the collected plugin classpath entries on the environment
        BuildProcess.registerPluginUrlToCachedFile(env, classPathSets);

        // translate the parsed SQL tree into Flink jobs
        BuildProcess.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);

        if (env instanceof MyLocalStreamEnvironment) {
            ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
        }
        return env;
    }
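
    /*
     * A minimal usage sketch of the two entry points above (hypothetical
     * launcher code; the env.execute call is an assumption, not part of
     * this class):
     *
     *   ParamsInfo paramsInfo = BuildProcess.parseParams(args);
     *   StreamExecutionEnvironment env = BuildProcess.getStreamExecution(paramsInfo);
     *   env.execute(paramsInfo.getName());
     */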

    public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.IOException {
        List<URL> jarUrlList = Lists.newArrayList();
        if (Strings.isNullOrEmpty(addJarListStr)) {
            return jarUrlList;
        }

        List<String> addJarFileList = OBJECT_MAPPER.readValue(URLDecoder.decode(addJarListStr, Charsets.UTF_8.name()), List.class);
        // convert each external jar path into a URL so it can be loaded later
        for (String addJarPath : addJarFileList) {
            jarUrlList.add(new File(addJarPath).toURI().toURL());
        }
        return jarUrlList;
    }

    public static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
        SideSqlExec sideSqlExec = new SideSqlExec();
        sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
        for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
            sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
        }

        for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("exe-sql:\n" + result.getExecSql());
            }
            boolean isSide = false;
            for (String tableName : result.getTargetTableList()) {
                if (sqlTree.getTmpTableMap().containsKey(tableName)) {
                    // the insert target is a temporary table: strip back-quotes, re-parse
                    // the statement and register its SELECT part as the tmp table source
                    CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
                    String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");

                    SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
                    String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
                    tmp.setExecSql(tmpSql);
                    sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
                } else {
                    for (String sourceTable : result.getSourceTableList()) {
                        if (sideTableMap.containsKey(sourceTable)) {
                            isSide = true;
                            break;
                        }
                    }
                    if (isSide) {
                        // the statement joins at least one dimension (side) table,
                        // so delegate execution to SideSqlExec
                        sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
                    } else {
                        FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
                        if (LOG.isInfoEnabled()) {
                            LOG.info("exec sql: " + result.getExecSql());
                        }
                    }
                }
            }
        }
    }

    public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
            throws IllegalAccessException, InvocationTargetException {
        // the UDFs and the tableEnv must be loaded by the same classloader
        ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
        URLClassLoader classLoader = null;
        List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
        for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
            // lazily build the classloader carrying the external jars
            if (classLoader == null) {
                classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
            }
            FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
        }
    }
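
    /*
     * For reference, a function definition that flows through the loop above
     * looks roughly like the following (a sketch only: the exact grammar and
     * type keywords are owned by CreateFuncParser and are not verified here):
     *
     *   CREATE SCALA FUNCTION myUpper WITH com.example.udf.MyUpper;
     */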

    /**
     * Register the source and sink tables with Flink and return the full
     * paths of the plugin packages required at execution time.
     * @param sqlTree             parsed SQL tree
     * @param env                 stream execution environment
     * @param tableEnv            stream table environment
     * @param localSqlPluginPath  local plugin root
     * @param remoteSqlPluginPath remote plugin root
     * @param pluginLoadMode      plugin load mode, classpath or shipfile
     * @param sideTableMap        collects dimension (side) table infos
     * @param registerTableCache  collects registered source tables
     * @return plugin classpath URLs needed by the job
     * @throws Exception
     */
    public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
                                         String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
        Set<URL> pluginClassPathSets = Sets.newHashSet();
        WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
        for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {

            if (tableInfo instanceof SourceTableInfo) {

                SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
                Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
                tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
                // Note: field-conversion functions cannot be used inside aggregate functions;
                // tables declaring conversion functions only need the adapt-select SQL
                String adaptSql = sourceTableInfo.getAdaptSelectSql();
                Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);

                RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
                        .map((Tuple2<Boolean, Row> f0) -> f0.f1)
                        .returns(typeInfo);

                String fields = String.join(",", typeInfo.getFieldNames());

                if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
                    // event time: assign watermarks and expose the rowtime attribute
                    adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
                    fields += ",ROWTIME.ROWTIME";
                } else {
                    // processing time: expose the proctime attribute
                    fields += ",PROCTIME.PROCTIME";
                }

                Table regTable = tableEnv.fromDataStream(adaptStream, fields);
                tableEnv.registerTable(tableInfo.getName(), regTable);
                if (LOG.isInfoEnabled()) {
                    LOG.info("register table {} success.", tableInfo.getName());
                }
                registerTableCache.put(tableInfo.getName(), regTable);

                URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
                pluginClassPathSets.add(sourceTablePathUrl);
            } else if (tableInfo instanceof TargetTableInfo) {

                TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
                TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
                tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);

                URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
                pluginClassPathSets.add(sinkTablePathUrl);
            } else if (tableInfo instanceof SideTableInfo) {
                String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
                sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);

                URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
                pluginClassPathSets.add(sideTablePathUrl);
            } else {
                throw new RuntimeException("not support table type:" + tableInfo.getType());
            }
        }
        return pluginClassPathSets;
    }
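
    /*
     * A side (dimension) table that would be routed through the SideTableInfo
     * branch above is typically declared along these lines (a sketch; the
     * property names follow common flinkStreamSQL examples and are not
     * verified against this project's parsers):
     *
     *   CREATE TABLE sideTable(
     *       id INT,
     *       name VARCHAR,
     *       PRIMARY KEY(id),
     *       PERIOD FOR SYSTEM_TIME
     *   )WITH(
     *       type='mysql',
     *       url='jdbc:mysql://localhost:3306/demo',
     *       tableName='sideTable',
     *       cache='ALL'
     *   );
     */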

    /**
     * In perjob mode, store the plugin jar paths the job depends on as cached
     * files, so the outer launcher can pass them on to the JobGraph.
     * @param env          stream execution environment
     * @param classPathSet plugin classpath URLs collected by registerTable
     */
    public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
        int i = 0;
        for (URL url : classPathSet) {
            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
            env.registerCachedFile(url.getPath(), classFileName, true);
            i++;
        }
    }

    public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
        // local mode uses the customized local environment so extra classpaths can be attached
        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                StreamExecutionEnvironment.getExecutionEnvironment() :
                new MyLocalStreamEnvironment();

        StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
        return env;
    }
}