Source code for brewtils.rest.easy_client

# -*- coding: utf-8 -*-
import logging
import warnings

import requests.exceptions
import wrapt

from brewtils.errors import (
    FetchError,
    ValidationError,
    SaveError,
    DeleteError,
    RestConnectionError,
    NotFoundError,
    ConflictError,
    RestError,
    WaitExceededError,
)
from brewtils.models import Event, PatchOperation
from brewtils.rest.client import RestClient
from brewtils.schema_parser import SchemaParser


[docs]def handle_response_failure(response, default_exc=RestError, raise_404=True): """Deal with a response with non-2xx status code Args: response: The response object default_exc: The exception to raise if no specific exception is warranted raise_404: If True a response with status code 404 will raise a NotFoundError. If False the method will return None. Returns: None - this function will always raise Raises: NotFoundError: Status code 404 and raise_404 is True WaitExceededError: Status code 408 ConflictError: Status code 409 ValidationError: Any other 4xx status codes RestConnectionError: Status code 503 default_exc: Any other status code """ if response.status_code == 404: if raise_404: raise NotFoundError(response.json()) else: return None elif response.status_code == 408: raise WaitExceededError(response.json()) elif response.status_code == 409: raise ConflictError(response.json()) elif 400 <= response.status_code < 500: raise ValidationError(response.json()) elif response.status_code == 503: raise RestConnectionError(response.json()) else: raise default_exc(response.json())
[docs]def wrap_response( return_boolean=False, parse_method="", parse_many=False, default_exc=RestError, raise_404=True, ): """Decorator to consolidate response parsing and error handling Args: return_boolean: If True, a successful response will also return True parse_method: The response's json will be passed to this method of the SchemaParser parse_many: This will be passed as the 'many' parameter when parsing the response default_exc: Will be passed to handle_response_failure for failed responses raise_404: Will be passed to handle_response_failure for failed responses Returns: - True if return_boolean is True and the response status code is 2xx. - The response object if return_boolean is False and parse_method is "" - A parsed Brewtils model if return_boolean is False and parse_method is defined Raises: RestError: The response has a non-2xx status code. Note that the specific exception raised depends on the response status code and the argument passed as the default_exc parameter. """ @wrapt.decorator def wrapper(wrapped, instance, args, kwargs): response = wrapped(*args, **kwargs) if response.ok: if return_boolean: return True if not hasattr(instance.parser, parse_method): return response return getattr(instance.parser, parse_method)( response.json(), many=parse_many ) else: handle_response_failure( response, default_exc=default_exc, raise_404=raise_404 ) return wrapper
[docs]class EasyClient(object): """Client for simplified communication with Beergarden This class is intended to be a middle ground between the RestClient and SystemClient. It provides a 'cleaner' interface to some common Beergarden operations than is exposed by the lower-level RestClient. On the other hand, the SystemClient is much better for generating Beergarden Requests. Keyword Args: bg_host (str): Beergarden hostname bg_port (int): Beergarden port ssl_enabled (Optional[bool]): Whether to use SSL (HTTP vs HTTPS) api_version (Optional[int]): The REST API version ca_cert (Optional[str]): Path to CA certificate file client_cert (Optional[str]): Path to client certificate file parser (Optional[SchemaParser]): Parser to use logger (Optional[Logger]): Logger to use url_prefix (Optional[str]): Beergarden REST API prefix ca_verify (Optional[bool]): Whether to verify the server cert hostname username (Optional[str]): Username for authentication password (Optional[str]): Password for authentication access_token (Optional[str]): Access token for authentication refresh_token (Optional[str]): Refresh token for authentication client_timeout (Optional[float]): Max time to wait for a server response """ def __init__( self, bg_host=None, bg_port=None, ssl_enabled=False, api_version=None, ca_cert=None, client_cert=None, parser=None, logger=None, url_prefix=None, ca_verify=True, **kwargs ): bg_host = bg_host or kwargs.get("host") bg_port = bg_port or kwargs.get("port") self.logger = logger or logging.getLogger(__name__) self.parser = parser or SchemaParser() self.client = RestClient( bg_host=bg_host, bg_port=bg_port, ssl_enabled=ssl_enabled, api_version=api_version, ca_cert=ca_cert, client_cert=client_cert, url_prefix=url_prefix, ca_verify=ca_verify, **kwargs )
[docs] def can_connect(self, **kwargs): """Determine if the Beergarden server is responding. Kwargs: Arguments passed to the underlying Requests method Returns: A bool indicating if the connection attempt was successful. Will return False only if a ConnectionError is raised during the attempt. Any other exception will be re-raised. Raises: requests.exceptions.RequestException: The connection attempt resulted in an exception that indicates something other than a basic connection error. For example, an error with certificate verification. """ try: self.client.get_config(**kwargs) except requests.exceptions.ConnectionError as ex: if type(ex) == requests.exceptions.ConnectionError: return False raise return True
[docs] @wrap_response(default_exc=FetchError) def get_version(self, **kwargs): """Get Bartender, Brew-view, and API version information Args: **kwargs: Extra parameters Returns: dict: Response object with version information in the body """ return self.client.get_version(**kwargs)
[docs] @wrap_response(parse_method="parse_logging_config", default_exc=RestConnectionError) def get_logging_config(self, system_name): """Get logging configuration for a System Args: system_name (str): The name of the System Returns: LoggingConfig: The configuration object """ return self.client.get_logging_config(system_name=system_name)
[docs] def find_unique_system(self, **kwargs): """Find a unique system .. note:: If 'id' is a given keyword argument then all other parameters will be ignored. Args: **kwargs: Search parameters Returns: System, None: The System if found, None otherwise Raises: FetchError: More than one matching System was found """ if "id" in kwargs: return self._find_system_by_id(kwargs.pop("id"), **kwargs) else: systems = self.find_systems(**kwargs) if not systems: return None if len(systems) > 1: raise FetchError("More than one matching System found") return systems[0]
[docs] @wrap_response(parse_method="parse_system", parse_many=True, default_exc=FetchError) def find_systems(self, **kwargs): """Find Systems using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[System]: List of Systems matching the search parameters """ return self.client.get_systems(**kwargs)
[docs] @wrap_response(parse_method="parse_system", parse_many=False, default_exc=SaveError) def create_system(self, system): """Create a new System Args: system (System): The System to create Returns: System: The newly-created system """ return self.client.post_systems(self.parser.serialize_system(system))
[docs] @wrap_response(parse_method="parse_system", parse_many=False, default_exc=SaveError) def update_system(self, system_id, new_commands=None, **kwargs): """Update a System Args: system_id (str): The System ID new_commands (Optional[List[Command]]): New System commands Keyword Args: metadata (dict): New System metadata description (str): New System description display_name (str): New System display name icon_name (str) The: New System icon name Returns: System: The updated system """ operations = [] metadata = kwargs.pop("metadata", {}) if new_commands: commands = self.parser.serialize_command( new_commands, to_string=False, many=True ) operations.append(PatchOperation("replace", "/commands", commands)) if metadata: operations.append(PatchOperation("update", "/metadata", metadata)) for key, value in kwargs.items(): if value is not None: operations.append(PatchOperation("replace", "/%s" % key, value)) return self.client.patch_system( system_id, self.parser.serialize_patch(operations, many=True) )
[docs] def remove_system(self, **kwargs): """Remove a unique System Args: **kwargs: Search parameters Returns: bool: True if removal was successful Raises: FetchError: Couldn't find a System matching given parameters """ system = self.find_unique_system(**kwargs) if system is None: raise FetchError("No matching System found") return self._remove_system_by_id(system.id)
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=SaveError ) def initialize_instance(self, instance_id): """Start an Instance Args: instance_id (str): The Instance ID Returns: Instance: The updated Instance """ return self.client.patch_instance( instance_id, self.parser.serialize_patch(PatchOperation("initialize")) )
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=FetchError ) def get_instance(self, instance_id): """Get an Instance Args: instance_id: The Id Returns: The Instance """ return self.client.get_instance(instance_id)
[docs] def get_instance_status(self, instance_id): """Get an Instance WARNING: This method currently returns the Instance, not the Instance's status. This behavior will be corrected in 3.0. To prepare for this change please use get_instance() instead of this method. Args: instance_id: The Id Returns: The status """ warnings.warn( "This method currently returns the Instance, not the Instance's status. " "This behavior will be corrected in 3.0. To prepare please use " "get_instance() instead of this method.", FutureWarning, ) return self.get_instance(instance_id)
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=SaveError ) def update_instance_status(self, instance_id, new_status): """Update an Instance status Args: instance_id (str): The Instance ID new_status (str): The new status Returns: Instance: The updated Instance """ return self.client.patch_instance( instance_id, self.parser.serialize_patch( PatchOperation("replace", "/status", new_status) ), )
[docs] @wrap_response(return_boolean=True, default_exc=SaveError) def instance_heartbeat(self, instance_id): """Send an Instance heartbeat Args: instance_id (str): The Instance ID Returns: bool: True if the heartbeat was successful """ return self.client.patch_instance( instance_id, self.parser.serialize_patch(PatchOperation("heartbeat")) )
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def remove_instance(self, instance_id): """Remove an Instance Args: instance_id (str): The Instance ID Returns: bool: True if the remove was successful """ if instance_id is None: raise DeleteError("Cannot delete an instance without an id") return self.client.delete_instance(instance_id)
[docs] def find_unique_request(self, **kwargs): """Find a unique request .. note:: If 'id' is a given keyword argument then all other parameters will be ignored. Args: **kwargs: Search parameters Returns: Request, None: The Request if found, None otherwise Raises: FetchError: More than one matching Request was found """ if "id" in kwargs: return self._find_request_by_id(kwargs.pop("id")) else: all_requests = self.find_requests(**kwargs) if not all_requests: return None if len(all_requests) > 1: raise FetchError("More than one matching Request found") return all_requests[0]
[docs] @wrap_response( parse_method="parse_request", parse_many=True, default_exc=FetchError ) def find_requests(self, **kwargs): """Find Requests using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[Request]: List of Systems matching the search parameters """ return self.client.get_requests(**kwargs)
[docs] @wrap_response( parse_method="parse_request", parse_many=False, default_exc=SaveError ) def create_request(self, request, **kwargs): """Create a new Request Args: request: New request definition kwargs: Extra request parameters Keyword Args: blocking (bool): Wait for request to complete before returning timeout (int): Maximum seconds to wait for completion Returns: Request: The newly-created Request """ return self.client.post_requests( self.parser.serialize_request(request), **kwargs )
[docs] @wrap_response( parse_method="parse_request", parse_many=False, default_exc=SaveError ) def update_request(self, request_id, status=None, output=None, error_class=None): """Update a Request Args: request_id (str): The Request ID status (Optional[str]): New Request status output (Optional[str]): New Request output error_class (Optional[str]): New Request error class Returns: Response: The updated response """ operations = [] if status: operations.append(PatchOperation("replace", "/status", status)) if output: operations.append(PatchOperation("replace", "/output", output)) if error_class: operations.append(PatchOperation("replace", "/error_class", error_class)) return self.client.patch_request( request_id, self.parser.serialize_patch(operations, many=True) )
[docs] @wrap_response(return_boolean=True) def publish_event(self, *args, **kwargs): """Publish a new event Args: *args: If a positional argument is given it's assumed to be an Event and will be used **kwargs: Will be used to construct a new Event to publish if no Event is given in the positional arguments Keyword Args: _publishers (Optional[List[str]]): List of publisher names. If given the Event will only be published to the specified publishers. Otherwise all publishers known to Beergarden will be used. Returns: bool: True if the publish was successful """ publishers = kwargs.pop("_publishers", None) event = args[0] if args else Event(**kwargs) return self.client.post_event( self.parser.serialize_event(event), publishers=publishers )
[docs] @wrap_response(parse_method="parse_queue", parse_many=True) def get_queues(self): """Retrieve all queue information :return: The response """ return self.client.get_queues()
[docs] @wrap_response(return_boolean=True) def clear_queue(self, queue_name): """Cancel and remove all Requests from a message queue Args: queue_name (str): The name of the queue to clear Returns: bool: True if the clear was successful """ return self.client.delete_queue(queue_name)
[docs] @wrap_response(return_boolean=True) def clear_all_queues(self): """Cancel and remove all Requests in all queues Returns: bool: True if the clear was successful """ return self.client.delete_queues()
[docs] @wrap_response(parse_method="parse_job", parse_many=True, default_exc=FetchError) def find_jobs(self, **kwargs): """Find Jobs using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[Job]: List of Jobs matching the search parameters """ return self.client.get_jobs(**kwargs)
[docs] @wrap_response(parse_method="parse_job", parse_many=False, default_exc=SaveError) def create_job(self, job): """Create a new Job Args: job (Job): New Job definition Returns: Job: The newly-created Job """ return self.client.post_jobs(self.parser.serialize_job(job))
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def remove_job(self, job_id): """Remove a unique Job Args: job_id (str): The Job ID Returns: bool: True if removal was successful Raises: DeleteError: Couldn't remove Job """ return self.client.delete_job(job_id)
[docs] def pause_job(self, job_id): """Pause a Job Args: job_id (str): The Job ID Returns: Job: The updated Job """ self._patch_job(job_id, [PatchOperation("update", "/status", "PAUSED")])
[docs] def resume_job(self, job_id): """Resume a Job Args: job_id (str): The Job ID Returns: Job: The updated Job """ self._patch_job(job_id, [PatchOperation("update", "/status", "RUNNING")])
[docs] @wrap_response( parse_method="parse_principal", parse_many=False, default_exc=FetchError ) def get_user(self, user_identifier): """Find a user Args: user_identifier (str): User ID or username Returns: Principal: The User """ return self.client.get_user(user_identifier)
[docs] def who_am_i(self): """Find user using the current set of credentials Returns: Principal: The User """ return self.get_user(self.client.username or "anonymous")
@wrap_response( parse_method="parse_system", parse_many=False, default_exc=FetchError, raise_404=False, ) def _find_system_by_id(self, system_id, **kwargs): return self.client.get_system(system_id, **kwargs) @wrap_response(return_boolean=True, default_exc=DeleteError) def _remove_system_by_id(self, system_id): if system_id is None: raise DeleteError("Cannot delete a system without an id") return self.client.delete_system(system_id) @wrap_response( parse_method="parse_request", parse_many=False, default_exc=FetchError, raise_404=False, ) def _find_request_by_id(self, request_id): return self.client.get_request(request_id) @wrap_response(parse_method="parse_job", parse_many=False, default_exc=SaveError) def _patch_job(self, job_id, operations): return self.client.patch_job( job_id, self.parser.serialize_patch(operations, many=True) )
[docs]class BrewmasterEasyClient(EasyClient): def __init__(self, *args, **kwargs): warnings.warn( "Call made to 'BrewmasterEasyClient'. This name will be " "removed in version 3.0, please use 'EasyClient' instead.", DeprecationWarning, stacklevel=2, ) super(BrewmasterEasyClient, self).__init__(*args, **kwargs)