Source code for brewtils.rest.easy_client

# -*- coding: utf-8 -*-
import json
from base64 import b64decode
from io import BytesIO
from pathlib import Path
from typing import Any, Callable, List, NoReturn, Optional, Type, Union

import six
import wrapt
from brewtils.config import get_connection_info
from brewtils.errors import (
    BrewtilsException,
    ConflictError,
    DeleteError,
    FetchError,
    NotFoundError,
    RestConnectionError,
    RestError,
    SaveError,
    TooLargeError,
    ValidationError,
    WaitExceededError,
    _deprecate,
)
from brewtils.models import BaseModel, Event, Job, PatchOperation
from brewtils.rest.client import RestClient
from brewtils.schema_parser import SchemaParser
from requests import Response  # noqa # not in requirements file


[docs]def get_easy_client(**kwargs): # type: (**Any) -> EasyClient """Easy way to get an EasyClient The benefit to this method over creating an EasyClient directly is that this method will also search the environment for parameters. Kwargs passed to this method will take priority, however. Args: **kwargs: Options for configuring the EasyClient Returns: brewtils.rest.easy_client.EasyClient: The configured client """ return EasyClient(**get_connection_info(**kwargs))
[docs]def handle_response_failure(response, default_exc=RestError, raise_404=True): # type: (Response, Type[BrewtilsException], bool) -> NoReturn """Deal with a response with non-2xx status code Args: response: The response object default_exc: The exception to raise if no specific exception is warranted raise_404: If True a response with status code 404 will raise a NotFoundError. If False the method will return None. Returns: None - this function will always raise Raises: NotFoundError: Status code 404 and raise_404 is True WaitExceededError: Status code 408 ConflictError: Status code 409 TooLargeError: Status code 413 ValidationError: Any other 4xx status codes RestConnectionError: Status code 503 default_exc: Any other status code """ try: message = response.json() except ValueError: message = response.text if response.status_code == 404: if raise_404: raise NotFoundError(message) else: return None elif response.status_code == 408: raise WaitExceededError(message) elif response.status_code == 409: raise ConflictError(message) elif response.status_code == 413: raise TooLargeError(message) elif 400 <= response.status_code < 500: raise ValidationError(message) elif response.status_code == 503: raise RestConnectionError(message) else: raise default_exc(message)
[docs]def wrap_response( return_boolean=False, # type: bool parse_method=None, # type: Optional[str] parse_many=False, # type: bool default_exc=RestError, # type: Type[BrewtilsException] raise_404=True, # type: bool ): # type: (...) -> Callable[..., Union[bool, Response, BaseModel, List[BaseModel]]] """Decorator to consolidate response parsing and error handling Args: return_boolean: If True, a successful response will also return True parse_method: Response json will be passed to this method of the SchemaParser parse_many: Will be passed as the 'many' parameter when parsing the response default_exc: Will be passed to handle_response_failure for failed responses raise_404: Will be passed to handle_response_failure for failed responses Returns: - True if return_boolean is True and the response status code is 2xx. - The response object if return_boolean is False and parse_method is "" - A parsed Brewtils model if return_boolean is False and parse_method is defined Raises: RestError: The response has a non-2xx status code. Note that the specific exception raised depends on the response status code and the argument passed as the default_exc parameter. """ @wrapt.decorator def wrapper(wrapped, _instance, args, kwargs): response = wrapped(*args, **kwargs) if response.ok: if return_boolean: return True if parse_method is None: return response.json() return getattr(SchemaParser, parse_method)(response.json(), many=parse_many) else: handle_response_failure( response, default_exc=default_exc, raise_404=raise_404 ) return wrapper
[docs]class EasyClient(object): """Client for simplified communication with Beergarden This class is intended to be a middle ground between the RestClient and SystemClient. It provides a 'cleaner' interface to some common Beergarden operations than is exposed by the lower-level RestClient. On the other hand, the SystemClient is much better for generating Beergarden Requests. Args: bg_host (str): Beer-garden hostname bg_port (int): Beer-garden port bg_url_prefix (str): URL path that will be used as a prefix when communicating with Beer-garden. Useful if Beer-garden is running on a URL other than '/'. ssl_enabled (bool): Whether to use SSL for Beer-garden communication ca_cert (str): Path to certificate file containing the certificate of the authority that issued the Beer-garden server certificate ca_verify (bool): Whether to verify Beer-garden server certificate client_cert (str): Path to client certificate to use when communicating with Beer-garden api_version (int): Beer-garden API version to use client_timeout (int): Max time to wait for Beer-garden server response username (str): Username for Beer-garden authentication password (str): Password for Beer-garden authentication access_token (str): Access token for Beer-garden authentication refresh_token (str): Refresh token for Beer-garden authentication """ _default_file_params = { "chunk_size": 255 * 1024, } def __init__(self, *args, **kwargs): # This points DeprecationWarnings at the right line kwargs.setdefault("stacklevel", 4) self.client = RestClient(*args, **kwargs)
[docs] def can_connect(self, **kwargs): # type: (**Any) -> bool """Determine if the Beergarden server is responding. Args: **kwargs: Keyword arguments passed to the underlying Requests method Returns: A bool indicating if the connection attempt was successful. Will return False only if a ConnectionError is raised during the attempt. Any other exception will be re-raised. Raises: requests.exceptions.RequestException: The connection attempt resulted in an exception that indicates something other than a basic connection error. For example, an error with certificate verification. """ return self.client.can_connect(**kwargs)
[docs] @wrap_response(default_exc=FetchError) def get_version(self, **kwargs): """Get Bartender, Brew-view, and API version information Args: **kwargs: Extra parameters Returns: dict: Response object with version information in the body """ return self.client.get_version(**kwargs)
[docs] @wrap_response(default_exc=FetchError) def get_config(self): """Get configuration Returns: dict: Configuration dictionary """ return self.client.get_config()
[docs] @wrap_response(default_exc=FetchError) def get_logging_config(self, system_name=None, local=False): """Get a logging configuration Note that the system_name is not relevant and is only provided for backward-compatibility. Args: system_name (str): UNUSED Returns: dict: The configuration object """ return self.client.get_logging_config(local=local)
[docs] @wrap_response( parse_method="parse_garden", parse_many=False, default_exc=FetchError ) def get_garden(self, garden_name): """Get a Garden Args: garden_name: Name of garden to retrieve Returns: The Garden """ return self.client.get_garden(garden_name)
[docs] @wrap_response(parse_method="parse_garden", parse_many=True, default_exc=FetchError) def get_gardens(self): """Get all Gardens. Returns: A list of all the Gardens """ return self.client.get_gardens()
[docs] @wrap_response(parse_method="parse_garden", parse_many=False, default_exc=SaveError) def create_garden(self, garden): """Create a new Garden Args: garden (Garden): The Garden to create Returns: Garden: The newly-created Garden """ return self.client.post_gardens(SchemaParser.serialize_garden(garden))
[docs] @wrap_response(return_boolean=True, raise_404=True) def remove_garden(self, garden_name): """Remove a unique Garden Args: garden_name (String): Name of Garden to remove Returns: bool: True if removal was successful Raises: NotFoundError: Couldn't find a Garden matching given name """ return self.client.delete_garden(garden_name)
[docs] @wrap_response(parse_method="parse_garden", default_exc=FetchError) def update_garden(self, garden): garden_as_dict = SchemaParser.serialize_garden(garden, to_string=False) patches = json.dumps( [ { "operation": "config", "path": "", "value": garden_as_dict, } ] ) return self.client.patch_garden(garden.name, patches)
[docs] @wrap_response( parse_method="parse_system", parse_many=False, default_exc=FetchError ) def get_system(self, system_id): """Get a Garden Args: system_id: The Id Returns: The System """ return self.client.get_system(system_id)
[docs] def find_unique_system(self, **kwargs): """Find a unique system .. note:: If 'id' is a given keyword argument then all other parameters will be ignored. Args: **kwargs: Search parameters Returns: System, None: The System if found, None otherwise Raises: FetchError: More than one matching System was found """ if "id" in kwargs: try: return self.get_system(kwargs.pop("id"), **kwargs) except NotFoundError: return None else: systems = self.find_systems(**kwargs) if not systems: return None if len(systems) > 1: raise FetchError("More than one matching System found") return systems[0]
[docs] @wrap_response(parse_method="parse_system", parse_many=True, default_exc=FetchError) def find_systems(self, **kwargs): """Find Systems using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[System]: List of Systems matching the search parameters """ return self.client.get_systems(**kwargs)
[docs] @wrap_response(parse_method="parse_system", parse_many=False, default_exc=SaveError) def create_system(self, system): """Create a new System Args: system (System): The System to create Returns: System: The newly-created system """ return self.client.post_systems(SchemaParser.serialize_system(system))
[docs] @wrap_response(parse_method="parse_system", parse_many=False, default_exc=SaveError) def update_system(self, system_id, new_commands=None, **kwargs): """Update a System Args: system_id (str): The System ID new_commands (Optional[List[Command]]): New System commands Keyword Args: add_instance (Instance): An Instance to append metadata (dict): New System metadata description (str): New System description display_name (str): New System display name icon_name (str): New System icon name template (str): New System template Returns: System: The updated system """ operations = [] if new_commands is not None: commands = SchemaParser.serialize_command( new_commands, to_string=False, many=True ) operations.append(PatchOperation("replace", "/commands", commands)) add_instance = kwargs.pop("add_instance", None) if add_instance: instance = SchemaParser.serialize_instance(add_instance, to_string=False) operations.append(PatchOperation("add", "/instance", instance)) metadata = kwargs.pop("metadata", {}) if metadata: operations.append(PatchOperation("update", "/metadata", metadata)) # The remaining kwargs are all strings # Sending an empty string (instead of None) ensures they're actually cleared for key, value in kwargs.items(): operations.append(PatchOperation("replace", "/%s" % key, value or "")) return self.client.patch_system( system_id, SchemaParser.serialize_patch(operations, many=True) )
[docs] def remove_system(self, **kwargs): """Remove a unique System Args: **kwargs: Search parameters Returns: bool: True if removal was successful Raises: FetchError: Couldn't find a System matching given parameters """ system = self.find_unique_system(**kwargs) if system is None: raise FetchError("No matching System found") return self._remove_system_by_id(system.id)
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=SaveError ) def initialize_instance(self, instance_id, runner_id=None): """Start an Instance Args: instance_id (str): The Instance ID runner_id (str): The PluginRunner ID, if any Returns: Instance: The updated Instance """ return self.client.patch_instance( instance_id, SchemaParser.serialize_patch( PatchOperation(operation="initialize", value={"runner_id": runner_id}) ), )
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=FetchError ) def get_instance(self, instance_id): """Get an Instance Args: instance_id: The Id Returns: The Instance """ return self.client.get_instance(instance_id)
[docs] @wrap_response( parse_method="parse_instance", parse_many=False, default_exc=SaveError ) def update_instance(self, instance_id, **kwargs): """Update an Instance status Args: instance_id (str): The Instance ID Keyword Args: new_status (str): The new status metadata (dict): Will be added to existing instance metadata Returns: Instance: The updated Instance """ operations = [] new_status = kwargs.pop("new_status", None) metadata = kwargs.pop("metadata", {}) if new_status: operations.append(PatchOperation("replace", "/status", new_status)) if metadata: operations.append(PatchOperation("update", "/metadata", metadata)) return self.client.patch_instance( instance_id, SchemaParser.serialize_patch(operations, many=True) )
[docs] def get_instance_status(self, instance_id): """ .. deprecated: 3.0 Will be removed in 4.0. Use ``get_instance()`` instead Get an Instance's status Args: instance_id: The Id Returns: The Instance's status """ _deprecate( "This method is deprecated and scheduled to be removed in 4.0. " "Please use get_instance() instead." ) return self.get_instance(instance_id).status
[docs] def update_instance_status(self, instance_id, new_status): """ .. deprecated: 3.0 Will be removed in 4.0. Use ``update_instance()`` instead Get an Instance's status Args: instance_id (str): The Instance ID new_status (str): The new status Returns: Instance: The updated Instance """ _deprecate( "This method is deprecated and scheduled to be removed in 4.0. " "Please use update_instance() instead." ) return self.update_instance(instance_id, new_status=new_status)
[docs] @wrap_response(return_boolean=True, default_exc=SaveError) def instance_heartbeat(self, instance_id): """Send an Instance heartbeat Args: instance_id (str): The Instance ID Returns: bool: True if the heartbeat was successful """ return self.client.patch_instance( instance_id, SchemaParser.serialize_patch(PatchOperation("heartbeat")) )
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def remove_instance(self, instance_id): """Remove an Instance Args: instance_id (str): The Instance ID Returns: bool: True if the remove was successful """ if instance_id is None: raise DeleteError("Cannot delete an instance without an id") return self.client.delete_instance(instance_id)
[docs] @wrap_response( parse_method="parse_request", parse_many=False, default_exc=FetchError ) def get_request(self, request_id): """Get a Request Args: request_id: The Id Returns: The Request """ return self.client.get_request(request_id)
[docs] def find_unique_request(self, **kwargs): """Find a unique request .. note:: If 'id' is a given keyword argument then all other parameters will be ignored. Args: **kwargs: Search parameters Returns: Request, None: The Request if found, None otherwise Raises: FetchError: More than one matching Request was found """ if "id" in kwargs: try: return self.get_request(kwargs.pop("id")) except NotFoundError: return None else: all_requests = self.find_requests(**kwargs) if not all_requests: return None if len(all_requests) > 1: raise FetchError("More than one matching Request found") return all_requests[0]
[docs] @wrap_response( parse_method="parse_request", parse_many=True, default_exc=FetchError ) def find_requests(self, **kwargs): """Find Requests using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[Request]: List of Systems matching the search parameters """ return self.client.get_requests(**kwargs)
[docs] @wrap_response( parse_method="parse_request", parse_many=False, default_exc=SaveError ) def create_request(self, request, **kwargs): """Create a new Request Args: request: New request definition **kwargs: Extra request parameters Keyword Args: blocking (bool): Wait for request to complete before returning timeout (int): Maximum seconds to wait for completion Returns: Request: The newly-created Request """ return self.client.post_requests( SchemaParser.serialize_request(request), **kwargs )
[docs] @wrap_response( parse_method="parse_request", parse_many=False, default_exc=SaveError ) def update_request(self, request_id, status=None, output=None, error_class=None): """Update a Request Args: request_id (str): The Request ID status (Optional[str]): New Request status output (Optional[str]): New Request output error_class (Optional[str]): New Request error class Returns: Response: The updated response """ operations = [] if status: operations.append(PatchOperation("replace", "/status", status)) if output: operations.append(PatchOperation("replace", "/output", output)) if error_class: operations.append(PatchOperation("replace", "/error_class", error_class)) return self.client.patch_request( request_id, SchemaParser.serialize_patch(operations, many=True) )
[docs] @wrap_response(return_boolean=True) def publish_event(self, *args, **kwargs): """Publish a new event Args: *args: If a positional argument is given it's assumed to be an Event and will be used **kwargs: Will be used to construct a new Event to publish if no Event is given in the positional arguments Keyword Args: _publishers (Optional[List[str]]): List of publisher names. If given the Event will only be published to the specified publishers. Otherwise all publishers known to Beergarden will be used. Returns: bool: True if the publish was successful """ publishers = kwargs.pop("_publishers", None) event = args[0] if args else Event(**kwargs) return self.client.post_event( SchemaParser.serialize_event(event), publishers=publishers )
[docs] @wrap_response(parse_method="parse_queue", parse_many=True, default_exc=FetchError) def get_queues(self): """Retrieve all queue information Returns: List[Queue]: List of all Queues """ return self.client.get_queues()
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def clear_queue(self, queue_name): """Cancel and remove all Requests from a message queue Args: queue_name (str): The name of the queue to clear Returns: bool: True if the clear was successful """ return self.client.delete_queue(queue_name)
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def clear_all_queues(self): """Cancel and remove all Requests in all queues Returns: bool: True if the clear was successful """ return self.client.delete_queues()
[docs] @wrap_response(parse_method="parse_job", parse_many=True, default_exc=FetchError) def find_jobs(self, **kwargs): """Find Jobs using keyword arguments as search parameters Args: **kwargs: Search parameters Returns: List[Job]: List of Jobs matching the search parameters """ return self.client.get_jobs(**kwargs)
[docs] @wrap_response(parse_method="parse_job", parse_many=True, default_exc=FetchError) def export_jobs(self, job_id_list=None): # type: (Optional[List[str]]) -> List[Job] """Export jobs from an optional job ID list. If `job_id_list` is None or empty, definitions for all jobs are returned. Args: job_id_list: A list of job IDS, optional Returns: A list of job definitions """ # we should check that the argument is a list (if it's not None) because the # call to `len` will otherwise produce an unhelpful error message if job_id_list is not None and not isinstance(job_id_list, list): raise TypeError("Argument must be a list of job IDs, an empty list or None") payload = ( SchemaParser.serialize_job_ids(job_id_list, many=True) if job_id_list is not None and len(job_id_list) > 0 else "{}" ) return self.client.post_export_jobs(payload) # noqa # wrapper changes type
[docs] @wrap_response(parse_method="parse_job_ids") def import_jobs(self, job_list): # type: (List[Job]) -> List[str] """Import job definitions from a list of Jobs. Args: job_list: A list of jobs to import Returns: A list of the job IDs created """ return self.client.post_import_jobs( # noqa # wrapper changes type SchemaParser.serialize_job_for_import(job_list, many=True) )
[docs] @wrap_response(parse_method="parse_job", parse_many=False, default_exc=SaveError) def create_job(self, job): """Create a new Job Args: job (Job): New Job definition Returns: Job: The newly-created Job """ return self.client.post_jobs(SchemaParser.serialize_job(job))
[docs] @wrap_response(return_boolean=True, default_exc=DeleteError) def remove_job(self, job_id): """Remove a unique Job Args: job_id (str): The Job ID Returns: bool: True if removal was successful Raises: DeleteError: Couldn't remove Job """ return self.client.delete_job(job_id)
[docs] def pause_job(self, job_id): """Pause a Job Args: job_id (str): The Job ID Returns: Job: The updated Job """ return self._patch_job(job_id, [PatchOperation("update", "/status", "PAUSED")])
[docs] def resume_job(self, job_id): """Resume a Job Args: job_id (str): The Job ID Returns: Job: The updated Job """ return self._patch_job(job_id, [PatchOperation("update", "/status", "RUNNING")])
[docs] def execute_job(self, job_id, reset_interval=False): """Execute a Job Args: job_id (str): The Job ID reset_interval (bool): Restarts the job's interval time to now if the job's trigger is an interval Returns: Request: The returned request """ return self.client.post_execute_job(job_id, reset_interval)
[docs] @wrap_response(parse_method="parse_resolvable") def upload_bytes(self, data): # type: (bytes) -> Any """Upload a file Args: data: The bytes to upload Returns: The bytes Resolvable """ return self.client.post_file(data)
[docs] def download_bytes(self, file_id): # type: (str) -> bytes """Download bytes Args: file_id: Id of bytes to download Returns: The bytes data """ return self.client.get_file(file_id).content
[docs] @wrap_response(parse_method="parse_resolvable") def upload_file(self, path): # type: (Union[str, Path]) -> Any """Upload a file Args: path: Path to file Returns: The file Resolvable """ # As of now this just converts the data param to bytes and uses the bytes API # Ideally this would fail-over to using the chunks API if necessary with open(path, "rb") as f: bytes_data = f.read() return self.client.post_file(bytes_data)
[docs] def download_file(self, file_id, path): # type: (str, Union[str, Path]) -> Union[str, Path] """Download a file Args: file_id: The File id. path: Location for downloaded file Returns: Path to downloaded file """ data = self.download_bytes(file_id) with open(path, "wb") as f: f.write(data) return path
[docs] @wrap_response(parse_method="parse_resolvable") def upload_chunked_file( self, file_to_upload, desired_filename=None, file_params=None ): """Upload a given file to the Beer Garden server. Args: file_to_upload: Can either be an open file descriptor or a path. desired_filename: The desired filename, if none is provided it will use the basename of the file_to_upload file_params: The metadata surrounding the file. Valid Keys: See brewtils File model Returns: A BG file ID. """ default_file_params = {} # Establish the file descriptor if isinstance(file_to_upload, six.string_types): try: fd = open(file_to_upload, "rb") except Exception: raise ValidationError("Could not open the requested file name.") require_close = True else: fd = file_to_upload require_close = False try: default_file_params["file_name"] = desired_filename or fd.name except AttributeError: default_file_params["file_name"] = "no_file_name_provided" # Determine the file size cur_cursor = fd.tell() default_file_params["file_size"] = fd.seek(0, 2) - cur_cursor fd.seek(cur_cursor) if file_params is not None: file_params["file_size"] = default_file_params["file_size"] # Set the parameters to be sent file_params = file_params or dict( default_file_params, **self._default_file_params ) try: response = self.client.post_chunked_file( fd, file_params, current_position=cur_cursor ) fd.seek(cur_cursor) finally: if require_close: fd.close() if not response.ok: handle_response_failure(response, default_exc=SaveError) # The file post is best effort; make sure to verify before we let the # user do anything with it file_id = response.json()["details"]["file_id"] valid, meta = self._check_chunked_file_validity(file_id) if not valid: # Clean up if you can self.client.delete_chunked_file(file_id) raise ValidationError( "Error occurred while uploading file %s" % default_file_params["file_name"] ) return response
[docs] def download_chunked_file(self, file_id): """Download a chunked file from the Beer Garden server. Args: file_id: The beer garden-assigned file id. Returns: A file object """ (valid, meta) = self._check_chunked_file_validity(file_id) file_obj = BytesIO() if valid: for x in range(meta["number_of_chunks"]): resp = self.client.get_chunked_file(file_id, params={"chunk": x}) if resp.ok: data = resp.json()["data"] file_obj.write(b64decode(data)) else: raise ValueError("Could not fetch chunk %d" % x) else: raise ValidationError("Requested file %s is incomplete." % file_id) file_obj.seek(0) return file_obj
[docs] def delete_chunked_file(self, file_id): """Delete a given file on the Beer Garden server. Args: file_id: The beer garden-assigned file id. Returns: The API response """ return self.client.delete_chunked_file(file_id)
[docs] def forward(self, operation, **kwargs): """Forwards an Operation Args: operation: The Operation to be forwarded **kwargs: Keyword arguments to pass to Requests session call Returns: The API response """ return self.client.post_forward( SchemaParser.serialize_operation(operation), **kwargs )
[docs] @wrap_response( parse_method="parse_principal", parse_many=False, default_exc=FetchError ) def get_user(self, user_identifier): """Find a user Args: user_identifier (str): User ID or username Returns: Principal: The User """ return self.client.get_user(user_identifier)
[docs] def who_am_i(self): """Find user using the current set of credentials Returns: Principal: The User """ return self.get_user(self.client.username or "anonymous")
[docs] @wrap_response(return_boolean=True) def rescan(self): """Rescan local plugin directory Returns: bool: True if rescan was successful """ return self.client.patch_admin( payload=SchemaParser.serialize_patch(PatchOperation(operation="rescan")) )
@wrap_response(return_boolean=True, default_exc=DeleteError) def _remove_system_by_id(self, system_id): if system_id is None: raise DeleteError("Cannot delete a system without an id") return self.client.delete_system(system_id) @wrap_response(parse_method="parse_job", parse_many=False, default_exc=SaveError) def _patch_job(self, job_id, operations): return self.client.patch_job( job_id, SchemaParser.serialize_patch(operations, many=True) ) def _check_chunked_file_validity(self, file_id): """Verify a chunked file Args: file_id: The BG-assigned file id. Returns: A tuple containing the result and supporting metadata, if available """ response = self.client.get_chunked_file(file_id, params={"verify": True}) if not response.ok: return False, None metadata_json = response.json() if "valid" in metadata_json and metadata_json["valid"]: return True, metadata_json else: return False, metadata_json