Source code for brewtils.rest.client

# -*- coding: utf-8 -*-

import functools
import json
from base64 import b64encode
from typing import Any, List

import brewtils.plugin
import requests.exceptions
import urllib3
from brewtils.errors import _deprecate
from brewtils.rest import normalize_url_prefix
from brewtils.specification import _CONNECTION_SPEC
from requests import Response, Session
from requests.adapters import HTTPAdapter
from requests.utils import quote
from yapconf import YapconfSpec


[docs]def enable_auth(method): """Decorate methods with this to enable using authentication""" @functools.wraps(method) def wrapper(self, *args, **kwargs): original_response = method(self, *args, **kwargs) if original_response.status_code != 401: return original_response # Try to use credentials if (self.username and self.password) or self.client_cert: credential_response = self.get_tokens() if credential_response.ok: return method(self, *args, **kwargs) # Authenticate and retry failed; just return the original response return original_response return wrapper
[docs]class TimeoutAdapter(HTTPAdapter): """Transport adapter with a default request timeout""" def __init__(self, **kwargs): self.timeout = kwargs.pop("timeout", None) super(TimeoutAdapter, self).__init__(**kwargs)
[docs] def send(self, *args, **kwargs): """Sends PreparedRequest object with specified timeout.""" kwargs["timeout"] = kwargs.get("timeout") or self.timeout return super(TimeoutAdapter, self).send(*args, **kwargs)
[docs]class RestClient(object): """HTTP client for communicating with Beer-garden. The is the low-level client responsible for making the actual REST calls. Other clients (e.g. :py:class:`brewtils.rest.easy_client.EasyClient`) build on this by providing useful abstractions. Args: bg_host (str): Beer-garden hostname bg_port (int): Beer-garden port bg_url_prefix (str): URL path that will be used as a prefix when communicating with Beer-garden. Useful if Beer-garden is running on a URL other than '/'. ssl_enabled (bool): Whether to use SSL for Beer-garden communication ca_cert (str): Path to certificate file containing the certificate of the authority that issued the Beer-garden server certificate ca_verify (bool): Whether to verify Beer-garden server certificate client_cert (str): Path to client certificate to use when communicating with Beer-garden api_version (int): Beer-garden API version to use client_timeout (int): Max time to wait for Beer-garden server response username (str): Username for Beer-garden authentication password (str): Password for Beer-garden authentication access_token (str): Access token for Beer-garden authentication refresh_token (deprecated): Refresh token for Beer-garden authentication """ # Latest API version currently released LATEST_VERSION = 1 JSON_HEADERS = {"Content-type": "application/json", "Accept": "text/plain"} def __init__(self, *args, **kwargs): self._config = self._load_config(args, kwargs) self.bg_host = self._config.bg_host self.bg_port = self._config.bg_port self.bg_prefix = self._config.bg_url_prefix self.api_version = self._config.api_version self.username = self._config.username self.password = self._config.password self.access_token = self._config.access_token self.refresh_token = self._config.refresh_token self.client_cert = self._config.client_cert self.client_key = self._config.client_key # Configure the session to use when making requests self.session = Session() if self._config.proxy: if self._config.ssl_enabled: self.session.proxies.update({"https": self._config.proxy}) else: self.session.proxies.update({"http": self._config.proxy}) # This is what Requests is expecting if self._config.client_key: self.session.cert = (self._config.client_cert, self._config.client_key) else: self.session.cert = self._config.client_cert if not self._config.ca_verify: urllib3.disable_warnings() self.session.verify = False elif self._config.ca_cert: self.session.verify = self._config.ca_cert client_timeout = self._config.client_timeout if client_timeout == -1: client_timeout = None # Having two is kind of strange to me, but this is what Requests does self.session.mount("https://", TimeoutAdapter(timeout=client_timeout)) self.session.mount("http://", TimeoutAdapter(timeout=client_timeout)) # Configure the beer-garden URLs self.base_url = "%s://%s:%s%s" % ( "https" if self._config.ssl_enabled else "http", self.bg_host, self.bg_port, normalize_url_prefix(self.bg_prefix), ) self.version_url = self.base_url + "version" self.config_url = self.base_url + "config" if self.api_version == 1: self.garden_url = self.base_url + "api/v1/gardens/" self.system_url = self.base_url + "api/v1/systems/" self.instance_url = self.base_url + "api/v1/instances/" self.command_url = self.base_url + "api/v1/commands/" self.request_url = self.base_url + "api/v1/requests/" self.queue_url = self.base_url + "api/v1/queues/" self.logging_url = self.base_url + "api/v1/logging/" self.job_url = self.base_url + "api/v1/jobs/" self.job_export_url = self.base_url + "api/v1/export/jobs/" self.job_import_url = self.base_url + "api/v1/import/jobs/" self.token_url = self.base_url + "api/v1/token/" self.user_url = self.base_url + "api/v1/users/" self.admin_url = self.base_url + "api/v1/admin/" self.forward_url = self.base_url + "api/v1/forward" # Deprecated self.logging_config_url = self.base_url + "api/v1/config/logging/" # Beta self.event_url = self.base_url + "api/vbeta/events/" self.chunk_url = self.base_url + "api/vbeta/chunks/" self.file_url = self.base_url + "api/vbeta/file/" else: raise ValueError("Invalid Beer-garden API version: %s" % self.api_version) @staticmethod def _load_config(args, kwargs): """Load a config based on the CONNECTION section of the Brewtils Specification This will load a configuration with the following source precedence: 1. kwargs 2. kwargs with "old" names ("host", "port", "url_prefix") 3. host and port passed as positional arguments 4. the global configuration (brewtils.plugin.CONFIG) Args: args (deprecated): host and port kwargs: Standard connection arguments to be used Returns: The resolved configuration object """ spec = YapconfSpec(_CONNECTION_SPEC) renamed = {} for key in ["host", "port", "url_prefix"]: if kwargs.get(key): renamed["bg_" + key] = kwargs.get(key) positional = {} if len(args) > 0: _deprecate( "Heads up - passing bg_host as a positional argument is deprecated " "and will be removed in version 4.0", stacklevel=kwargs.get("stacklevel", 3), ) positional["bg_host"] = args[0] if len(args) > 1: _deprecate( "Heads up - passing bg_port as a positional argument is deprecated " "and will be removed in version 4.0", stacklevel=kwargs.get("stacklevel", 3), ) positional["bg_port"] = args[1] return spec.load_config(*[kwargs, renamed, positional, brewtils.plugin.CONFIG])
[docs] def can_connect(self, **kwargs): # type: (**Any) -> bool """Determine if a connection to the Beer-garden server is possible Args: **kwargs: Keyword arguments to pass to Requests session call Returns: A bool indicating if the connection attempt was successful. Will return False only if a ConnectionError is raised during the attempt. Any other exception will be re-raised. Raises: requests.exceptions.RequestException: The connection attempt resulted in an exception that indicates something other than a basic connection error. For example, an error with certificate verification. """ try: self.session.get(self.config_url, **kwargs) except requests.exceptions.ConnectionError as ex: if type(ex) == requests.exceptions.ConnectionError: return False raise return True
[docs] @enable_auth def get_version(self, **kwargs): # type: (**Any) -> Response """Perform a GET to the version URL Args: **kwargs (deprecated): Unused. Accepted for compatibility. Returns: Requests Response object """ if kwargs: _deprecate( "Keyword arguments for get_version are no longer used and will be " "removed in a future release." ) return self.session.get(self.version_url)
[docs] @enable_auth def get_config(self, **kwargs): # type: (**Any) -> Response """Perform a GET to the config URL Args: **kwargs (deprecated): Unused. Accepted for compatibility. Returns: Requests Response object """ if kwargs: _deprecate( "Keyword arguments for get_config are no longer used and will be " "removed in a future release." ) return self.session.get(self.config_url)
[docs] @enable_auth def get_logging_config(self, **kwargs): # type: (**Any) -> Response """Perform a GET to the logging config URL Args: **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.logging_url, params=kwargs)
[docs] @enable_auth def get_garden(self, garden_name, **kwargs): # type: (str, **Any) -> Response """Performs a GET on the Garden URL Args: garden_name: Name of garden to retreive **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ # quote will URL encode the Garden name return self.session.get(self.garden_url + quote(garden_name), params=kwargs)
[docs] @enable_auth def get_gardens(self, **kwargs): # type: (**Any) -> Response """Preform a GET on the Garden URL This fetches all gardens. Returns: Requests Response object """ return self.session.get(self.garden_url, params=kwargs)
[docs] @enable_auth def post_gardens(self, payload): # type: (str) -> Response """Performs a POST on the Garden URL Args: payload: New Garden definition Returns: Requests Response object """ return self.session.post( self.garden_url, data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def delete_garden(self, garden_name): # type: (str) -> Response """Performs a DELETE on a Garden URL Args: garden_name: Name of Garden to delete Returns: Requests Response object """ # quote will URL encode the Garden name return self.session.delete(self.garden_url + quote(garden_name))
[docs] @enable_auth def patch_garden(self, garden_name, payload): # type: (str, str) -> Response """Perform a PATCH on a Garden URL Args: garden_name: Garden name payload: Serialized patch operation Returns: Request Response object """ return self.session.patch( self.garden_url + quote(garden_name), data=payload, headers=self.JSON_HEADERS, )
[docs] @enable_auth def get_systems(self, **kwargs): # type: (**Any) -> Response """Perform a GET on the System collection URL Args: **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.system_url, params=kwargs)
[docs] @enable_auth def get_system(self, system_id, **kwargs): # type: (str, **Any) -> Response """Performs a GET on the System URL Args: system_id: System ID **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.system_url + system_id, params=kwargs)
[docs] @enable_auth def post_systems(self, payload): # type: (str) -> Response """Performs a POST on the System URL Args: payload: New System definition Returns: Requests Response object """ return self.session.post( self.system_url, data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def patch_system(self, system_id, payload): # type: (str, str) -> Response """Performs a PATCH on a System URL Args: system_id: System ID payload: Serialized PatchOperation Returns: Requests Response object """ return self.session.patch( self.system_url + str(system_id), data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def delete_system(self, system_id): # type: (str) -> Response """Performs a DELETE on a System URL Args: system_id: System ID Returns: Requests Response object """ return self.session.delete(self.system_url + system_id)
[docs] @enable_auth def get_instance(self, instance_id): # type: (str) -> Response """Performs a GET on the Instance URL Args: instance_id: Instance ID Returns: Requests Response object """ return self.session.get(self.instance_url + instance_id)
[docs] @enable_auth def patch_instance(self, instance_id, payload): # type: (str, str) -> Response """Performs a PATCH on the instance URL Args: instance_id: Instance ID payload: Serialized PatchOperation Returns: Requests Response object """ return self.session.patch( self.instance_url + str(instance_id), data=payload, headers=self.JSON_HEADERS, )
[docs] @enable_auth def delete_instance(self, instance_id): # type: (str) -> Response """Performs a DELETE on an Instance URL Args: instance_id: Instance ID Returns: Requests Response object """ return self.session.delete(self.instance_url + instance_id)
[docs] @enable_auth def get_commands(self): # type: () -> Response """Performs a GET on the Commands URL Returns: Requests Response object """ return self.session.get(self.command_url)
[docs] @enable_auth def get_command(self, command_id): # type: (str) -> Response """Performs a GET on the Command URL Args: command_id: Command ID Returns: Requests Response object """ return self.session.get(self.command_url + command_id)
[docs] @enable_auth def get_requests(self, **kwargs): # type: (**Any) -> Response """Performs a GET on the Requests URL Args: **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.request_url, params=kwargs)
[docs] @enable_auth def get_request(self, request_id): # type: (str) -> Response """Performs a GET on the Request URL Args: request_id: Request ID Returns: Requests Response object """ return self.session.get(self.request_url + request_id)
[docs] @enable_auth def post_requests(self, payload, **kwargs): # type: (str, **Any) -> Response """Performs a POST on the Request URL Args: payload: New Request definition **kwargs: Extra request parameters Keyword Args: blocking: Wait for request to complete timeout: Maximum seconds to wait Returns: Requests Response object """ return self.session.post( self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs )
[docs] @enable_auth def patch_request(self, request_id, payload): # type: (str, str) -> Response """Performs a PATCH on the Request URL Args: request_id: Request ID payload: Serialized PatchOperation Returns: Requests Response object """ return self.session.patch( self.request_url + str(request_id), data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def post_event(self, payload, publishers=None): # type: (str, List[str]) -> Response """Performs a POST on the event URL Args: payload: Serialized new event definition publishers: Array of publishers to use Returns: Requests Response object """ return self.session.post( self.event_url, data=payload, headers=self.JSON_HEADERS, params={"publisher": publishers} if publishers else None, )
[docs] @enable_auth def get_queues(self): # type: () -> Response """Performs a GET on the Queues URL Returns: Requests Response object """ return self.session.get(self.queue_url)
[docs] @enable_auth def delete_queues(self): # type: () -> Response """Performs a DELETE on the Queues URL Returns: Requests Response object """ return self.session.delete(self.queue_url)
[docs] @enable_auth def delete_queue(self, queue_name): # type: (str) -> Response """Performs a DELETE on a specific Queue URL Args: queue_name: Queue name Returns: Requests Response object """ return self.session.delete(self.queue_url + quote(queue_name))
[docs] @enable_auth def get_jobs(self, **kwargs): # type: (**Any) -> Response """Performs a GET on the Jobs URL. Args: **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.job_url, params=kwargs)
[docs] @enable_auth def get_job(self, job_id): # type: (str) -> Response """Performs a GET on the Job URL Args: job_id: Job ID Returns: Requests Response object """ return self.session.get(self.job_url + job_id)
[docs] @enable_auth def post_jobs(self, payload): # type: (str) -> Response """Performs a POST on the Job URL Args: payload: New Job definition Returns: Requests Response object """ return self.session.post(self.job_url, data=payload, headers=self.JSON_HEADERS)
[docs] def post_execute_job(self, job_id, reset_interval=False): # type: (str, bool) -> Response """Performs a POST on the Job Execute URL Args: job_id: The ID of the Job reset_interval: Sets interval of job to restart now Returns: Request Response object """ url_params = {} if reset_interval: url_params["reset_interval"] = "True" return self.session.post( self.job_url + job_id + "/execute", headers=self.JSON_HEADERS, params=url_params, )
[docs] @enable_auth def post_export_jobs(self, payload): # type: (str) -> Response """Perform a POST on the Job export URL. Args: payload: Serialized list of Jobs Returns: Requests Response object """ return self.session.post( self.job_export_url, data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def post_import_jobs(self, payload): # type: (str) -> Response """Perform a POST on the Job import URL. Args: payload: Serialized list of job definitions Returns: Requests Response object """ return self.session.post( self.job_import_url, data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def patch_job(self, job_id, payload): # type: (str, str) -> Response """Performs a PATCH on the Job URL Args: job_id: Job ID payload: Serialized PatchOperation Returns: Requests Response object """ return self.session.patch( self.job_url + str(job_id), data=payload, headers=self.JSON_HEADERS )
[docs] @enable_auth def delete_job(self, job_id): # type: (str) -> Response """Performs a DELETE on a Job URL Args: job_id: Job ID Returns: Requests Response object """ return self.session.delete(self.job_url + job_id)
[docs] @enable_auth def get_file(self, file_id, **kwargs): # type: (str, **Any) -> Response """Performs a GET on the specific File URL Args: file_id: File ID **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.file_url + file_id, **kwargs)
[docs] @enable_auth def post_file(self, data): # type: (bytes) -> Response """Performs a PUT on the file URL Args: data: Data bytes Returns: A Requests Response object """ return self.session.post(self.file_url, data=data)
[docs] @enable_auth def delete_file(self, file_id): # type: (str) -> Response """Performs a DELETE on the specific File URL Args: file_id: File ID Returns: Requests Response object """ return self.session.delete(self.file_url + file_id)
[docs] @enable_auth def get_chunked_file(self, file_id, **kwargs): # type: (str, **Any) -> Response """Performs a GET on the specific File URL Args: file_id: File ID **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.get(self.chunk_url + "?file_id=" + file_id, **kwargs)
[docs] @enable_auth def delete_chunked_file(self, file_id, **kwargs): # type: (str, **Any) -> Response """Performs a GET on the specific File URL Args: file_id: File ID **kwargs: Query parameters to be used in the GET request Returns: Requests Response object """ return self.session.delete(self.chunk_url + "?file_id=" + file_id, **kwargs)
[docs] @enable_auth def post_chunked_file(self, fd, file_params, current_position=0): """Performs a POST on the file URL. Args: fd: A file descriptor file_params: Metadata about the file current_position: The current cursor position for the file object Returns: A Requests Response object """ # This is here in case we have not authenticated yet. Without this # code, it is possible for us to perform the POST, which will call # read on each of the files, that method fails with a 4XX, we then # authenticate and try again, only to post an empty file. fd.seek(current_position) # Establish a top-level file handle first result = self.session.get(self.chunk_url + "id/", params=file_params) if not result.ok: raise RuntimeError("Could not request file ID for file %s" % fd.name) file_id = result.json()["details"]["file_id"] offset = 0 retry = 0 # Break up the file into chunks and send them while True: current_cursor = fd.tell() data = fd.read(file_params["chunk_size"]) if not data: break if type(data) != bytes: data = bytes(data, "utf-8") data = b64encode(data) chunk_result = self.session.post( self.chunk_url + "?file_id=" + file_id, json={"data": data, "offset": offset}, ) # Allow the system to try to resend the chunk a couple of # times before giving up. if chunk_result.ok: offset += 1 retry = 0 elif retry < 3: fd.seek(current_cursor) retry += 1 else: raise RuntimeError( "Could not send chunk %s, ran out of retries" % offset ) return result
[docs] @enable_auth def post_forward(self, payload, **kwargs): # type: (str, **Any) -> Response """Performs a POST on the Forward URL Args: payload: The operation to be executed **kwargs: Keyword arguments to pass to Requests session call Returns: The API response """ return self.session.post( self.forward_url, data=payload, headers=self.JSON_HEADERS, params=kwargs )
[docs] @enable_auth def get_user(self, user_identifier): # type: (str) -> Response """Performs a GET on the specific User URL Args: user_identifier: User ID or username Returns: Requests Response object """ return self.session.get(self.user_url + user_identifier)
[docs] @enable_auth def patch_admin(self, payload): # type: (str) -> Response """Performs a PATCH on the admin URL Args: payload: Serialized PatchOperation Returns: Requests Response object """ return self.session.patch( self.admin_url, data=payload, headers=self.JSON_HEADERS )
[docs] def get_tokens(self, username=None, password=None): # type: (str, str) -> Response """Use a username and password to get access and refresh tokens Args: username: Beergarden username password: Beergarden password Returns: Requests Response object """ response = self.session.post( self.token_url, headers=self.JSON_HEADERS, data=json.dumps( { "username": username or self.username, "password": password or self.password, } ), ) if response.ok: response_data = response.json() self.access_token = response_data["access"] self.session.headers["Authorization"] = "Bearer " + self.access_token return response