2222
2323import com .dtstack .flink .sql .config .CalciteConfig ;
2424import com .dtstack .flink .sql .classloader .ClassLoaderManager ;
25- import com .dtstack .flink .sql .config .DirtyConfig ;
2625import com .dtstack .flink .sql .constrant .ConfigConstrant ;
27- import com .dtstack .flink .sql .dirty .DirtyDataManager ;
2826import com .dtstack .flink .sql .enums .ClusterMode ;
2927import com .dtstack .flink .sql .enums .ECacheType ;
3028import com .dtstack .flink .sql .enums .EPluginLoadMode ;
5048import com .dtstack .flink .sql .util .PluginUtil ;
5149import org .apache .calcite .sql .SqlInsert ;
5250import org .apache .calcite .sql .SqlNode ;
53- import org .apache .commons .collections .MapUtils ;
5451import org .apache .commons .io .Charsets ;
5552import org .apache .commons .lang3 .StringUtils ;
5653import org .apache .flink .api .common .ExecutionConfig ;
7774import org .slf4j .Logger ;
7875import org .slf4j .LoggerFactory ;
7976import java .io .File ;
80- import java .io .IOException ;
8177import java .lang .reflect .Field ;
8278import java .lang .reflect .InvocationTargetException ;
8379import java .lang .reflect .Method ;
8682import java .net .URLDecoder ;
8783import java .util .List ;
8884import java .util .Map ;
89- import java .util .Optional ;
9085import java .util .Properties ;
9186import java .util .Set ;
9287import java .util .concurrent .TimeUnit ;
@@ -119,7 +114,6 @@ public static void main(String[] args) throws Exception {
119114 String pluginLoadMode = options .getPluginLoadMode ();
120115 String deployMode = options .getMode ();
121116 String confProp = options .getConfProp ();
122- String dirtyProp = options .getDirtyProp ();
123117
124118 sql = URLDecoder .decode (sql , Charsets .UTF_8 .name ());
125119 SqlParser .setLocalSqlPluginRoot (localSqlPluginPath );
@@ -129,18 +123,13 @@ public static void main(String[] args) throws Exception {
129123 addJarListStr = URLDecoder .decode (addJarListStr , Charsets .UTF_8 .name ());
130124 addJarFileList = objMapper .readValue (addJarListStr , List .class );
131125 }
126+
132127 confProp = URLDecoder .decode (confProp , Charsets .UTF_8 .toString ());
133128 Properties confProperties = PluginUtil .jsonStrToObject (confProp , Properties .class );
134-
135- dirtyProp = URLDecoder .decode (dirtyProp , Charsets .UTF_8 .toString ());
136- // set DirtyDataManager dirtyconfig
137- DirtyConfig dirtyConfig = getDirtyDataManagerDirtyConfig (dirtyProp );
138-
139129 StreamExecutionEnvironment env = getStreamExeEnv (confProperties , deployMode );
140130 StreamTableEnvironment tableEnv = StreamTableEnvironment .getTableEnvironment (env );
141131 StreamQueryConfig queryConfig = getStreamTableEnvTTL (confProperties , tableEnv );
142132
143-
144133 List <URL > jarURList = Lists .newArrayList ();
145134 SqlTree sqlTree = SqlParser .parseSql (sql );
146135
@@ -156,7 +145,7 @@ public static void main(String[] args) throws Exception {
156145 //register udf
157146 registerUDF (sqlTree , jarURList , tableEnv );
158147 //register table schema
159- registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , sideTableMap , registerTableCache , dirtyConfig );
148+ registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , sideTableMap , registerTableCache );
160149
161150 sqlTranslation (localSqlPluginPath , tableEnv ,sqlTree ,sideTableMap ,registerTableCache , queryConfig );
162151
@@ -167,11 +156,6 @@ public static void main(String[] args) throws Exception {
167156 env .execute (name );
168157 }
169158
170- private static DirtyConfig getDirtyDataManagerDirtyConfig (String dirtyProp ) throws IOException {
171- Map dirtyCofig = PluginUtil .jsonStrToObject (dirtyProp , Map .class );
172- return dirtyCofig .size () == 0 ? null : new DirtyConfig (dirtyCofig );
173- }
174-
175159 private static void sqlTranslation (String localSqlPluginPath , StreamTableEnvironment tableEnv ,SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,Map <String , Table > registerTableCache , StreamQueryConfig queryConfig ) throws Exception {
176160 SideSqlExec sideSqlExec = new SideSqlExec ();
177161 sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
@@ -251,16 +235,14 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTabl
251235
252236
253237 private static void registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
254- String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap ,
255- Map <String , Table > registerTableCache , DirtyConfig dirtyConfig ) throws Exception {
238+ String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
256239 Set <URL > classPathSet = Sets .newHashSet ();
257240 WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
258241 for (TableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
259242
260243 if (tableInfo instanceof SourceTableInfo ) {
261244
262245 SourceTableInfo sourceTableInfo = (SourceTableInfo ) tableInfo ;
263- sourceTableInfo .setDirtyConfig (dirtyConfig );
264246 Table table = StreamSourceFactory .getStreamSource (sourceTableInfo , env , tableEnv , localSqlPluginPath );
265247 tableEnv .registerTable (sourceTableInfo .getAdaptName (), table );
266248 //Note --- parameter conversion function can not be used inside a function of the type of polymerization
@@ -290,7 +272,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
290272 registerTableCache .put (tableInfo .getName (), regTable );
291273 classPathSet .add (buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode ));
292274 } else if (tableInfo instanceof TargetTableInfo ) {
293- tableInfo . setDirtyConfig ( dirtyConfig );
275+
294276 TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
295277 TypeInformation [] flinkTypes = FlinkUtil .transformTypes (tableInfo .getFieldClasses ());
296278 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
0 commit comments