2323import org .apache .flink .table .api .TableSchema ;
2424import org .apache .flink .table .api .java .StreamTableEnvironment ;
2525import org .apache .flink .table .sinks .TableSink ;
26+ import org .junit .Before ;
2627import org .junit .Test ;
2728import org .junit .runner .RunWith ;
2829import org .powermock .api .mockito .PowerMockito ;
3132
3233import java .io .IOException ;
3334import java .net .URL ;
35+ import java .util .HashMap ;
3436import java .util .Map ;
3537import java .util .Properties ;
3638import java .util .Set ;
4446@ RunWith (PowerMockRunner .class )
4547@ PrepareForTest ({SqlParser .class , PluginUtil .class , StreamSourceFactory .class , StreamSinkFactory .class })
4648public class ExecuteProcessHelperTest {
49+
50+ private Map <String , Object > dirtyMap ;
51+
52+ @ Before
53+ public void setUp () {
54+ dirtyMap = new HashMap <>();
55+ dirtyMap .put ("type" , "console" );
56+ // 多少条数据打印一次
57+ dirtyMap .put ("printLimit" , "100" );
58+ dirtyMap .put ("url" , "jdbc:mysql://localhost:3306/tiezhu" );
59+ dirtyMap .put ("userName" , "root" );
60+ dirtyMap .put ("password" , "abc123" );
61+ dirtyMap .put ("isCreateTable" , "false" );
62+ // 多少条数据写入一次
63+ dirtyMap .put ("batchSize" , "1" );
64+ dirtyMap .put ("tableName" , "dirtyData" );
65+ }
4766
4867 @ Test
4968 public void parseParams () throws Exception {
5069 String [] sql = new String []{"-mode" , "yarnPer" , "-sql" , "/Users/maqi/tmp/json/group_tmp4.txt" , "-name" , "PluginLoadModeTest" ,
51- "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
52- "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
53- "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
54- "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
55- "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
70+ "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
71+ "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
72+ "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
73+ "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
74+ "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
5675
5776 ExecuteProcessHelper .parseParams (sql );
5877 }
5978
6079 @ Test
61- public void checkRemoteSqlPluginPath (){
62- ExecuteProcessHelper .checkRemoteSqlPluginPath (null , EPluginLoadMode .SHIPFILE .name (), ClusterMode .local .name ());
80+ public void checkRemoteSqlPluginPath () {
81+ ExecuteProcessHelper .checkRemoteSqlPluginPath (null , EPluginLoadMode .SHIPFILE .name (), ClusterMode .local .name ());
6382
6483 }
6584
6685 // @Test
6786 public void getStreamExecution () throws Exception {
6887 String [] sql = new String []{"-mode" , "yarnPer" , "-sql" , "/Users/maqi/tmp/json/group_tmp4.txt" , "-name" , "PluginLoadModeTest" ,
69- "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
70- "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
71- "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
72- "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
73- "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
88+ "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
89+ "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
90+ "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
91+ "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
92+ "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
7493 ParamsInfo paramsInfo = ExecuteProcessHelper .parseParams (sql );
7594 PowerMockito .mockStatic (SqlParser .class );
7695 SqlTree sqlTree = mock (SqlTree .class );
@@ -113,7 +132,7 @@ public void registerTable() throws Exception {
113132 PowerMockito .mockStatic (PluginUtil .class );
114133
115134 PowerMockito .mockStatic (StreamSourceFactory .class );
116- when (StreamSourceFactory .getStreamSource (anyObject (), anyObject (), anyObject (), anyString (),anyString ())).thenReturn (table );
135+ when (StreamSourceFactory .getStreamSource (anyObject (), anyObject (), anyObject (), anyString (), anyString ())).thenReturn (table );
117136
118137 TableSink tableSink = mock (TableSink .class );
119138 PowerMockito .mockStatic (StreamSinkFactory .class );
@@ -133,7 +152,7 @@ public void registerTable() throws Exception {
133152 when (sideTableInfo .getCacheType ()).thenReturn ("all" );
134153 when (sideTableInfo .getName ()).thenReturn ("sideTable" );
135154 when (sideTableInfo .getType ()).thenReturn ("redis" );
136- when (PluginUtil .buildSidePathByLoadMode (anyString (), anyString (), anyString (), anyString (), anyString (),anyString ())).thenReturn (new URL ("file://a" ));
155+ when (PluginUtil .buildSidePathByLoadMode (anyString (), anyString (), anyString (), anyString (), anyString (), anyString ())).thenReturn (new URL ("file://a" ));
137156
138157 AbstractTargetTableInfo targetTableInfo = mock (AbstractTargetTableInfo .class );
139158 when (targetTableInfo .getName ()).thenReturn ("sinkTable" );
@@ -147,12 +166,21 @@ public void registerTable() throws Exception {
147166 tableMap .put ("target" , targetTableInfo );
148167 when (sqlTree .getTableInfoMap ()).thenReturn (tableMap );
149168
150- ExecuteProcessHelper .registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , sideTableMap , registerTableCache );
169+ // SqlTree sqlTree
170+ // , StreamExecutionEnvironment env
171+ // , StreamTableEnvironment tableEnv
172+ // , String localSqlPluginPath
173+ // , String remoteSqlPluginPath
174+ // , String pluginLoadMode
175+ // , Map<String, Object> dirtyProperties
176+ // , Map<String, AbstractSideTableInfo> sideTableMap
177+ // , Map<String, Table> registerTableCache
178+ ExecuteProcessHelper .registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , dirtyMap , sideTableMap , registerTableCache );
151179 }
152180
153181 @ Test
154182 public void registerPluginUrlToCachedFile () throws Exception {
155- StreamExecutionEnvironment executionEnvironment = ExecuteProcessHelper .getStreamExeEnv (new Properties (), "local" );
183+ StreamExecutionEnvironment executionEnvironment = ExecuteProcessHelper .getStreamExeEnv (new Properties (), "local" );
156184 Set <URL > classPathSet = Sets .newHashSet ();
157185 classPathSet .add (new URL ("file://" ));
158186 ExecuteProcessHelper .registerPluginUrlToCachedFile (executionEnvironment , classPathSet );
@@ -164,5 +192,4 @@ public void getStreamExeEnv() throws Exception {
164192 }
165193
166194
167-
168195}
0 commit comments