Source code for etcd3.client

synchronous client

import json

import requests
import six

from .baseclient import BaseClient
from .baseclient import BaseModelizedStreamResponse
from .baseclient import DEFAULT_VERSION
from .errors import Etcd3Exception
from .errors import Etcd3StreamError
from .errors import get_client_error
from .utils import iter_json_string

[docs]class ModelizedStreamResponse(BaseModelizedStreamResponse): """ Model of a stream response """
[docs] def __init__(self, client, method, resp, decode=True): """ :param resp: Response """ self.client = client self.method = method self.resp = resp self.decode = decode
@property def raw(self): return self.resp.raw
[docs] def close(self): """ close the stream """ return self.resp.close()
[docs] def __enter__(self): return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): self.close()
# # def __del__(self): # self.close() def __iter__(self): for data in iter_response(self.resp): if not data: continue if six.PY3: data = six.text_type(data, encoding='utf-8') data = json.loads(data) if data.get('error'): # pragma: no cover # {"error":{"grpc_code":14,"http_code":503,"message":"rpc error: code = Unavailable desc = transport is closing","http_status":"Service Unavailable"}} err = data.get('error') raise get_client_error(err.get('message'), code=err.get('code'), status=err.get('http_code')) r = self.client._modelizeResponseData(self.method, data, decode=self.decode) if r.result: r = r.result yield r
[docs]def iter_response(resp): """ yield response content by every json object we don't yield by line, because the content of etcd's gRPC-JSON-Gateway stream response does not have a delimiter between each object by default. (only one line) :param resp: Response :return: dict """ left_chunk = b'' # # 2**15 < ceil(32*1024/3*4) < 2**16 = 65536 for chunk in resp.iter_content(chunk_size=65536): chunk = chunk.strip() chunk = left_chunk + chunk for ok, s, _ in iter_json_string(chunk, resp=resp, err_cls=Etcd3StreamError): if ok: yield s else: left_chunk = s if left_chunk: # pragma: no cover raise Etcd3StreamError("Stream decode error", left_chunk, resp)
[docs]class Client(BaseClient):
[docs] def __init__(self, host='', port=2379, protocol='http', cert=(), verify=None, timeout=None, headers=None, user_agent=None, pool_size=30, username=None, password=None, token=None, max_retries=0, server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION): """ :param max_retries: The maximum number of retries each connection should attempt. Note, this applies only to failed DNS lookups, socket connections and connection timeouts, never to requests where data has made it to the server. By default, Requests does not retry failed connections. If you need granular control over the conditions under which we retry a request, import urllib3's ``Retry`` class and pass that instead. """ super(Client, self).__init__(host=host, port=port, protocol=protocol, cert=cert, verify=verify, timeout=timeout, headers=headers, user_agent=user_agent, pool_size=pool_size, username=username, password=password, token=token, server_version=server_version, cluster_version=cluster_version) self._session = requests.session() self._session.cert = self.cert self._session.verify = self.verify self.__set_conn_pool(pool_size, max_retries)
def __set_conn_pool(self, pool_size, max_retries): # aiohttp does not support a max_retries param like requests # For now we just mark it as a TODO feature from requests.adapters import HTTPAdapter adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size, max_retries=max_retries) self._session.mount('http://', adapter) self._session.mount('https://', adapter)
[docs] def close(self): """ close all connections in connection pool """ return self._session.close()
def _modelizeStreamResponse(self, method, resp, decode=True): return ModelizedStreamResponse(self, method, resp, decode) @staticmethod def _raise_for_status(resp): status = resp.status_code if status < 400: return try: data = resp.json() except Exception: error = resp.content code = 2 else: error = data.get('error') code = data.get('code') raise get_client_error(error, code, status, resp) def _get(self, url, **kwargs): r""" Sends a GET request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return self._session.get(url, **kwargs) def _post(self, url, data=None, json=None, **kwargs): r""" Sends a POST request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`. :param json: (optional) json to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return, data=data, json=json, **kwargs)
[docs] def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs): # TODO: add modelize param """ call ETCDv3 RPC and return response object :type method: str :param method: the rpc method, which is a path of RESTful API :type data: dict :param data: request payload to be post to ETCD's gRPC-JSON-Gateway default: {} :type stream: bool :param stream: whether return a stream response object, default: False :type encode: bool :param encode: whether encode the data before post, default: True :param kwargs: additional params to pass to the http request, like headers, timeout etc. :return: Etcd3RPCResponseModel or Etcd3StreamingResponse """ data = data or {} kwargs.setdefault('timeout', self.timeout) if self.token: kwargs.setdefault('headers', {}).setdefault('authorization', self.token) kwargs.setdefault('headers', {}).setdefault('user_agent', self.user_agent) for k, v in self.headers.items(): kwargs.setdefault('headers', {}).setdefault(k, v) if isinstance(data, dict): if encode: data = self._encodeRPCRequest(method, data) resp = self._post(self._url(method), json=data or {}, stream=stream, **kwargs) else: resp = self._post(self._url(method), data=data, stream=stream, **kwargs) self._raise_for_status(resp) if raw: return resp if stream: try: return self._modelizeStreamResponse(method, resp) except Etcd3Exception: resp.close() return self._modelizeResponseData(method, resp.json())
[docs] def auth(self, username=None, password=None): """ call auth.authenticate and save the token :type username: str :param username: username :type password: str :param password: password """ username = username or self.username password = password or self.password old = self.token self.token = None try: r = self.authenticate(username, password) except Exception: self.token = old raise else: self.username = username self.password = password self.token = r.token
comments powered by Disqus