2727import org .slf4j .LoggerFactory ;
2828
2929import java .io .Serializable ;
30- import java .util .Properties ;
30+ import java .util .Map ;
31+ import java .util .Objects ;
3132import java .util .concurrent .LinkedBlockingQueue ;
3233import java .util .concurrent .ThreadPoolExecutor ;
3334import java .util .concurrent .TimeUnit ;
3435import java .util .concurrent .atomic .AtomicLong ;
3536
37+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_BLOCKING_INTERVAL ;
38+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_ERROR_LIMIT_RATE ;
39+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_PRINT_LIMIT ;
40+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_TYPE ;
41+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DIRTY_BLOCK_STR ;
42+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DIRTY_LIMIT_RATE_STR ;
43+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_LOAD_MODE_STR ;
44+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_PATH_STR ;
45+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_TYPE_STR ;
46+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PRINT_LIMIT_STR ;
47+
3648/**
3749 * @author tiezhu
3850 * Company dtstack
3951 * Date 2020/8/27 星期四
4052 */
4153public class DirtyDataManager implements Serializable {
54+ private static final long serialVersionUID = 1L ;
4255
4356 public final static int MAX_POOL_SIZE_LIMIT = 5 ;
44- private static final long serialVersionUID = 7190970299538893497L ;
4557 private static final Logger LOG = LoggerFactory .getLogger (DirtyDataManager .class );
46- private static final String DIRTY_BLOCK_STR = "blockingInterval" ;
47- private static final String DIRTY_LIMIT_RATE_STR = "errorLimitRate" ;
4858 private final static int MAX_TASK_QUEUE_SIZE = 100 ;
49- private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8" ;
50- private final static String DEFAULT_BLOCKING_INTERVAL = "60" ;
51- public static AbstractDirtyDataConsumer consumer ;
5259
53- private static ThreadPoolExecutor dirtyDataConsumer ;
60+ private AbstractDirtyDataConsumer consumer ;
61+ private transient ThreadPoolExecutor dirtyDataConsumer ;
62+
63+ private static final DirtyDataManager INSTANCE = new DirtyDataManager ();
64+
5465 /**
5566 * 统计manager收集到的脏数据条数
5667 */
@@ -68,45 +79,65 @@ public class DirtyDataManager implements Serializable {
6879 */
6980 private double errorLimitRate ;
7081
/**
 * Private: instances are not meant to be created from outside this class.
 */
private DirtyDataManager() {
    // nothing to initialise here
}
85+
7186 /**
7287 * 通过参数生成manager实例,并同时将consumer实例化
7388 */
74- public static DirtyDataManager newInstance (Properties properties ) {
89+ public static DirtyDataManager newInstance (Map < String , Object > properties ) {
7590 try {
76- DirtyDataManager manager = new DirtyDataManager ();
77- manager .blockingInterval = Long .parseLong (String .valueOf (properties .getOrDefault (DIRTY_BLOCK_STR , DEFAULT_BLOCKING_INTERVAL )));
78- manager .errorLimitRate = Double .parseDouble (String .valueOf (properties .getOrDefault (DIRTY_LIMIT_RATE_STR , DEFAULT_ERROR_LIMIT_RATE )));
79- consumer = DirtyConsumerFactory .getDirtyConsumer (
80- properties .getProperty ("type" )
81- , properties .getProperty ("pluginPath" )
82- , properties .getProperty ("pluginLoadMode" )
83- );
84- consumer .init (properties );
85- consumer .setQueue (new LinkedBlockingQueue <>());
86- dirtyDataConsumer = new ThreadPoolExecutor (MAX_POOL_SIZE_LIMIT , MAX_POOL_SIZE_LIMIT , 0 , TimeUnit .MILLISECONDS ,
87- new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ), new DTThreadFactory ("dirtyDataConsumer" ), new ThreadPoolExecutor .CallerRunsPolicy ());
88- dirtyDataConsumer .execute (consumer );
89- return manager ;
91+ INSTANCE .setBlockingInterval (Long .parseLong (
92+ String .valueOf (properties .getOrDefault (DIRTY_BLOCK_STR , DEFAULT_BLOCKING_INTERVAL ))));
93+ INSTANCE .setErrorLimitRate (Double .parseDouble (
94+ String .valueOf (properties .getOrDefault (DIRTY_LIMIT_RATE_STR , DEFAULT_ERROR_LIMIT_RATE ))));
95+
96+ INSTANCE .setConsumer (properties );
97+ return INSTANCE ;
9098 } catch (Exception e ) {
9199 throw new RuntimeException ("create dirtyManager error!" , e );
92100 }
93101 }
94102
103+ private void setConsumer (Map <String , Object > properties ) throws Exception {
104+ consumer = DirtyConsumerFactory .getDirtyConsumer (
105+ String .valueOf (properties .getOrDefault (PLUGIN_TYPE_STR , DEFAULT_TYPE )),
106+ String .valueOf (properties .get (PLUGIN_PATH_STR )),
107+ String .valueOf (properties .get (PLUGIN_LOAD_MODE_STR ))
108+ );
109+ consumer .init (properties );
110+ consumer .setQueue (new LinkedBlockingQueue <>());
111+ }
112+
113+ public void execute () {
114+ if (Objects .isNull (dirtyDataConsumer )) {
115+ dirtyDataConsumer = new ThreadPoolExecutor (
116+ MAX_POOL_SIZE_LIMIT ,
117+ MAX_POOL_SIZE_LIMIT ,
118+ 0 ,
119+ TimeUnit .MILLISECONDS ,
120+ new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ),
121+ new DTThreadFactory ("dirtyDataConsumer" , true ),
122+ new ThreadPoolExecutor .CallerRunsPolicy ());
123+ dirtyDataConsumer .execute (consumer );
124+ }
125+ }
126+
95127 /**
96128 * 设置脏数据插件默认配置
97129 *
98130 * @return console的默认配置
99131 */
100132 public static String buildDefaultDirty () {
101133 JSONObject jsonObject = new JSONObject ();
102- jsonObject .put ("type" , "console" );
103- jsonObject .put ("printLimit" , "1000" );
134+ jsonObject .put (PLUGIN_TYPE_STR , DEFAULT_TYPE );
135+ jsonObject .put (PRINT_LIMIT_STR , DEFAULT_PRINT_LIMIT );
104136 return jsonObject .toJSONString ();
105137 }
106138
107139 /**
108140 * 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
109- * TODO consumer 关闭时仍有数据没有消费到,假如有500条数据,在结束时实际消费数量可能只有493
110141 */
111142 public void close () {
112143 if (checkConsumer ()) {
@@ -122,13 +153,17 @@ public void close() {
122153 public void collectDirtyData (String dataInfo , String cause ) {
123154 DirtyDataEntity dirtyDataEntity = new DirtyDataEntity (dataInfo , System .currentTimeMillis (), cause );
124155 try {
125- consumer .collectDirtyData (dirtyDataEntity , blockingInterval );
126156 count .incrementAndGet ();
127- } catch (Exception ignored ) {
157+ consumer .collectDirtyData (dirtyDataEntity , blockingInterval );
158+ } catch (Exception e ) {
128159 LOG .warn ("dirty Data insert error ... Failed number: " + errorCount .incrementAndGet ());
129- LOG .warn ("error dirty data:" + dirtyDataEntity .toString ());
160+ LOG .warn ("error cause: " + e .getMessage ());
161+ LOG .warn ("error dirty data:" + dirtyDataEntity .getDirtyData ());
130162 if (errorCount .get () > Math .ceil (count .longValue () * errorLimitRate )) {
131- throw new RuntimeException (String .format ("The number of failed number 【%s】 reaches the limit, manager fails" , errorCount .get ()));
163+ // close consumer and manager
164+ close ();
165+ throw new RuntimeException (
166+ String .format ("The number of failed number 【%s】 reaches the limit, manager fails" , errorCount .get ()));
132167 }
133168 }
134169 }
@@ -139,4 +174,28 @@ public void collectDirtyData(String dataInfo, String cause) {
139174 public boolean checkConsumer () {
140175 return consumer .isRunning ();
141176 }
177+
178+ public AtomicLong getCount () {
179+ return count ;
180+ }
181+
182+ public AtomicLong getErrorCount () {
183+ return errorCount ;
184+ }
185+
186+ public long getBlockingInterval () {
187+ return blockingInterval ;
188+ }
189+
190+ public void setBlockingInterval (long blockingInterval ) {
191+ this .blockingInterval = blockingInterval ;
192+ }
193+
194+ public double getErrorLimitRate () {
195+ return errorLimitRate ;
196+ }
197+
198+ public void setErrorLimitRate (double errorLimitRate ) {
199+ this .errorLimitRate = errorLimitRate ;
200+ }
142201}
0 commit comments