@@ -262,21 +262,44 @@ private boolean tryAcquireOrRenew() {
262262 return false ;
263263 }
264264
265+ if (log .isDebugEnabled ()) {
266+ log .debug ("Lock not found, try to create it" );
267+ }
268+
269+ // No Lock resource exists, try to get leadership by creating it
265270 return createLock (lock , leaderElectionRecord );
266271 }
267272
273+ // alright, we have an existing lock resource
274+ // 1. Is Lock Empty? --> try to get leadership by updating it
275+ // 2. Am I the Leader? --> update info and renew lease by updating it
276+ // 3. I am not the Leader?
277+ // 3.1 is Lock expired? --> try to get leadership by updating it
278+ // 3.2 Lock not expired? --> update info, try later
279+
268280 if (oldLeaderElectionRecord == null
269281 || oldLeaderElectionRecord .getAcquireTime () == null
270282 || oldLeaderElectionRecord .getRenewTime () == null
271283 || oldLeaderElectionRecord .getHolderIdentity () == null ) {
272- return createLock (lock , leaderElectionRecord );
284+ // We found the lock resource with an empty LeaderElectionRecord, try to get leadership by updating it
285+ if (log .isDebugEnabled ()) {
286+ log .debug ("Update lock to get lease" );
287+ }
288+
289+ if (oldLeaderElectionRecord != null ) {
290+ // maintain the leaderTransitions
291+ leaderElectionRecord .setLeaderTransitions (oldLeaderElectionRecord .getLeaderTransitions () + 1 );
292+ }
293+
294+ return updateLock (lock , leaderElectionRecord );
273295 }
274296
275- // 2. Record obtained, check the Identity & Time
297+ // 2. Record obtained with LeaderElectionRecord , check the Identity & Time
276298 if (!oldLeaderElectionRecord .equals (this .observedRecord )) {
277299 this .observedRecord = oldLeaderElectionRecord ;
278300 this .observedTimeMilliSeconds = System .currentTimeMillis ();
279301 }
302+
280303 if (observedTimeMilliSeconds + config .getLeaseDuration ().toMillis () > now .getTime ()
281304 && !isLeader ()) {
282305 if (log .isDebugEnabled ()) {
@@ -296,26 +319,20 @@ private boolean tryAcquireOrRenew() {
296319 leaderElectionRecord .setLeaderTransitions (oldLeaderElectionRecord .getLeaderTransitions () + 1 );
297320 }
298321
299- // update the lock itself
300322 if (log .isDebugEnabled ()) {
301- log .debug ("Update lock acquire time to keep lease" );
323+ log .debug ("Update lock to renew lease" );
302324 }
303- boolean updateSuccess = config .getLock ().update (leaderElectionRecord );
304- if (!updateSuccess ) {
305- return false ;
306- }
307- this .observedRecord = leaderElectionRecord ;
308- this .observedTimeMilliSeconds = System .currentTimeMillis ();
309- if (log .isDebugEnabled ()) {
325+
326+ boolean renewalStatus = updateLock (lock , leaderElectionRecord );
327+
328+ if (renewalStatus && log .isDebugEnabled ()) {
310329 log .debug ("TryAcquireOrRenew return success" );
311330 }
312- return true ;
331+
332+ return renewalStatus ;
313333 }
314334
315335 private boolean createLock (Lock lock , LeaderElectionRecord leaderElectionRecord ) {
316- if (log .isDebugEnabled ()) {
317- log .debug ("Lock not found, try to create it" );
318- }
319336 boolean createSuccess = lock .create (leaderElectionRecord );
320337 if (!createSuccess ) {
321338 return false ;
@@ -325,6 +342,16 @@ private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord)
325342 return true ;
326343 }
327344
345+ private boolean updateLock (Lock lock , LeaderElectionRecord leaderElectionRecord ) {
346+ boolean updateSuccess = lock .update (leaderElectionRecord );
347+ if (!updateSuccess ) {
348+ return false ;
349+ }
350+ this .observedRecord = leaderElectionRecord ;
351+ this .observedTimeMilliSeconds = System .currentTimeMillis ();
352+ return true ;
353+ }
354+
328355 private boolean isLeader () {
329356 return this .config .getLock ().identity ().equals (this .observedRecord .getHolderIdentity ());
330357 }
@@ -345,8 +372,49 @@ private void maybeReportTransition() {
345372
346373 @ Override
347374 public void close () {
375+ log .info ("Closing..." );
348376 scheduledWorkers .shutdownNow ();
349377 leaseWorkers .shutdownNow ();
350378 hookWorkers .shutdownNow ();
379+
380+ // If I am the leader, free the lock so that other candidates can take it immediately
381+ if (observedRecord != null && isLeader ()) {
382+
383+ // First ensure that all executors have stopped
384+ try {
385+ boolean isTerminated = scheduledWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
386+ if (!isTerminated ) {
387+ log .warn ("scheduledWorkers executor termination didn't finish." );
388+ return ;
389+ }
390+
391+ isTerminated = leaseWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
392+ if (!isTerminated ) {
393+ log .warn ("leaseWorkers executor termination didn't finish." );
394+ return ;
395+ }
396+
397+ isTerminated = hookWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
398+ if (!isTerminated ) {
399+ log .warn ("hookWorkers executor termination didn't finish." );
400+ return ;
401+ }
402+ } catch (InterruptedException ex ) {
403+ log .warn ("Failed to ensure executors termination." , ex );
404+ return ;
405+ }
406+
407+ log .info ("Giving up the lock...." );
408+ LeaderElectionRecord emptyRecord = new LeaderElectionRecord ();
409+ // maintain leaderTransitions count
410+ emptyRecord .setLeaderTransitions (observedRecord .getLeaderTransitions ());
411+ boolean status = this .config .getLock ().update (emptyRecord );
412+
413+ if (!status ) {
414+ log .warn ("Failed to give up the lock." );
415+ }
416+ }
417+
418+ log .info ("Closed" );
351419 }
352420}
0 commit comments