2222
2323
2424import com .dtstack .flink .sql .constrant .ConfigConstrant ;
25+ import com .dtstack .flink .sql .enums .EStateBackend ;
26+ import org .apache .commons .lang3 .BooleanUtils ;
2527import org .apache .commons .lang3 .StringUtils ;
2628import org .apache .flink .api .common .time .Time ;
2729import org .apache .flink .api .common .typeinfo .TypeInformation ;
30+ import org .apache .flink .contrib .streaming .state .RocksDBStateBackend ;
31+ import org .apache .flink .runtime .state .StateBackend ;
2832import org .apache .flink .runtime .state .filesystem .FsStateBackend ;
33+ import org .apache .flink .runtime .state .memory .MemoryStateBackend ;
2934import org .apache .flink .streaming .api .CheckpointingMode ;
3035import org .apache .flink .streaming .api .TimeCharacteristic ;
3136import org .apache .flink .streaming .api .environment .CheckpointConfig ;
5156import java .util .regex .Matcher ;
5257import java .util .regex .Pattern ;
5358
59+
5460/**
5561 * Reason:
5662 * Date: 2017/2/21
@@ -130,10 +136,13 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
130136 throw new RuntimeException ("not support value of cleanup mode :" + cleanupModeStr );
131137 }
132138
133- String backendPath = properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_DATAURI_KEY );
134- if (backendPath != null ){
135- //set checkpoint save path on file system, 根据实际的需求设定文件路径,hdfs://, file://
136- env .setStateBackend (new FsStateBackend (backendPath ));
139+ String backendType = properties .getProperty (ConfigConstrant .STATE_BACKEND_KEY );
140+ String checkpointDataUri = properties .getProperty (ConfigConstrant .CHECKPOINTS_DIRECTORY_KEY );
141+ String backendIncremental = properties .getProperty (ConfigConstrant .STATE_BACKEND_INCREMENTAL_KEY , "true" );
142+
143+ if (!StringUtils .isEmpty (backendType )){
144+ StateBackend stateBackend = createStateBackend (backendType , checkpointDataUri , backendIncremental );
145+ env .setStateBackend (stateBackend );
137146 }
138147
139148 }
@@ -377,4 +386,28 @@ public static TypeInformation[] transformTypes(Class[] fieldTypes){
377386 return types ;
378387 }
379388
389+ private static StateBackend createStateBackend (String backendType , String checkpointDataUri , String backendIncremental ) throws IOException {
390+ EStateBackend stateBackendType = EStateBackend .convertFromString (backendType );
391+ StateBackend stateBackend = null ;
392+ switch (stateBackendType ) {
393+ case MEMORY :
394+ stateBackend = new MemoryStateBackend ();
395+ break ;
396+ case FILESYSTEM :
397+ checkpointDataUriEmptyCheck (checkpointDataUri , backendType );
398+ stateBackend = new FsStateBackend (checkpointDataUri );
399+ break ;
400+ case ROCKSDB :
401+ checkpointDataUriEmptyCheck (checkpointDataUri , backendType );
402+ stateBackend = new RocksDBStateBackend (checkpointDataUri , BooleanUtils .toBoolean (backendIncremental ));
403+ break ;
404+ }
405+ return stateBackend ;
406+ }
407+
408+ private static void checkpointDataUriEmptyCheck (String checkpointDataUri , String backendType ) {
409+ if (StringUtils .isEmpty (checkpointDataUri )) {
410+ throw new RuntimeException (backendType + " backend checkpointDataUri not null!" );
411+ }
412+ }
380413}
0 commit comments