Source code for brewtils.rest.easy_client

# -*- coding: utf-8 -*-

import logging
import warnings

import requests.exceptions

from brewtils.errors import (
    FetchError, ValidationError, SaveError, DeleteError, RestConnectionError,
    NotFoundError, ConflictError, RestError, WaitExceededError)
from brewtils.models import Event, PatchOperation
from brewtils.rest.client import RestClient
from brewtils.schema_parser import SchemaParser


class EasyClient(object):
    """Client for communicating with beer-garden

    This class provides nice wrappers around the functionality provided by a
    :py:class:`brewtils.rest.client.RestClient`

    Args:
        bg_host: beer-garden REST API hostname.
        bg_port: beer-garden REST API port.
        ssl_enabled: Flag indicating whether to use HTTPS when communicating
            with beer-garden.
        api_version: The beer-garden REST API version. Will default to the
            latest version.
        ca_cert: beer-garden REST API server CA certificate.
        client_cert: The client certificate to use when making requests.
        parser: The parser to use. If None will default to an instance of
            SchemaParser.
        logger: The logger to use. If None one will be created.
        url_prefix: beer-garden REST API URL Prefix.
        ca_verify: Flag indicating whether to verify server certificate when
            making a request.

    Keyword Args:
        username: Username for Beergarden authentication
        password: Password for Beergarden authentication
        access_token: Access token for Beergarden authentication
        refresh_token: Refresh token for Beergarden authentication
        client_timeout: Max time to wait for server response
        host: Used as the hostname when ``bg_host`` is not given
        port: Used as the port when ``bg_port`` is not given
    """

    def __init__(
        self,
        bg_host=None,
        bg_port=None,
        ssl_enabled=False,
        api_version=None,
        ca_cert=None,
        client_cert=None,
        parser=None,
        logger=None,
        url_prefix=None,
        ca_verify=True,
        **kwargs
    ):
        # 'host' / 'port' are accepted as fallbacks for bg_host / bg_port
        bg_host = bg_host or kwargs.get('host')
        bg_port = bg_port or kwargs.get('port')

        self.logger = logger or logging.getLogger(__name__)
        self.parser = parser or SchemaParser()

        # All remaining keyword arguments are forwarded untouched to the RestClient
        self.client = RestClient(
            bg_host=bg_host,
            bg_port=bg_port,
            ssl_enabled=ssl_enabled,
            api_version=api_version,
            ca_cert=ca_cert,
            client_cert=client_cert,
            url_prefix=url_prefix,
            ca_verify=ca_verify,
            **kwargs
        )

    def can_connect(self, **kwargs):
        """Determine if the Beergarden server is responding.

        Kwargs:
            Arguments passed to the underlying Requests method

        Returns:
            A bool indicating if the connection attempt was successful. Will
            return False only if a ConnectionError is raised during the attempt.
            Any other exception will be re-raised.

        Raises:
            requests.exceptions.RequestException: The connection attempt
                resulted in an exception that indicates something other than a
                basic connection error. For example, an error with certificate
                verification.
        """
        try:
            self.client.get_config(**kwargs)
        except requests.exceptions.ConnectionError as ex:
            # Exact type match is intentional (do NOT use isinstance): subclasses
            # of ConnectionError, such as SSL or proxy errors, indicate something
            # other than "server not reachable" and must propagate to the caller.
            if type(ex) is requests.exceptions.ConnectionError:
                return False
            raise

        return True

    def get_version(self, **kwargs):
        """Request version information from the server

        Args:
            **kwargs: Arguments passed to the underlying RestClient call

        Returns:
            The raw response object (it is not parsed) when the call succeeds

        Raises:
            FetchError: Default error when the response is not OK; see
                :py:meth:`_handle_response_failure` for the status mapping
        """
        response = self.client.get_version(**kwargs)

        if response.ok:
            return response

        self._handle_response_failure(response, default_exc=FetchError)

    def find_unique_system(self, **kwargs):
        """Find a unique system using keyword arguments as search parameters

        If 'id' is present in kwargs the system is looked up directly by ID and
        the remaining kwargs are forwarded to that lookup.

        Args:
            **kwargs: Search parameters

        Returns:
            One system instance, or None if nothing matched

        Raises:
            FetchError: More than one system matched the search parameters
        """
        if 'id' in kwargs:
            return self._find_system_by_id(kwargs.pop('id'), **kwargs)

        systems = self.find_systems(**kwargs)

        if not systems:
            return None

        if len(systems) > 1:
            raise FetchError("More than one system found that specifies the given constraints")

        return systems[0]

    def find_systems(self, **kwargs):
        """Find systems using keyword arguments as search parameters

        Args:
            **kwargs: Search parameters

        Returns:
            A list of system instances satisfying the given search parameters
        """
        response = self.client.get_systems(**kwargs)

        if response.ok:
            return self.parser.parse_system(response.json(), many=True)

        self._handle_response_failure(response, default_exc=FetchError)

    def _find_system_by_id(self, system_id, **kwargs):
        """Find a system by id, convert JSON to a system object and return it

        Returns None (instead of raising) when the server answers with a 404.
        """
        response = self.client.get_system(system_id, **kwargs)

        if response.ok:
            return self.parser.parse_system(response.json())

        self._handle_response_failure(response, default_exc=FetchError, raise_404=False)

    def create_system(self, system):
        """Create a new system by POSTing

        Args:
            system: The system to create

        Returns:
            The system creation response, parsed as a system
        """
        json_system = self.parser.serialize_system(system)
        response = self.client.post_systems(json_system)

        if response.ok:
            return self.parser.parse_system(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def update_system(self, system_id, new_commands=None, **kwargs):
        """Update a system by PATCHing

        Args:
            system_id: The ID of the system to update
            new_commands: The new commands

        Keyword Args:
            metadata (dict): The updated metadata for the system
            description (str): The updated description for the system
            display_name (str): The updated display_name for the system
            icon_name (str): The updated icon_name for the system

        Returns:
            The response, parsed as a system
        """
        operations = []
        metadata = kwargs.pop("metadata", {})

        if new_commands:
            serialized_commands = self.parser.serialize_command(
                new_commands, to_string=False, many=True
            )
            operations.append(PatchOperation('replace', '/commands', serialized_commands))

        if metadata:
            operations.append(PatchOperation('update', '/metadata', metadata))

        # Every remaining keyword becomes a 'replace' on the attribute of the
        # same name; None values are skipped
        for attr, value in kwargs.items():
            if value is not None:
                operations.append(PatchOperation('replace', '/%s' % attr, value))

        response = self.client.patch_system(
            system_id, self.parser.serialize_patch(operations, many=True)
        )

        if response.ok:
            return self.parser.parse_system(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def remove_system(self, **kwargs):
        """Remove a specific system by DELETEing, using keyword arguments as search parameters

        Args:
            **kwargs: Search parameters

        Returns:
            True if the system was removed

        Raises:
            FetchError: No system matched the search parameters
        """
        system = self.find_unique_system(**kwargs)

        if system is None:
            raise FetchError("Could not find system matching the given search parameters")

        return self._remove_system_by_id(system.id)

    def _remove_system_by_id(self, system_id):
        """DELETE the system with the given id, returning True on success

        Raises:
            DeleteError: ``system_id`` is None
        """
        if system_id is None:
            raise DeleteError("Cannot delete a system without an id")

        response = self.client.delete_system(system_id)

        if response.ok:
            return True

        self._handle_response_failure(response, default_exc=DeleteError)

    def initialize_instance(self, instance_id):
        """Start an instance by PATCHing

        Args:
            instance_id: The ID of the instance to start

        Returns:
            The start response, parsed as an instance
        """
        payload = PatchOperation('initialize')
        response = self.client.patch_instance(
            instance_id, self.parser.serialize_patch(payload)
        )

        if response.ok:
            return self.parser.parse_instance(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def get_instance_status(self, instance_id):
        """Get an instance's status

        Args:
            instance_id: The Id

        Returns:
            The response, parsed as an instance
        """
        response = self.client.get_instance(instance_id)

        if response.ok:
            return self.parser.parse_instance(response.json())

        self._handle_response_failure(response, default_exc=FetchError)

    def update_instance_status(self, instance_id, new_status):
        """Update an instance's status by PATCHing

        Args:
            instance_id: The ID of the instance to update
            new_status: The updated status

        Returns:
            The response, parsed as an instance
        """
        payload = PatchOperation('replace', '/status', new_status)
        response = self.client.patch_instance(
            instance_id, self.parser.serialize_patch(payload)
        )

        if response.ok:
            return self.parser.parse_instance(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def instance_heartbeat(self, instance_id):
        """Send heartbeat for health and status

        Args:
            instance_id: The ID of the instance

        Returns:
            True if the heartbeat was accepted
        """
        payload = PatchOperation('heartbeat')
        response = self.client.patch_instance(
            instance_id, self.parser.serialize_patch(payload)
        )

        if response.ok:
            return True

        self._handle_response_failure(response, default_exc=SaveError)

    def remove_instance(self, instance_id):
        """Remove an instance

        Args:
            instance_id: The ID of the instance

        Returns:
            True if the instance was removed

        Raises:
            DeleteError: ``instance_id`` is None
        """
        if instance_id is None:
            raise DeleteError("Cannot delete an instance without an id")

        response = self.client.delete_instance(instance_id)

        if response.ok:
            return True

        self._handle_response_failure(response, default_exc=DeleteError)

    def find_unique_request(self, **kwargs):
        """Find a unique request using keyword arguments as search parameters

        .. note::
            If 'id' is present in kwargs then all other parameters will be ignored.

        Args:
            **kwargs: Search parameters

        Returns:
            One request instance, or None if nothing matched

        Raises:
            FetchError: More than one request matched the search parameters
        """
        if 'id' in kwargs:
            return self._find_request_by_id(kwargs.pop('id'))

        # Deliberately not named 'requests': that would shadow the imported module
        found = self.find_requests(**kwargs)

        if not found:
            return None

        if len(found) > 1:
            raise FetchError("More than one request found that specifies "
                             "the given constraints")

        return found[0]

    def find_requests(self, **kwargs):
        """Find requests using keyword arguments as search parameters

        Args:
            **kwargs: Search parameters

        Returns:
            A list of request instances satisfying the given search parameters
        """
        response = self.client.get_requests(**kwargs)

        if response.ok:
            return self.parser.parse_request(response.json(), many=True)

        self._handle_response_failure(response, default_exc=FetchError)

    def _find_request_by_id(self, request_id):
        """Find a request by id, convert JSON to a request object and return it

        Returns None (instead of raising) when the server answers with a 404.
        """
        response = self.client.get_request(request_id)

        if response.ok:
            return self.parser.parse_request(response.json())

        self._handle_response_failure(response, default_exc=FetchError, raise_404=False)

    def create_request(self, request, **kwargs):
        """Create a new request by POSTing

        Args:
            request: New request definition
            kwargs: Extra request parameters

        Keyword Args:
            blocking: Wait for request to complete
            timeout: Maximum seconds to wait

        Returns:
            Response to the request
        """
        json_request = self.parser.serialize_request(request)
        response = self.client.post_requests(json_request, **kwargs)

        if response.ok:
            return self.parser.parse_request(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def update_request(self, request_id, status=None, output=None, error_class=None):
        """Set various fields on a request by PATCHing

        Only arguments with a truthy value produce a patch operation.

        Args:
            request_id: The ID of the request to update
            status: The new status
            output: The new output
            error_class: The new error class

        Returns:
            The response, parsed as a request
        """
        operations = []

        if status:
            operations.append(PatchOperation('replace', '/status', status))
        if output:
            operations.append(PatchOperation('replace', '/output', output))
        if error_class:
            operations.append(PatchOperation('replace', '/error_class', error_class))

        response = self.client.patch_request(
            request_id, self.parser.serialize_patch(operations, many=True)
        )

        if response.ok:
            return self.parser.parse_request(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def get_logging_config(self, system_name):
        """Get the logging configuration for a particular system.

        Args:
            system_name: Name of system

        Returns:
            LoggingConfig object

        Raises:
            RestConnectionError: Default error when the response is not OK
        """
        response = self.client.get_logging_config(system_name=system_name)

        if response.ok:
            return self.parser.parse_logging_config(response.json())

        self._handle_response_failure(response, default_exc=RestConnectionError)

    def publish_event(self, *args, **kwargs):
        """Publish a new event by POSTing

        Args:
            *args: The Event to create (only the first positional argument is used)
            **kwargs: If no Event is given in the *args, one will be constructed
                from the kwargs

        Keyword Args:
            _publishers: Optional list of specific publishers. If None all
                publishers will be used.

        Returns:
            True if the event was published
        """
        # Must be removed before kwargs are used to construct an Event
        publishers = kwargs.pop('_publishers', None)

        event = args[0] if args else Event(**kwargs)
        json_event = self.parser.serialize_event(event)

        response = self.client.post_event(json_event, publishers=publishers)

        if response.ok:
            return True

        self._handle_response_failure(response)

    def get_queues(self):
        """Retrieve all queue information

        Returns:
            The response, parsed as a list of queues
        """
        response = self.client.get_queues()

        if response.ok:
            return self.parser.parse_queue(response.json(), many=True)

        self._handle_response_failure(response)

    def clear_queue(self, queue_name):
        """Cancel and clear all messages from a queue

        Args:
            queue_name: Name of the queue to clear

        Returns:
            True if the queue was cleared
        """
        response = self.client.delete_queue(queue_name)

        if response.ok:
            return True

        self._handle_response_failure(response)

    def clear_all_queues(self):
        """Cancel and clear all messages from all queues

        Returns:
            True if the queues were cleared
        """
        response = self.client.delete_queues()

        if response.ok:
            return True

        self._handle_response_failure(response)

    def find_jobs(self, **kwargs):
        """Find jobs using keyword arguments as search parameters

        Args:
            **kwargs: Search parameters

        Returns:
            List of jobs.
        """
        response = self.client.get_jobs(**kwargs)

        if response.ok:
            return self.parser.parse_job(response.json(), many=True)

        self._handle_response_failure(response, default_exc=FetchError)

    def create_job(self, job):
        """Create a new job by POSTing

        Args:
            job: The job to create

        Returns:
            The job creation response.
        """
        json_job = self.parser.serialize_job(job)
        response = self.client.post_jobs(json_job)

        if response.ok:
            return self.parser.parse_job(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def remove_job(self, job_id):
        """Remove a job by ID.

        Args:
            job_id: The ID of the job to remove.

        Returns:
            True if successful, raises an error otherwise.
        """
        response = self.client.delete_job(job_id)

        if response.ok:
            return True

        self._handle_response_failure(response, default_exc=DeleteError)

    def pause_job(self, job_id):
        """Pause a Job by ID.

        Args:
            job_id: The ID of the job to pause.

        Returns:
            A copy of the job.
        """
        return self._patch_job(job_id, [PatchOperation('update', '/status', 'PAUSED')])

    def _patch_job(self, job_id, operations):
        """PATCH a job with the given operations and return the parsed job"""
        response = self.client.patch_job(
            job_id, self.parser.serialize_patch(operations, many=True)
        )

        if response.ok:
            return self.parser.parse_job(response.json())

        self._handle_response_failure(response, default_exc=SaveError)

    def resume_job(self, job_id):
        """Resume a job by ID.

        Args:
            job_id: The ID of the job to resume.

        Returns:
            A copy of the job.
        """
        return self._patch_job(job_id, [PatchOperation('update', '/status', 'RUNNING')])

    def who_am_i(self):
        """Find the user represented by the current set of credentials

        Falls back to the 'anonymous' user when the client has no username.

        Returns:
            The current user
        """
        return self.get_user(self.client.username or 'anonymous')

    def get_user(self, user_identifier):
        """Find a specific user using username or ID

        Args:
            user_identifier: ID or username of User

        Returns:
            A User
        """
        response = self.client.get_user(user_identifier)

        if response.ok:
            return self.parser.parse_principal(response.json())

        self._handle_response_failure(response, default_exc=FetchError)

    @staticmethod
    def _handle_response_failure(response, default_exc=RestError, raise_404=True):
        """Translate a failed response into the matching exception

        Status code mapping:

        * 404 -> NotFoundError (or return None when ``raise_404`` is False)
        * 408 -> WaitExceededError
        * 409 -> ConflictError
        * any other 4xx -> ValidationError
        * 503 -> RestConnectionError
        * anything else -> ``default_exc``

        Args:
            response: The failed response
            default_exc: Exception class raised when no specific mapping applies
            raise_404: If False a 404 results in None being returned instead of
                a NotFoundError being raised

        Returns:
            None, and only for a 404 when ``raise_404`` is False
        """
        status = response.status_code

        if status == 404 and not raise_404:
            return None

        # Error bodies are not guaranteed to be JSON (e.g. an HTML page produced
        # by an intermediate proxy). Fall back to the raw text so the caller
        # still gets the intended exception type rather than a decoding error.
        try:
            message = response.json()
        except ValueError:
            message = response.text

        if status == 404:
            raise NotFoundError(message)
        elif status == 408:
            raise WaitExceededError(message)
        elif status == 409:
            raise ConflictError(message)
        elif 400 <= status < 500:
            raise ValidationError(message)
        elif status == 503:
            raise RestConnectionError(message)
        else:
            raise default_exc(message)
class BrewmasterEasyClient(EasyClient):
    """Deprecated alias of :py:class:`EasyClient`

    Behaves exactly like ``EasyClient`` but emits a ``DeprecationWarning``
    every time it is instantiated.
    """

    def __init__(self, *args, **kwargs):
        message = (
            "Call made to 'BrewmasterEasyClient'. This name will be removed in version "
            "3.0, please use "
            "'EasyClient' instead."
        )
        # stacklevel=2 attributes the warning to the code creating the instance
        warnings.warn(message, DeprecationWarning, stacklevel=2)

        super(BrewmasterEasyClient, self).__init__(*args, **kwargs)