From 9c01a933424c493af5d514e8aca2b4a2046a4cf7 Mon Sep 17 00:00:00 2001 From: Grant Jenks Date: Fri, 7 Jun 2019 11:22:51 -0700 Subject: [PATCH 1/2] Add fair lock prototype --- diskcache/recipes.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/diskcache/recipes.py b/diskcache/recipes.py index e5da48c..f259299 100644 --- a/diskcache/recipes.py +++ b/diskcache/recipes.py @@ -178,6 +178,44 @@ def __exit__(self, *exc_info): self.release() +class FairLock(object): + """Recipe for cross-process and cross-thread fair lock. + + # TODO: I don't think this is correct yet. How to handle expiry? + + Based on the "Ticket Lock" algorithm: + https://en.wikipedia.org/wiki/Ticket_lock + + """ + def __init__(self): + self._cache.add('tickets', 0) + self._cache.add('serving', 1) + + def acquire(self): + while True: + ticket = self._cache.incr( + self._tickets, expire=self._expire, tag=self._tag, retry=True, + ) + while True: + serving = self._cache.get(self._serving) + if serving is None: + # Expiration occurred! + pass # TODO + elif serving < ticket: + time.sleep(0.001) + elif serving == ticket: + return + else: + assert serving > ticket + # We got skipped! + pass # TODO + + def release(self): + self._cache.incr( + self._serving, expire=self._expire, tag=self._tag, retry=True, + ) + + class BoundedSemaphore(object): """Recipe for cross-process and cross-thread bounded semaphore. From 2056c60464e6f50676343e53622c4068c7da93e0 Mon Sep 17 00:00:00 2001 From: Grant Jenks Date: Fri, 7 Jun 2019 12:44:08 -0700 Subject: [PATCH 2/2] More brainstorming for a Fair Lock implementation --- diskcache/recipes.py | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/diskcache/recipes.py b/diskcache/recipes.py index f259299..c7977f7 100644 --- a/diskcache/recipes.py +++ b/diskcache/recipes.py @@ -181,38 +181,58 @@ def __exit__(self, *exc_info): class FairLock(object): """Recipe for cross-process and cross-thread fair lock. - # TODO: I don't think this is correct yet. How to handle expiry? - Based on the "Ticket Lock" algorithm: https://en.wikipedia.org/wiki/Ticket_lock """ - def __init__(self): - self._cache.add('tickets', 0) - self._cache.add('serving', 1) + def __init__(self, cache, prefix, expire=None, tag=None): + self._cache = cache + self._tickets_key = prefix + 'tickets' + self._serving_key = prefix + 'serving' + self._expire = expire + self._tag = tag + + def setup(self): + self._cache.add( + self._tickets_key, 0, + expire=self._expire, tag=self._tag, retry=True, + ) + self._cache.add( + self._serving_key, 1, + expire=self._expire, tag=self._tag, retry=True, + ) def acquire(self): while True: ticket = self._cache.incr( - self._tickets, expire=self._expire, tag=self._tag, retry=True, + self._tickets_key, + expire=self._expire, tag=self._tag, retry=True, ) while True: - serving = self._cache.get(self._serving) + serving = self._cache.get(self._serving_key, retry=True) if serving is None: - # Expiration occurred! - pass # TODO + self._cache.add( + self._serving_key, ticket, + expire=self._expire, tag=self._tag, retry=True, + ) + # TODO: How do we know the tickets key has not expired? + # If the tickets key has expired, is it possible that + # another acquire() method has the same ticket? + # Maybe a transaction is necessary to set both keys? + # If both keys are set then what if another thread has + # acquired the lock already? elif serving < ticket: time.sleep(0.001) elif serving == ticket: return else: assert serving > ticket - # We got skipped! - pass # TODO + # We got skipped! Take a new ticket. + break def release(self): self._cache.incr( - self._serving, expire=self._expire, tag=self._tag, retry=True, + self._serving_key, expire=self._expire, tag=self._tag, retry=True, )