import os
import socket
import tempfile
import uuid
import six
from .watch import EventType
from ..errors import ErrLeaseNotFound
from ..utils import get_ident
from ..utils import log
[docs]class EtcdLockError(Exception):
pass
[docs]class EtcdLockAcquireTimeout(Exception):
pass
[docs]class Lock(object): # TODO: maybe we could improve the performance by reduce some HTTP requests
"""
Locking recipe for etcd, inspired by the kazoo recipe for zookeeper
"""
DEFAULT_LOCK_TTL = 60
HOST = 'host'
PROCESS = 'process'
THREAD = 'thread'
[docs] def __init__(self, client, lock_name, lock_ttl=DEFAULT_LOCK_TTL, reentrant=None, lock_prefix='_locks'):
"""
:type client: BaseClient
:param client: instance of etcd.Client
:type lock_name: str
:param lock_name: the name of the lock
:type lock_ttl: int
:param lock_ttl: ttl of the lock, default is 60s
:type reentrant: str
:param reentrant: the reentrant type of the lock can set to Lock.HOST, Lock.PROCESS, Lock.THREAD
:type lock_prefix: str
:param lock_prefix: the prefix of the lock key
"""
self.client = client
self.name = lock_name
self.lock_ttl = lock_ttl
self.lock_prefix = lock_prefix
self.reentrant = reentrant
self.uuid = self._get_uuid()
if six.PY3 and isinstance(self.uuid, str):
self.uuid = six.binary_type(self.uuid, encoding='utf-8')
self.lock_key = "{}/{}".format(lock_prefix, lock_name) # the key of the lock
self.holders_key = self.lock_key + '/holders' # the key of holders-count
self.is_taken = False # if the lock is taken by someone
self.lease = None
self.__holders_lease = None
self._watcher = None
log.debug("Initiating lock for %s with uuid %s", self.lock_key, self.uuid)
def _get_uuid(self):
hostname = socket.gethostname()
if self.reentrant is None:
return '%s:%s' % (hostname, uuid.uuid4().hex[:8])
elif self.reentrant == self.PROCESS:
return '%s:proc:%s' % (hostname, os.getpid())
elif self.reentrant == self.THREAD:
return '%s:thrd:%s' % (hostname, get_ident())
elif self.reentrant == self.HOST:
return self._get_global_uuid('%s:host:%s' % (hostname, socket.gethostbyname(hostname)))
else:
raise TypeError("unknown reentrant type, expect one of Lock.HOST, Lock.PROCESS, Lock.THREAD")
def _get_global_uuid(self, uuid):
path = tempfile.gettempdir() + '/' + self.name + '_lock'
while os.path.isdir(path):
path = path + '_'
if not os.path.exists(path):
log.debug("writing host-global uuid to %s" % path)
with open(path, 'w') as f:
f.write(uuid)
return uuid
else:
log.debug("reading host-global uuid from %s" % path)
with open(path, 'rb') as f:
return f.read()
def _get_locker(self):
r = self.client.range(self.lock_key).kvs
return r[0] if r else None
def _holders_lease(self):
if self.__holders_lease:
return self.__holders_lease
locker = self._get_locker()
if locker:
self.__holders_lease = locker.lease
return self.__holders_lease
[docs] def holders(self):
"""
tell how many holders are holding the lock
:return: int
"""
if not self.reentrant:
if self._get_locker():
return 1
return 0
r = self.client.range(self.holders_key).kvs
if r is not None:
self.__holders_lease = r[0].lease
return int(r[0].value)
lease = self._holders_lease()
if lease:
try:
log.debug("try creating holders count key with lease %d" % lease)
self.client.put(self.holders_key, b'%d' % 0, lease=lease)
except ErrLeaseNotFound:
self.__holders_lease = None
return 0
[docs] def incr_holder(self):
"""
Atomic increase the holder count by 1
"""
n = self.holders()
t = self.client.Txn()
t.If(t.key(self.holders_key).value == b'%d' % n)
t.Then(t.put(self.holders_key, b'%d' % (n + 1), lease=self._holders_lease()))
r = t.commit()
if r.succeeded:
return n + 1
log.debug("failed to incr holders count")
[docs] def decr_holder(self):
"""
Atomic decrease the holder count by 1
"""
n = self.holders() or 0
t = self.client.Txn()
t.If(t.key(self.holders_key).value == b'%d' % n)
if n - 1 == 0:
t.Then(t.delete(self.holders_key))
else:
t.Then(t.put(self.holders_key, b'%d' % (n - 1), lease=self._holders_lease()))
r = t.commit()
if r.succeeded:
return n - 1
log.debug("failed to decr holders count")
@property
def is_acquired(self):
"""
if the lock is acquired
"""
if not self.is_taken:
log.debug("Lock not taken")
return False
locker = self._get_locker()
if locker:
self.is_taken = True
if locker and locker.value == self.uuid:
if self.lease and self.lease.keeping:
return True
return False
acquired = is_acquired
[docs] def acquire(self, block=True, lock_ttl=None, timeout=None, delete_key=True):
"""
Acquire the lock.
:type block: bool
:param block: Block until the lock is obtained, or timeout is reached [default: True]
:type lock_ttl: int
:param lock_ttl: The duration of the lock we acquired, set to None for eternal locks
:type timeout: int
:param timeout: The time to wait before giving up on getting a lock
:type delete_key: bool
:param delete_key: whether delete the key if it has not attached to any lease [default: True]
"""
lock_ttl = self.lock_ttl = lock_ttl or self.lock_ttl
locker = self._get_locker()
if locker:
if not locker.lease:
if not delete_key:
raise EtcdLockError("lock-key %s already exist but with no lease attached")
log.debug("delete lock key that has no expiration")
self.client.delete_range(locker.key)
elif locker.value == self.uuid:
log.debug("we already have the lock")
if not (self.lease and self.lease.keeping):
log.debug("we have the lock but not keeping it, now keep the lease alive")
self.__holders_lease = None
self.lease = self.client.Lease(self.lock_ttl, locker.lease, new=False)
self.lease.grant()
self.lease.keepalive()
if self.reentrant:
log.debug("the lock is reentrant, will incr its holders count")
self.incr_holder()
self.is_taken = True
return self
elif block:
event = self.wait(locker=locker, timeout=timeout)
if not event:
log.debug("lock acquire wait timeout")
raise EtcdLockAcquireTimeout
else:
self.is_taken = True
return
# locker key not found
log.debug("writing lock key to %s", self.lock_key)
if self.lease and self.lease.keeping: # clean old lease that may exist
self.lease.revoke()
self.__holders_lease = None
self.lease = self.client.Lease(self.lock_ttl)
self.lease.grant()
txn = self.client.Txn()
txn.If(txn.key(self.lock_key).value == self.uuid)
txn.Else(txn.put(self.lock_key, self.uuid, lease=self.lease.ID))
r = txn.commit()
if r.succeeded:
return self.acquire(block, lock_ttl, timeout, delete_key)
else:
self.is_taken = True
self.lease.keepalive()
if self.reentrant:
log.debug("the lock is reentrant, will incr its holders count")
self.incr_holder()
log.debug("Lock key written, we got the lock")
log.debug("Lock acquired (lock_key: %s, value: %s)" % (self.lock_key, self.uuid))
return self
[docs] def wait(self, locker=None, timeout=None):
"""
Wait until the lock is lock is able to acquire
:param locker: kv of the lock
:param timeout: wait timeout
"""
locker = locker or self._get_locker()
if not locker:
return
self._watcher = watcher = self.client.Watcher(key=locker.key, max_retries=0)
return watcher.watch_once(lambda e: e.type == EventType.DELETE or e.value == self.uuid, timeout=timeout)
[docs] def release(self):
"""
Release the lock
"""
if self.reentrant:
n = self.decr_holder()
if n is not None and n == 0:
self.lease.revoke()
self.lease = None
self.is_taken = False
else:
self.lease.cancel_keepalive(join=False)
self.lease = None
self.is_taken = True
else:
self.lease.revoke()
self.lease = None
self.is_taken = False
log.debug("Lock released (lock_key: %s, value: %s)" % (self.lock_key, self.uuid))
[docs] def __enter__(self):
"""
You can use the lock as a contextmanager
"""
self.acquire()
return self
[docs] def __exit__(self, type, value, traceback):
self.release()
return False