Source code for etcd3.client

import functools
import inspect
import threading

import grpc
import grpc._channel

from six.moves import queue

import etcd3.etcdrpc as etcdrpc
import etcd3.exceptions as exceptions
import etcd3.leases as leases
import etcd3.locks as locks
import etcd3.members
import etcd3.transactions as transactions
import etcd3.utils as utils
import etcd3.watch as watch

_EXCEPTIONS_BY_CODE = {
    grpc.StatusCode.INTERNAL: exceptions.InternalServerError,
    grpc.StatusCode.UNAVAILABLE: exceptions.ConnectionFailedError,
    grpc.StatusCode.DEADLINE_EXCEEDED: exceptions.ConnectionTimeoutError,
    grpc.StatusCode.FAILED_PRECONDITION: exceptions.PreconditionFailedError,
}


def _translate_exception(exc):
    code = exc.code()
    exception = _EXCEPTIONS_BY_CODE.get(code)
    if exception is None:
        raise
    raise exception


def _handle_errors(f):
    if inspect.isgeneratorfunction(f):
        def handler(*args, **kwargs):
            try:
                for data in f(*args, **kwargs):
                    yield data
            except grpc.RpcError as exc:
                _translate_exception(exc)
    else:
        def handler(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except grpc.RpcError as exc:
                _translate_exception(exc)

    return functools.wraps(f)(handler)


class Transactions(object):
    def __init__(self):
        self.value = transactions.Value
        self.version = transactions.Version
        self.create = transactions.Create
        self.mod = transactions.Mod

        self.put = transactions.Put
        self.get = transactions.Get
        self.delete = transactions.Delete
        self.txn = transactions.Txn


class KVMetadata(object):
    def __init__(self, keyvalue, header):
        self.key = keyvalue.key
        self.create_revision = keyvalue.create_revision
        self.mod_revision = keyvalue.mod_revision
        self.version = keyvalue.version
        self.lease_id = keyvalue.lease
        self.response_header = header


class Status(object):
    def __init__(self, version, db_size, leader, raft_index, raft_term):
        self.version = version
        self.db_size = db_size
        self.leader = leader
        self.raft_index = raft_index
        self.raft_term = raft_term


class Alarm(object):
    def __init__(self, alarm_type, member_id):
        self.alarm_type = alarm_type
        self.member_id = member_id


class EtcdTokenCallCredentials(grpc.AuthMetadataPlugin):
    """Metadata wrapper for raw access token credentials."""

    def __init__(self, access_token):
        self._access_token = access_token

    def __call__(self, context, callback):
        metadata = (('token', self._access_token),)
        callback(metadata, None)


[docs]class Etcd3Client(object): def __init__(self, host='localhost', port=2379, ca_cert=None, cert_key=None, cert_cert=None, timeout=None, user=None, password=None, grpc_options=None): self._url = '{host}:{port}'.format(host=host, port=port) self.metadata = None cert_params = [c is not None for c in (cert_cert, cert_key)] if ca_cert is not None: if all(cert_params): credentials = self._get_secure_creds( ca_cert, cert_key, cert_cert ) self.uses_secure_channel = True self.channel = grpc.secure_channel(self._url, credentials, options=grpc_options) elif any(cert_params): # some of the cert parameters are set raise ValueError( 'to use a secure channel ca_cert is required by itself, ' 'or cert_cert and cert_key must both be specified.') else: credentials = self._get_secure_creds(ca_cert, None, None) self.uses_secure_channel = True self.channel = grpc.secure_channel(self._url, credentials, options=grpc_options) else: self.uses_secure_channel = False self.channel = grpc.insecure_channel(self._url, options=grpc_options) self.timeout = timeout self.call_credentials = None cred_params = [c is not None for c in (user, password)] if all(cred_params): self.auth_stub = etcdrpc.AuthStub(self.channel) auth_request = etcdrpc.AuthenticateRequest( name=user, password=password ) resp = self.auth_stub.Authenticate(auth_request, self.timeout) self.metadata = (('token', resp.token),) self.call_credentials = grpc.metadata_call_credentials( EtcdTokenCallCredentials(resp.token)) elif any(cred_params): raise Exception( 'if using authentication credentials both user and password ' 'must be specified.' ) self.kvstub = etcdrpc.KVStub(self.channel) self.watcher = watch.Watcher( etcdrpc.WatchStub(self.channel), timeout=self.timeout, call_credentials=self.call_credentials, metadata=self.metadata ) self.clusterstub = etcdrpc.ClusterStub(self.channel) self.leasestub = etcdrpc.LeaseStub(self.channel) self.maintenancestub = etcdrpc.MaintenanceStub(self.channel) self.transactions = Transactions()
[docs] def close(self): """Call the GRPC channel close semantics.""" self.channel.close()
def __enter__(self): return self def __exit__(self, *args): self.close() def __del__(self): self.close() def _get_secure_creds(self, ca_cert, cert_key=None, cert_cert=None): cert_key_file = None cert_cert_file = None with open(ca_cert, 'rb') as f: ca_cert_file = f.read() if cert_key is not None: with open(cert_key, 'rb') as f: cert_key_file = f.read() if cert_cert is not None: with open(cert_cert, 'rb') as f: cert_cert_file = f.read() return grpc.ssl_channel_credentials( ca_cert_file, cert_key_file, cert_cert_file ) def _build_get_range_request(self, key, range_end=None, limit=None, revision=None, sort_order=None, sort_target='key', serializable=None, keys_only=None, count_only=None, min_mod_revision=None, max_mod_revision=None, min_create_revision=None, max_create_revision=None): range_request = etcdrpc.RangeRequest() range_request.key = utils.to_bytes(key) if range_end is not None: range_request.range_end = utils.to_bytes(range_end) if sort_order is None: range_request.sort_order = etcdrpc.RangeRequest.NONE elif sort_order == 'ascend': range_request.sort_order = etcdrpc.RangeRequest.ASCEND elif sort_order == 'descend': range_request.sort_order = etcdrpc.RangeRequest.DESCEND else: raise ValueError('unknown sort order: "{}"'.format(sort_order)) if sort_target is None or sort_target == 'key': range_request.sort_target = etcdrpc.RangeRequest.KEY elif sort_target == 'version': range_request.sort_target = etcdrpc.RangeRequest.VERSION elif sort_target == 'create': range_request.sort_target = etcdrpc.RangeRequest.CREATE elif sort_target == 'mod': range_request.sort_target = etcdrpc.RangeRequest.MOD elif sort_target == 'value': range_request.sort_target = etcdrpc.RangeRequest.VALUE else: raise ValueError('sort_target must be one of "key", ' '"version", "create", "mod" or "value"') return range_request
[docs] @_handle_errors def get(self, key): """ Get the value of a key from etcd. example usage: .. code-block:: python >>> import etcd3 >>> etcd = etcd3.client() >>> etcd.get('/thing/key') 'hello world' :param key: key in etcd to get :returns: value of key and metadata :rtype: bytes, ``KVMetadata`` """ range_request = self._build_get_range_request(key) range_response = self.kvstub.Range( range_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) if range_response.count < 1: return None, None else: kv = range_response.kvs.pop() return kv.value, KVMetadata(kv, range_response.header)
[docs] @_handle_errors def get_prefix(self, key_prefix, sort_order=None, sort_target='key'): """ Get a range of keys with a prefix. :param key_prefix: first key in range :returns: sequence of (value, metadata) tuples """ range_request = self._build_get_range_request( key=key_prefix, range_end=utils.increment_last_byte(utils.to_bytes(key_prefix)), sort_order=sort_order, sort_target=sort_target, ) range_response = self.kvstub.Range( range_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) if range_response.count < 1: return else: for kv in range_response.kvs: yield (kv.value, KVMetadata(kv, range_response.header))
[docs] @_handle_errors def get_range(self, range_start, range_end, sort_order=None, sort_target='key', **kwargs): """ Get a range of keys. :param range_start: first key in range :param range_end: last key in range :returns: sequence of (value, metadata) tuples """ range_request = self._build_get_range_request( key=range_start, range_end=range_end, sort_order=sort_order, sort_target=sort_target, **kwargs ) range_response = self.kvstub.Range( range_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) if range_response.count < 1: return else: for kv in range_response.kvs: yield (kv.value, KVMetadata(kv, range_response.header))
[docs] @_handle_errors def get_all(self, sort_order=None, sort_target='key'): """ Get all keys currently stored in etcd. :returns: sequence of (value, metadata) tuples """ range_request = self._build_get_range_request( key=b'\0', range_end=b'\0', sort_order=sort_order, sort_target=sort_target, ) range_response = self.kvstub.Range( range_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) if range_response.count < 1: return else: for kv in range_response.kvs: yield (kv.value, KVMetadata(kv, range_response.header))
def _build_put_request(self, key, value, lease=None, prev_kv=False): put_request = etcdrpc.PutRequest() put_request.key = utils.to_bytes(key) put_request.value = utils.to_bytes(value) put_request.lease = utils.lease_to_id(lease) put_request.prev_kv = prev_kv return put_request
[docs] @_handle_errors def put(self, key, value, lease=None, prev_kv=False): """ Save a value to etcd. Example usage: .. code-block:: python >>> import etcd3 >>> etcd = etcd3.client() >>> etcd.put('/thing/key', 'hello world') :param key: key in etcd to set :param value: value to set key to :type value: bytes :param lease: Lease to associate with this key. :type lease: either :class:`.Lease`, or int (ID of lease) :param prev_kv: return the previous key-value pair :type prev_kv: bool :returns: a response containing a header and the prev_kv :rtype: :class:`.rpc_pb2.PutResponse` """ put_request = self._build_put_request(key, value, lease=lease, prev_kv=prev_kv) return self.kvstub.Put( put_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def replace(self, key, initial_value, new_value): """ Atomically replace the value of a key with a new value. This compares the current value of a key, then replaces it with a new value if it is equal to a specified value. This operation takes place in a transaction. :param key: key in etcd to replace :param initial_value: old value to replace :type initial_value: bytes :param new_value: new value of the key :type new_value: bytes :returns: status of transaction, ``True`` if the replace was successful, ``False`` otherwise :rtype: bool """ status, _ = self.transaction( compare=[self.transactions.value(key) == initial_value], success=[self.transactions.put(key, new_value)], failure=[], ) return status
def _build_delete_request(self, key, range_end=None, prev_kv=False): delete_request = etcdrpc.DeleteRangeRequest() delete_request.key = utils.to_bytes(key) delete_request.prev_kv = prev_kv if range_end is not None: delete_request.range_end = utils.to_bytes(range_end) return delete_request
[docs] @_handle_errors def delete(self, key, prev_kv=False, return_response=False): """ Delete a single key in etcd. :param key: key in etcd to delete :param prev_kv: return the deleted key-value pair :type prev_kv: bool :param return_response: return the full response :type return_response: bool :returns: True if the key has been deleted when ``return_response`` is False and a response containing a header, the number of deleted keys and prev_kvs when ``return_response`` is True """ delete_request = self._build_delete_request(key, prev_kv=prev_kv) delete_response = self.kvstub.DeleteRange( delete_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) if return_response: return delete_response return delete_response.deleted >= 1
[docs] @_handle_errors def delete_prefix(self, prefix): """Delete a range of keys with a prefix in etcd.""" delete_request = self._build_delete_request( prefix, range_end=utils.increment_last_byte(utils.to_bytes(prefix)) ) return self.kvstub.DeleteRange( delete_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def status(self): """Get the status of the responding member.""" status_request = etcdrpc.StatusRequest() status_response = self.maintenancestub.Status( status_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) for m in self.members: if m.id == status_response.leader: leader = m break else: # raise exception? leader = None return Status(status_response.version, status_response.dbSize, leader, status_response.raftIndex, status_response.raftTerm)
[docs] @_handle_errors def add_watch_callback(self, *args, **kwargs): """ Watch a key or range of keys and call a callback on every event. If timeout was declared during the client initialization and the watch cannot be created during that time the method raises a ``WatchTimedOut`` exception. :param key: key to watch :param callback: callback function :returns: watch_id. Later it could be used for cancelling watch. """ try: return self.watcher.add_callback(*args, **kwargs) except queue.Empty: raise exceptions.WatchTimedOut()
[docs] @_handle_errors def watch(self, key, **kwargs): """ Watch a key. Example usage: .. code-block:: python events_iterator, cancel = etcd.watch('/doot/key') for event in events_iterator: print(event) :param key: key to watch :returns: tuple of ``events_iterator`` and ``cancel``. Use ``events_iterator`` to get the events of key changes and ``cancel`` to cancel the watch request """ event_queue = queue.Queue() def callback(event): event_queue.put(event) watch_id = self.add_watch_callback(key, callback, **kwargs) canceled = threading.Event() def cancel(): canceled.set() event_queue.put(None) self.cancel_watch(watch_id) @_handle_errors def iterator(): while not canceled.is_set(): event = event_queue.get() if event is None: canceled.set() if isinstance(event, Exception): canceled.set() raise event if not canceled.is_set(): yield event return iterator(), cancel
[docs] @_handle_errors def watch_prefix(self, key_prefix, **kwargs): """Watches a range of keys with a prefix.""" kwargs['range_end'] = \ utils.increment_last_byte(utils.to_bytes(key_prefix)) return self.watch(key_prefix, **kwargs)
[docs] @_handle_errors def watch_once(self, key, timeout=None, **kwargs): """ Watch a key and stops after the first event. If the timeout was specified and event didn't arrived method will raise ``WatchTimedOut`` exception. :param key: key to watch :param timeout: (optional) timeout in seconds. :returns: ``Event`` """ event_queue = queue.Queue() def callback(event): event_queue.put(event) watch_id = self.add_watch_callback(key, callback, **kwargs) try: return event_queue.get(timeout=timeout) except queue.Empty: raise exceptions.WatchTimedOut() finally: self.cancel_watch(watch_id)
[docs] @_handle_errors def watch_prefix_once(self, key_prefix, timeout=None, **kwargs): """ Watches a range of keys with a prefix and stops after the first event. If the timeout was specified and event didn't arrived method will raise ``WatchTimedOut`` exception. """ kwargs['range_end'] = \ utils.increment_last_byte(utils.to_bytes(key_prefix)) return self.watch_once(key_prefix, timeout=timeout, **kwargs)
[docs] @_handle_errors def cancel_watch(self, watch_id): """ Stop watching a key or range of keys. :param watch_id: watch_id returned by ``add_watch_callback`` method """ self.watcher.cancel(watch_id)
def _ops_to_requests(self, ops): """ Return a list of grpc requests. Returns list from an input list of etcd3.transactions.{Put, Get, Delete, Txn} objects. """ request_ops = [] for op in ops: if isinstance(op, transactions.Put): request = self._build_put_request(op.key, op.value, op.lease, op.prev_kv) request_op = etcdrpc.RequestOp(request_put=request) request_ops.append(request_op) elif isinstance(op, transactions.Get): request = self._build_get_range_request(op.key, op.range_end) request_op = etcdrpc.RequestOp(request_range=request) request_ops.append(request_op) elif isinstance(op, transactions.Delete): request = self._build_delete_request(op.key, op.range_end, op.prev_kv) request_op = etcdrpc.RequestOp(request_delete_range=request) request_ops.append(request_op) elif isinstance(op, transactions.Txn): compare = [c.build_message() for c in op.compare] success_ops = self._ops_to_requests(op.success) failure_ops = self._ops_to_requests(op.failure) request = etcdrpc.TxnRequest(compare=compare, success=success_ops, failure=failure_ops) request_op = etcdrpc.RequestOp(request_txn=request) request_ops.append(request_op) else: raise Exception( 'Unknown request class {}'.format(op.__class__)) return request_ops
[docs] @_handle_errors def transaction(self, compare, success=None, failure=None): """ Perform a transaction. Example usage: .. code-block:: python etcd.transaction( compare=[ etcd.transactions.value('/doot/testing') == 'doot', etcd.transactions.version('/doot/testing') > 0, ], success=[ etcd.transactions.put('/doot/testing', 'success'), ], failure=[ etcd.transactions.put('/doot/testing', 'failure'), ] ) :param compare: A list of comparisons to make :param success: A list of operations to perform if all the comparisons are true :param failure: A list of operations to perform if any of the comparisons are false :return: A tuple of (operation status, responses) """ compare = [c.build_message() for c in compare] success_ops = self._ops_to_requests(success) failure_ops = self._ops_to_requests(failure) transaction_request = etcdrpc.TxnRequest(compare=compare, success=success_ops, failure=failure_ops) txn_response = self.kvstub.Txn( transaction_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) responses = [] for response in txn_response.responses: response_type = response.WhichOneof('response') if response_type in ['response_put', 'response_delete_range', 'response_txn']: responses.append(response) elif response_type == 'response_range': range_kvs = [] for kv in response.response_range.kvs: range_kvs.append((kv.value, KVMetadata(kv, txn_response.header))) responses.append(range_kvs) return txn_response.succeeded, responses
[docs] @_handle_errors def lease(self, ttl, lease_id=None): """ Create a new lease. All keys attached to this lease will be expired and deleted if the lease expires. A lease can be sent keep alive messages to refresh the ttl. :param ttl: Requested time to live :param lease_id: Requested ID for the lease :returns: new lease :rtype: :class:`.Lease` """ lease_grant_request = etcdrpc.LeaseGrantRequest(TTL=ttl, ID=lease_id) lease_grant_response = self.leasestub.LeaseGrant( lease_grant_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) return leases.Lease(lease_id=lease_grant_response.ID, ttl=lease_grant_response.TTL, etcd_client=self)
[docs] @_handle_errors def revoke_lease(self, lease_id): """ Revoke a lease. :param lease_id: ID of the lease to revoke. """ lease_revoke_request = etcdrpc.LeaseRevokeRequest(ID=lease_id) self.leasestub.LeaseRevoke( lease_revoke_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
@_handle_errors def refresh_lease(self, lease_id): keep_alive_request = etcdrpc.LeaseKeepAliveRequest(ID=lease_id) request_stream = [keep_alive_request] for response in self.leasestub.LeaseKeepAlive( iter(request_stream), self.timeout, credentials=self.call_credentials, metadata=self.metadata): yield response @_handle_errors def get_lease_info(self, lease_id): # only available in etcd v3.1.0 and later ttl_request = etcdrpc.LeaseTimeToLiveRequest(ID=lease_id, keys=True) return self.leasestub.LeaseTimeToLive( ttl_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def lock(self, name, ttl=60): """ Create a new lock. :param name: name of the lock :type name: string or bytes :param ttl: length of time for the lock to live for in seconds. The lock will be released after this time elapses, unless refreshed :type ttl: int :returns: new lock :rtype: :class:`.Lock` """ return locks.Lock(name, ttl=ttl, etcd_client=self)
[docs] @_handle_errors def add_member(self, urls): """ Add a member into the cluster. :returns: new member :rtype: :class:`.Member` """ member_add_request = etcdrpc.MemberAddRequest(peerURLs=urls) member_add_response = self.clusterstub.MemberAdd( member_add_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) member = member_add_response.member return etcd3.members.Member(member.ID, member.name, member.peerURLs, member.clientURLs, etcd_client=self)
[docs] @_handle_errors def remove_member(self, member_id): """ Remove an existing member from the cluster. :param member_id: ID of the member to remove """ member_rm_request = etcdrpc.MemberRemoveRequest(ID=member_id) self.clusterstub.MemberRemove( member_rm_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def update_member(self, member_id, peer_urls): """ Update the configuration of an existing member in the cluster. :param member_id: ID of the member to update :param peer_urls: new list of peer urls the member will use to communicate with the cluster """ member_update_request = etcdrpc.MemberUpdateRequest(ID=member_id, peerURLs=peer_urls) self.clusterstub.MemberUpdate( member_update_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
@property def members(self): """ List of all members associated with the cluster. :type: sequence of :class:`.Member` """ member_list_request = etcdrpc.MemberListRequest() member_list_response = self.clusterstub.MemberList( member_list_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) for member in member_list_response.members: yield etcd3.members.Member(member.ID, member.name, member.peerURLs, member.clientURLs, etcd_client=self)
[docs] @_handle_errors def compact(self, revision, physical=False): """ Compact the event history in etcd up to a given revision. All superseded keys with a revision less than the compaction revision will be removed. :param revision: revision for the compaction operation :param physical: if set to True, the request will wait until the compaction is physically applied to the local database such that compacted entries are totally removed from the backend database """ compact_request = etcdrpc.CompactionRequest(revision=revision, physical=physical) self.kvstub.Compact( compact_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def defragment(self): """Defragment a member's backend database to recover storage space.""" defrag_request = etcdrpc.DefragmentRequest() self.maintenancestub.Defragment( defrag_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata )
[docs] @_handle_errors def hash(self): """ Return the hash of the local KV state. :returns: kv state hash :rtype: int """ hash_request = etcdrpc.HashRequest() return self.maintenancestub.Hash(hash_request).hash
def _build_alarm_request(self, alarm_action, member_id, alarm_type): alarm_request = etcdrpc.AlarmRequest() if alarm_action == 'get': alarm_request.action = etcdrpc.AlarmRequest.GET elif alarm_action == 'activate': alarm_request.action = etcdrpc.AlarmRequest.ACTIVATE elif alarm_action == 'deactivate': alarm_request.action = etcdrpc.AlarmRequest.DEACTIVATE else: raise ValueError('Unknown alarm action: {}'.format(alarm_action)) alarm_request.memberID = member_id if alarm_type == 'none': alarm_request.alarm = etcdrpc.NONE elif alarm_type == 'no space': alarm_request.alarm = etcdrpc.NOSPACE else: raise ValueError('Unknown alarm type: {}'.format(alarm_type)) return alarm_request
[docs] @_handle_errors def create_alarm(self, member_id=0): """Create an alarm. If no member id is given, the alarm is activated for all the members of the cluster. Only the `no space` alarm can be raised. :param member_id: The cluster member id to create an alarm to. If 0, the alarm is created for all the members of the cluster. :returns: list of :class:`.Alarm` """ alarm_request = self._build_alarm_request('activate', member_id, 'no space') alarm_response = self.maintenancestub.Alarm( alarm_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) return [Alarm(alarm.alarm, alarm.memberID) for alarm in alarm_response.alarms]
[docs] @_handle_errors def list_alarms(self, member_id=0, alarm_type='none'): """List the activated alarms. :param member_id: :param alarm_type: The cluster member id to create an alarm to. If 0, the alarm is created for all the members of the cluster. :returns: sequence of :class:`.Alarm` """ alarm_request = self._build_alarm_request('get', member_id, alarm_type) alarm_response = self.maintenancestub.Alarm( alarm_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) for alarm in alarm_response.alarms: yield Alarm(alarm.alarm, alarm.memberID)
[docs] @_handle_errors def disarm_alarm(self, member_id=0): """Cancel an alarm. :param member_id: The cluster member id to cancel an alarm. If 0, the alarm is canceled for all the members of the cluster. :returns: List of :class:`.Alarm` """ alarm_request = self._build_alarm_request('deactivate', member_id, 'no space') alarm_response = self.maintenancestub.Alarm( alarm_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) return [Alarm(alarm.alarm, alarm.memberID) for alarm in alarm_response.alarms]
[docs] @_handle_errors def snapshot(self, file_obj): """Take a snapshot of the database. :param file_obj: A file-like object to write the database contents in. """ snapshot_request = etcdrpc.SnapshotRequest() snapshot_response = self.maintenancestub.Snapshot( snapshot_request, self.timeout, credentials=self.call_credentials, metadata=self.metadata ) for response in snapshot_response: file_obj.write(response.blob)
def client(host='localhost', port=2379, ca_cert=None, cert_key=None, cert_cert=None, timeout=None, user=None, password=None, grpc_options=None): """Return an instance of an Etcd3Client.""" return Etcd3Client(host=host, port=port, ca_cert=ca_cert, cert_key=cert_key, cert_cert=cert_cert, timeout=timeout, user=user, password=password, grpc_options=grpc_options)