Source code for etcd3.stateful.watch

import logging
import re
import socket
import threading
import time
from collections import deque

import six
from requests import ConnectionError
from requests.exceptions import ChunkedEncodingError

from ..errors import Etcd3WatchCanceled
from ..models import EventEventType
from ..utils import check_param
from ..utils import get_ident
from ..utils import log

EventType = EventEventType


[docs]class OnceTimeout(IOError): """ Timeout caused by watch once """ pass
[docs]class KeyValue(object): # pragma: no cover """ Model of the key-value of the event """
[docs] def __init__(self, data): self._data = data self.key = data.get('key') self.create_revision = data.get('create_revision') self.mod_revision = data.get('mod_revision') self.value = data.get('value') self.lease = data.get('lease')
[docs] def get(self, key, default=None): return self._data.get(key, default)
def __getitem__(self, item): return self._data.get(item) def __iter__(self): return iter(self._data) def __contains__(self, item): return item in self._data def __repr__(self): return "<KeyValue of '%s'>" % self.key
[docs]class Event(KeyValue): """ Watch event """
[docs] def __init__(self, data, header=None): """ :param data: dict data of a etcdserverpbWatchResponse.events[<mvccpbEvent>] :param header: the header of etcdserverpbWatchResponse """ super(Event, self).__init__(data.kv._data) self.header = header self.type = data.type or EventType.PUT # default is PUT self._data['type'] = self.type self.prev_kv = None if 'prev_kv' in data: self.prev_kv = KeyValue(data.prev_kv._data) self._data['prev_kv'] = self.prev_kv
def __repr__(self): return "<WatchEvent %s '%s'>" % (self.type.value, self.key)
[docs]class Watcher(object):
[docs] @check_param(at_least_one_of=['key', 'all'], at_most_one_of=['range_end', 'prefix', 'all']) def __init__(self, client, max_retries=-1, key=None, range_end=None, start_revision=None, progress_notify=None, prev_kv=None, prefix=None, all=None, no_put=False, no_delete=False): """ Initialize a watcher :type client: BaseClient :param client: client instance of etcd3 :type max_retries: int :param max_retries: max retries when watch failed due to network problem, -1 means no limit [default: -1] :type key: str or bytes :param key: key is the key to register for watching. :type range_end: str or bytes :param range_end: range_end is the end of the range [key, range_end) to watch. If range_end is not given, only the key argument is watched. If range_end is equal to '\0', all keys greater than or equal to the key argument are watched. If the range_end is one bit larger than the given key, then all keys with the prefix (the given key) will be watched. :type start_revision: int :param start_revision: start_revision is an optional revision to watch from (inclusive). No start_revision is "now". :type progress_notify: bool :param progress_notify: progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events. It is useful when clients wish to recover a disconnected watcher starting from a recent known revision. The etcd server may decide how often it will send notifications based on current load. :type prev_kv: bool :param prev_kv: If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. :type prefix: bool :param prefix: if the key is a prefix [default: False] :type all: bool :param all: all the keys [default: False] :type no_put: bool :param no_put: filter out the put events at server side before it sends back to the watcher. [default: False] :type no_delete: bool :param no_delete: filter out the delete events at server side before it sends back to the watcher. [default: False] """ self.client = client self.revision = None self.watch_id = None self.retries = 0 self.errors = deque(maxlen=20) if max_retries == -1: max_retries = 9223372036854775807 # maxint self.max_retries = max_retries self.callbacks = [] self.callbacks_lock = threading.Lock() self.watching = False self.timeout = None # only meaningful for watch_once self._thread = None self._resp = None self._once = False self.key = key self.range_end = range_end self.start_revision = start_revision self.progress_notify = progress_notify self.prev_kv = prev_kv self.prefix = prefix self.all = all self.no_put = no_put self.no_delete = no_delete
[docs] def set_default_timeout(self, timeout): """ Set the default timeout of watch request :type timeout: int :param timeout: timeout in seconds """ self.timeout = timeout
[docs] def clear_revision(self): """ Clear the start_revision that stored in watcher """ self.start_revision = None self.revision = None
[docs] def clear_callbacks(self): """ Remove all callbacks """ with self.callbacks_lock: self.callbacks = []
[docs] def request_create(self): """ Start a watch request """ if self.revision is not None: # continue last watch self.start_revision = self.revision + 1 return self.client.watch_create( key=self.key, range_end=self.range_end, start_revision=self.start_revision, progress_notify=self.progress_notify, prev_kv=self.prev_kv, prefix=self.prefix, all=self.all, no_put=self.no_put, no_delete=self.no_delete, timeout=self.timeout )
[docs] def request_cancel(self): # pragma: no cover """ Cancel the watcher [Not Implemented because of etcd3 returns no watch_id] """ # once really implemented, the error handling of Etcd3WatchCanceled when manually cancel should be considered if self.watch_id: # return self.client.watch_cancel(watch_id=self.watch_id) pass
[docs] @staticmethod def get_filter(filter): """ Get the event filter function :type filter: callable or regex string or EventType or None :param filter: will generate a filter function from this param :return: callable """ if callable(filter): filter_func = filter elif isinstance(filter, (six.string_types, bytes)): regex = re.compile(filter) def py2_filter_func(e): key = e.key return regex.match(key) def py3_filter_func(e): try: key = six.text_type(e.key, encoding='utf-8') except Exception: return return regex.match(key) filter_func = py3_filter_func if six.PY3 else py2_filter_func elif filter is None: filter_func = lambda e: True elif isinstance(filter, EventType): filter_func = lambda e: e.type == filter else: raise TypeError('expect filter to be one of string, EventType, callable got %s' % type(filter)) return filter_func
[docs] def onEvent(self, filter_or_cb, cb=None): """ Add a callback to a event that matches the filter If only one param is given, which is filter_or_cb, it will be treated as the callback. If any event comes, it will be called. :type filter_or_cb: callable or regex string or EventType :param filter_or_cb: filter or callback function :param cb: the callback function """ if cb: filter = filter_or_cb else: filter = None cb = filter_or_cb if not callable(cb): raise TypeError('callback should be a callable') filter_func = self.get_filter(filter) with self.callbacks_lock: self.callbacks.append((filter_func, filter, cb))
[docs] @check_param(at_least_one_of=['filter', 'cb']) def unEvent(self, filter=None, cb=None): # noqa # ignore redefinition of filter """ remove a callback or filter event that's been previously added via onEvent() If both parameters are given they are ANDd together; to OR the, make two calls. :type filter: callable or regex string or EventType :param filter: the callable filter or regex string or EventType the event to be removed was registerd with :param cb: the callback funtion the event to be removed was registerd with """ with self.callbacks_lock: for i in reversed(range(len(self.callbacks))): efilter, eraw_filter, ecb = self.callbacks[i] if cb is not None and ecb != cb: continue if filter is not None and filter not in (efilter, eraw_filter): continue del self.callbacks[i]
[docs] def dispatch_event(self, event): """ Find the callbacks, if callback's filter fits this event, call the callback :param event: Event """ log.debug("dispatching event '%s'" % event) with self.callbacks_lock: callbacks = list( cb for filtr, _, cb in self.callbacks if filtr(event) ) for cb in callbacks: cb(event)
def _ensure_callbacks(self): if not self.callbacks: raise TypeError("haven't watch on any event yet, use onEvent to watch a event") def _ensure_not_watching(self): if self.watching is True: raise RuntimeError("already watching") if self._thread and self._thread.is_alive() and self._thread.ident != get_ident(): raise RuntimeError("watch thread seems running") def _kill_response_stream(self): if not self._resp or (self._resp and self._resp.raw.closed): return try: log.debug("closing response stream") self.request_cancel() s = socket.fromfd(self._resp.raw._fp.fileno(), socket.AF_INET, socket.SOCK_STREAM) s.shutdown(socket.SHUT_RDWR) s.close() self._resp.raw.close() self._resp.close() self._resp.connection.close() except Exception: pass
[docs] def run(self): """ Run the watcher and handel events by callbacks """ self._ensure_callbacks() self._ensure_not_watching() self.errors.clear() try: with self: for event in self: self.dispatch_event(event) finally: self._kill_response_stream() self.watching = False
[docs] def stop(self): """ Stop watching, close the watch stream and exit the daemon thread """ log.debug("stop watching") self.watching = False self._kill_response_stream() if self._thread and self._thread.is_alive() and self._thread.ident != get_ident(): self._thread.join()
cancel = stop
[docs] def runDaemon(self): """ Run Watcher in a daemon thread """ self._ensure_callbacks() self._ensure_not_watching() t = self._thread = threading.Thread(target=self.run) t.setDaemon(True) t.start()
[docs] def watch_once(self, filter=None, timeout=None): """ watch the filtered event, once have event, return it if timed out, return None """ filter = self.get_filter(filter) old_timeout = self.timeout self.timeout = timeout try: self._once = True with self: for event in self: if filter(event): return event except OnceTimeout: return finally: self.stop() self._once = False self.timeout = old_timeout
[docs] def __enter__(self): self._ensure_not_watching() self._resp = self.request_create() return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): self.stop()
def __del__(self): self.stop() def __iter__(self): self.errors.clear() retries = 0 while True: try: self.watching = True if log.level <= logging.DEBUG: if self.all: log.debug("start watching all keys") elif self.prefix: log.debug("start watching prefix '%s'" % self.key) elif self.range_end: log.debug("start watching from '%s' to '%s'" % (self.key, self.range_end)) if not self._resp or self._resp.raw.closed: self._resp = self.request_create() with self._resp as w: event_stream = iter(w) while self.watching: if self._resp.raw._fp.fp is None: raise ConnectionError("response connection closed") r = next(event_stream) log.debug("got a watch response") self.revision = r.header.revision if 'created' in r: log.debug("watch request created") self.start_revision = r.header.revision self.watch_id = r.watch_id if ('canceled' in r and r.canceled) or ('compact_revision' in r and r.compact_revision): # etcd version < 3.3 returns compact_revision without canceled compacted = False if 'compact_revision' in r and r.compact_revision > 0: compacted = True err = Etcd3WatchCanceled("watch on compacted revision: %d" % self.start_revision, r) else: err = Etcd3WatchCanceled(r.cancel_reason, r) if retries == 0 or retries >= self.max_retries: # first request raise error to caller raise err else: self.errors.append(err) log.debug("failed watching (times:%d) retrying %s" % (retries, err)) if compacted: self.revision = r.compact_revision - 1 # next request start from compact_revision self._kill_response_stream() # close connection and throw Connection error if 'events' in r: for event in r.events: yield Event(event, r.header) except (ConnectionError, ChunkedEncodingError) as e: # ConnectionError(MaxRetryError) means cannot reach the server if 'Max retries exceeded with url' in str(e): raise # no need to retry elif 'Read timed out.' in str(e) and self._once: # if timed out and doing watch_once raise OnceTimeout # ChunkedEncodingError usually means we lost the connection, could cause by watcher.stop() elif not self.watching: # raise StopIteration # watch stopped by user return if retries < self.max_retries: # connection unexpectedly or just reached the timeout self.errors.append(e) log.debug("failed watching (times:%d) retrying %s" % (retries, e)) retries += 1 else: # self.watching = False # no need the stop() always called in a with context self.stop() raise time.sleep(0.2)
comments powered by Disqus