Source code for etcd3.locks

import uuid

import tenacity

from etcd3 import exceptions

lock_prefix = '/locks/'


[docs]class Lock(object): """ A distributed lock. This can be used as a context manager, with the lock being acquired and released as you would expect: .. code-block:: python etcd = etcd3.client() # create a lock that expires after 20 seconds with etcd.lock('toot', ttl=20) as lock: # do something that requires the lock print(lock.is_acquired()) # refresh the timeout on the lease lock.refresh() :param name: name of the lock :type name: string or bytes :param ttl: length of time for the lock to live for in seconds. The lock will be released after this time elapses, unless refreshed :type ttl: int """ def __init__(self, name, ttl=60, etcd_client=None): self.name = name self.ttl = ttl if etcd_client is not None: self.etcd_client = etcd_client self.key = lock_prefix + self.name self.lease = None # store uuid as bytes, since it avoids having to decode each time we # need to compare self.uuid = uuid.uuid1().bytes
[docs] def acquire(self, timeout=10): """Acquire the lock. :params timeout: Maximum time to wait before returning. `None` means forever, any other value equal or greater than 0 is the number of seconds. :returns: True if the lock has been acquired, False otherwise. """ stop = ( tenacity.stop_never if timeout is None else tenacity.stop_after_delay(timeout) ) def wait(previous_attempt_number, delay_since_first_attempt): if timeout is None: remaining_timeout = None else: remaining_timeout = max(timeout - delay_since_first_attempt, 0) # TODO(jd): Wait for a DELETE event to happen: that'd mean the lock # has been released, rather than retrying on PUT events too try: self.etcd_client.watch_once(self.key, remaining_timeout) except exceptions.WatchTimedOut: pass return 0 @tenacity.retry(retry=tenacity.retry_never, stop=stop, wait=wait) def _acquire(): # TODO: save the created revision so we can check it later to make # sure we still have the lock self.lease = self.etcd_client.lease(self.ttl) success, _ = self.etcd_client.transaction( compare=[ self.etcd_client.transactions.create(self.key) == 0 ], success=[ self.etcd_client.transactions.put(self.key, self.uuid, lease=self.lease) ], failure=[ self.etcd_client.transactions.get(self.key) ] ) if success is True: return True self.lease = None raise tenacity.TryAgain try: return _acquire() except tenacity.RetryError: return False
[docs] def release(self): """Release the lock.""" success, _ = self.etcd_client.transaction( compare=[ self.etcd_client.transactions.value(self.key) == self.uuid ], success=[self.etcd_client.transactions.delete(self.key)], failure=[] ) return success
[docs] def refresh(self): """Refresh the time to live on this lock.""" if self.lease is not None: return self.lease.refresh() else: raise ValueError('No lease associated with this lock - have you ' 'acquired the lock yet?')
[docs] def is_acquired(self): """Check if this lock is currently acquired.""" uuid, _ = self.etcd_client.get(self.key) if uuid is None: return False return uuid == self.uuid
def __enter__(self): self.acquire() return self def __exit__(self, exception_type, exception_value, traceback): self.release()