@@ -137,7 +137,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
137137 * @param pluginLoadMode
138138 * @return
139139 */
140- public static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
140+ private static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
141141 if (StringUtils .isEmpty (remoteSqlPluginPath )) {
142142 return StringUtils .equalsIgnoreCase (pluginLoadMode , EPluginLoadMode .SHIPFILE .name ())
143143 || StringUtils .equalsIgnoreCase (deployMode , ClusterMode .local .name ());
@@ -174,7 +174,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
174174 }
175175
176176
177- public static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
177+ private static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
178178 List <URL > jarUrlList = Lists .newArrayList ();
179179 if (Strings .isNullOrEmpty (addJarListStr )) {
180180 return jarUrlList ;
@@ -187,7 +187,7 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
187187 }
188188 return jarUrlList ;
189189 }
190-
190+
191191 private static void sqlTranslation (String localSqlPluginPath ,
192192 StreamTableEnvironment tableEnv ,
193193 SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,
@@ -238,7 +238,7 @@ private static void sqlTranslation(String localSqlPluginPath,
238238 }
239239 }
240240
241- public static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
241+ private static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
242242 throws IllegalAccessException , InvocationTargetException {
243243 // udf和tableEnv须由同一个类加载器加载
244244 ClassLoader levelClassLoader = tableEnv .getClass ().getClassLoader ();
@@ -266,9 +266,9 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
266266 * @return
267267 * @throws Exception
268268 */
269- public static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
269+ private static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
270270 String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
271- Set <URL > pluginClassPatshSets = Sets .newHashSet ();
271+ Set <URL > pluginClassPathSets = Sets .newHashSet ();
272272 WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
273273 for (TableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
274274
@@ -306,34 +306,34 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
306306 registerTableCache .put (tableInfo .getName (), regTable );
307307
308308 URL sourceTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
309- pluginClassPatshSets .add (sourceTablePathUrl );
309+ pluginClassPathSets .add (sourceTablePathUrl );
310310 } else if (tableInfo instanceof TargetTableInfo ) {
311311
312312 TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
313313 TypeInformation [] flinkTypes = FunctionManager .transformTypes (tableInfo .getFieldClasses ());
314314 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
315315
316316 URL sinkTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
317- pluginClassPatshSets .add (sinkTablePathUrl );
317+ pluginClassPathSets .add (sinkTablePathUrl );
318318 } else if (tableInfo instanceof SideTableInfo ) {
319319 String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
320320 sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
321321
322322 URL sideTablePathUrl = PluginUtil .buildSidePathByLoadMode (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
323- pluginClassPatshSets .add (sideTablePathUrl );
323+ pluginClassPathSets .add (sideTablePathUrl );
324324 } else {
325325 throw new RuntimeException ("not support table type:" + tableInfo .getType ());
326326 }
327327 }
328- return pluginClassPatshSets ;
328+ return pluginClassPathSets ;
329329 }
330330
331331 /**
332332 * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
333333 * @param env
334334 * @param classPathSet
335335 */
336- public static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
336+ private static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
337337 int i = 0 ;
338338 for (URL url : classPathSet ) {
339339 String classFileName = String .format (CLASS_FILE_NAME_FMT , i );
@@ -342,7 +342,7 @@ public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env,
342342 }
343343 }
344344
345- public static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
345+ private static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
346346 StreamExecutionEnvironment env = !ClusterMode .local .name ().equals (deployMode ) ?
347347 StreamExecutionEnvironment .getExecutionEnvironment () :
348348 new MyLocalStreamEnvironment ();
0 commit comments