Source code for brewtils.rest.client

# -*- coding: utf-8 -*-

import functools
import json
import logging
import warnings
from datetime import datetime

import jwt
import urllib3
from requests import Session
from requests.adapters import HTTPAdapter

from brewtils.rest import normalize_url_prefix


def enable_auth(method):
    """Decorator that adds token handling around a REST call.

    The wrapped method is expected to return a response object exposing
    ``status_code``. The instance it is bound to must provide the attributes
    ``access_token``, ``refresh_token``, ``username`` and ``password`` as well
    as the methods ``refresh()`` and ``get_tokens()``, each returning a
    response with an ``ok`` attribute.

    :param method: The method to wrap
    :return: The wrapping function
    """

    def _refresh_if_nearly_expired(client):
        # Best-effort proactive refresh: any failure here (undecodable token,
        # missing claims, failing refresh call, ...) is deliberately ignored
        # so that the real request is still attempted.
        try:
            if not (client.access_token and client.refresh_token):
                return

            checked_at = datetime.utcnow()

            claims = jwt.decode(client.access_token, verify=False)
            issued_at = datetime.utcfromtimestamp(int(claims["iat"]))
            expires_at = datetime.utcfromtimestamp(int(claims["exp"]))

            remaining = expires_at - checked_at
            lifetime = expires_at - issued_at

            # Refresh once less than 10% of the token lifetime is left
            if remaining < (0.1 * lifetime):
                client.refresh()
        except Exception:
            pass

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        _refresh_if_nearly_expired(self)

        first_response = method(self, *args, **kwargs)

        if first_response.status_code == 401:
            # First choice: trade the refresh token for a new access token
            if self.refresh_token and self.refresh().ok:
                return method(self, *args, **kwargs)

            # Second choice: log in again with username / password
            if self.username and self.password and self.get_tokens().ok:
                return method(self, *args, **kwargs)

        # Either the call was not rejected or no recovery strategy worked
        return first_response

    return wrapper
class TimeoutAdapter(HTTPAdapter):
    """HTTP transport adapter that applies a fallback request timeout.

    The ``timeout`` keyword is consumed by this class; every other keyword
    argument is handed to ``HTTPAdapter`` unchanged.
    """

    def __init__(self, **kwargs):
        # Remove our own option before delegating to the parent constructor
        default_timeout = kwargs.pop("timeout", None)
        self.timeout = default_timeout

        super(TimeoutAdapter, self).__init__(**kwargs)

    def send(self, *args, **kwargs):
        """Send a request, filling in the adapter timeout when none is given.

        A missing or falsy ``timeout`` keyword is replaced by the value this
        adapter was constructed with.
        """
        effective_timeout = kwargs.get("timeout")
        if not effective_timeout:
            effective_timeout = self.timeout
        kwargs["timeout"] = effective_timeout

        return super(TimeoutAdapter, self).send(*args, **kwargs)
class RestClient(object):
    """Simple REST client for communicating with beer-garden.

    This is the low-level client responsible for making the actual REST calls.
    Other clients (e.g. :py:class:`brewtils.rest.easy_client.EasyClient`) build
    on this by providing useful abstractions.

    :param bg_host: beer-garden REST API hostname.
    :param bg_port: beer-garden REST API port.
    :param ssl_enabled: Flag indicating whether to use HTTPS when communicating
        with beer-garden.
    :param api_version: The beer-garden REST API version. Will default to the
        latest version.
    :param logger: The logger to use. If None one will be created.
    :param ca_cert: beer-garden REST API server CA certificate.
    :param client_cert: The client certificate to use when making requests.
    :param url_prefix: beer-garden REST API Url Prefix.
    :param ca_verify: Flag indicating whether to verify server certificate when
        making a request.
    :param username: Username for Beergarden authentication
    :param password: Password for Beergarden authentication
    :param access_token: Access token for Beergarden authentication. When given
        it is sent as a Bearer token from the first request on.
    :param refresh_token: Refresh token for Beergarden authentication
    :param client_timeout: Max time to wait for a server response. A value of
        -1 is treated the same as no timeout.
    :raises ValueError: If the host or port is missing, or if the API version
        is not supported
    """

    # The Latest Version Currently released
    LATEST_VERSION = 1

    JSON_HEADERS = {"Content-type": "application/json", "Accept": "text/plain"}

    def __init__(
        self,
        bg_host=None,
        bg_port=None,
        ssl_enabled=False,
        api_version=None,
        logger=None,
        ca_cert=None,
        client_cert=None,
        url_prefix=None,
        ca_verify=True,
        **kwargs
    ):
        # "host" / "port" are accepted as alternate keyword names
        bg_host = bg_host or kwargs.get("host")
        if not bg_host:
            raise ValueError('Missing keyword argument "bg_host"')

        bg_port = bg_port or kwargs.get("port")
        if not bg_port:
            raise ValueError('Missing keyword argument "bg_port"')

        self.logger = logger or logging.getLogger(__name__)

        # Configure the session to use when making requests
        self.session = Session()

        timeout = kwargs.get("client_timeout", None)
        if timeout == -1:
            timeout = None

        # Having two is kind of strange to me, but this is what Requests does
        self.session.mount("https://", TimeoutAdapter(timeout=timeout))
        self.session.mount("http://", TimeoutAdapter(timeout=timeout))

        if not ca_verify:
            urllib3.disable_warnings()
            self.session.verify = False
        elif ca_cert:
            self.session.verify = ca_cert

        if client_cert:
            self.session.cert = client_cert

        self.username = kwargs.get("username", None)
        self.password = kwargs.get("password", None)
        self.access_token = kwargs.get("access_token", None)
        self.refresh_token = kwargs.get("refresh_token", None)

        # A token handed to the constructor must actually be sent; otherwise
        # it would only take effect after a 401 forced a refresh / login.
        if self.access_token:
            self._use_access_token(self.access_token)

        # Configure the beer-garden URLs
        scheme = "https" if ssl_enabled else "http"
        self.base_url = "%s://%s:%s%s" % (
            scheme,
            bg_host,
            bg_port,
            normalize_url_prefix(url_prefix),
        )
        self.version_url = self.base_url + "version"
        self.config_url = self.base_url + "config"

        api_version = api_version or self.LATEST_VERSION
        if api_version == 1:
            self.system_url = self.base_url + "api/v1/systems/"
            self.instance_url = self.base_url + "api/v1/instances/"
            self.command_url = self.base_url + "api/v1/commands/"
            self.request_url = self.base_url + "api/v1/requests/"
            self.queue_url = self.base_url + "api/v1/queues/"
            self.logging_config_url = self.base_url + "api/v1/config/logging/"
            self.job_url = self.base_url + "api/v1/jobs/"
            self.token_url = self.base_url + "api/v1/tokens/"
            self.user_url = self.base_url + "api/v1/users/"

            self.event_url = self.base_url + "api/vbeta/events/"
        else:
            raise ValueError("Invalid beer-garden API version: %s" % api_version)

    def _use_access_token(self, token):
        """Store ``token`` and send it as a Bearer token on later requests

        :param token: The access token
        """
        self.access_token = token
        self.session.headers["Authorization"] = "Bearer " + token

    @enable_auth
    def get_version(self, **kwargs):
        """Perform a GET to the version URL

        :param kwargs: Parameters to be used in the GET request
        :return: The request response
        """
        return self.session.get(self.version_url, params=kwargs)

    @enable_auth
    def get_config(self, **kwargs):
        """Perform a GET to the config URL

        :param kwargs: Passed to underlying Requests method
        :return: The request response
        """
        return self.session.get(self.config_url, **kwargs)

    @enable_auth
    def get_logging_config(self, **kwargs):
        """Perform a GET to the logging config URL

        :param kwargs: Parameters to be used in the GET request
        :return: The request response
        """
        return self.session.get(self.logging_config_url, params=kwargs)

    @enable_auth
    def get_systems(self, **kwargs):
        """Perform a GET on the System collection URL

        :param kwargs: Parameters to be used in the GET request
        :return: The request response
        """
        return self.session.get(self.system_url, params=kwargs)

    @enable_auth
    def get_system(self, system_id, **kwargs):
        """Performs a GET on the System URL

        :param system_id: ID of system
        :param kwargs: Parameters to be used in the GET request
        :return: Response to the request
        """
        return self.session.get(self.system_url + str(system_id), params=kwargs)

    @enable_auth
    def post_systems(self, payload):
        """Performs a POST on the System URL

        :param payload: New system definition
        :return: Response to the request
        """
        return self.session.post(
            self.system_url, data=payload, headers=self.JSON_HEADERS
        )

    @enable_auth
    def patch_system(self, system_id, payload):
        """Performs a PATCH on a System URL

        :param system_id: ID of system
        :param payload: The update specification
        :return: Response
        """
        return self.session.patch(
            self.system_url + str(system_id), data=payload, headers=self.JSON_HEADERS
        )

    @enable_auth
    def delete_system(self, system_id):
        """Performs a DELETE on a System URL

        :param system_id: The ID of the system to remove
        :return: Response to the request
        """
        return self.session.delete(self.system_url + str(system_id))

    @enable_auth
    def get_instance(self, instance_id):
        """Performs a GET on the Instance URL

        :param instance_id: ID of instance
        :return: Response to the request
        """
        return self.session.get(self.instance_url + str(instance_id))

    @enable_auth
    def patch_instance(self, instance_id, payload):
        """Performs a PATCH on the instance URL

        :param instance_id: ID of instance
        :param payload: The update specification
        :return: Response
        """
        return self.session.patch(
            self.instance_url + str(instance_id),
            data=payload,
            headers=self.JSON_HEADERS,
        )

    @enable_auth
    def delete_instance(self, instance_id):
        """Performs a DELETE on an Instance URL

        :param instance_id: The ID of the instance to remove
        :return: Response to the request
        """
        return self.session.delete(self.instance_url + str(instance_id))

    @enable_auth
    def get_commands(self):
        """Performs a GET on the Commands URL

        :return: Response to the request
        """
        return self.session.get(self.command_url)

    @enable_auth
    def get_command(self, command_id):
        """Performs a GET on the Command URL

        :param command_id: ID of command
        :return: Response to the request
        """
        return self.session.get(self.command_url + str(command_id))

    @enable_auth
    def get_requests(self, **kwargs):
        """Performs a GET on the Requests URL

        :param kwargs: Parameters to be used in the GET request
        :return: Response to the request
        """
        return self.session.get(self.request_url, params=kwargs)

    @enable_auth
    def get_request(self, request_id):
        """Performs a GET on the Request URL

        :param request_id: ID of request
        :return: Response to the request
        """
        return self.session.get(self.request_url + str(request_id))

    @enable_auth
    def post_requests(self, payload, **kwargs):
        """Performs a POST on the Request URL

        Args:
            payload: New request definition
            kwargs: Extra request parameters

        Keyword Args:
            blocking: Wait for request to complete
            timeout: Maximum seconds to wait

        Returns:
            Response to the request
        """
        return self.session.post(
            self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs
        )

    @enable_auth
    def patch_request(self, request_id, payload):
        """Performs a PATCH on the Request URL

        :param request_id: ID of request
        :param payload: The update specification
        :return: Response to the request
        """
        return self.session.patch(
            self.request_url + str(request_id), data=payload, headers=self.JSON_HEADERS
        )

    @enable_auth
    def post_event(self, payload, publishers=None):
        """Performs a POST on the event URL

        :param payload: New event definition
        :param publishers: Array of publishers to use
        :return: Response to the request
        """
        return self.session.post(
            self.event_url,
            data=payload,
            headers=self.JSON_HEADERS,
            # Only add the query parameter when publishers were given
            params={"publisher": publishers} if publishers else None,
        )

    @enable_auth
    def get_queues(self):
        """Performs a GET on the Queues URL

        :return: Response to the request
        """
        return self.session.get(self.queue_url)

    @enable_auth
    def delete_queues(self):
        """Performs a DELETE on the Queues URL

        :return: Response to the request
        """
        return self.session.delete(self.queue_url)

    @enable_auth
    def delete_queue(self, queue_name):
        """Performs a DELETE on a specific Queue URL

        :param queue_name: Name of the queue, appended to the Queues URL
        :return: Response to the request
        """
        return self.session.delete(self.queue_url + queue_name)

    @enable_auth
    def get_jobs(self, **kwargs):
        """Performs a GET on the Jobs URL.

        Args:
            kwargs: Parameters to be used in the GET request

        Returns:
            Response to the request
        """
        return self.session.get(self.job_url, params=kwargs)

    @enable_auth
    def get_job(self, job_id):
        """Performs a GET on the Job URL

        :param job_id: ID of job
        :return: Response to the request
        """
        return self.session.get(self.job_url + str(job_id))

    @enable_auth
    def post_jobs(self, payload):
        """Performs a POST on the Job URL

        :param payload: New job definition
        :return: Response to the request
        """
        return self.session.post(self.job_url, data=payload, headers=self.JSON_HEADERS)

    @enable_auth
    def patch_job(self, job_id, payload):
        """Performs a PATCH on the Job URL

        :param job_id: ID of job
        :param payload: The update specification
        :return: Response to the request
        """
        return self.session.patch(
            self.job_url + str(job_id), data=payload, headers=self.JSON_HEADERS
        )

    @enable_auth
    def delete_job(self, job_id):
        """Performs a DELETE on a Job URL

        :param job_id: The ID of the job to remove
        :return: Response to the request
        """
        return self.session.delete(self.job_url + str(job_id))

    @enable_auth
    def get_user(self, user_identifier):
        """Performs a GET on the specific User URL

        :param user_identifier: ID or username of User
        :return: Response to the request
        """
        return self.session.get(self.user_url + user_identifier)

    def get_tokens(self, username=None, password=None):
        """Use a username and password to get access and refresh tokens

        On success the returned tokens are stored on this client and the
        access token is sent with subsequent requests.

        Args:
            username: Beergarden username, defaults to the client's username
            password: Beergarden password, defaults to the client's password

        Returns:
            Response object
        """
        response = self.session.post(
            self.token_url,
            headers=self.JSON_HEADERS,
            data=json.dumps(
                {
                    "username": username or self.username,
                    "password": password or self.password,
                }
            ),
        )

        if response.ok:
            response_data = response.json()

            # Read both values before mutating any state so that a malformed
            # body cannot leave the client half-updated
            access_token = response_data["token"]
            refresh_token = response_data["refresh"]

            self.refresh_token = refresh_token
            self._use_access_token(access_token)

        return response

    def refresh(self, refresh_token=None):
        """Use a refresh token to obtain a new access token

        On success the new access token is stored on this client and sent
        with subsequent requests.

        Args:
            refresh_token: Refresh token to use, defaults to the client's
                refresh token

        Returns:
            Response object
        """
        refresh_token = refresh_token or self.refresh_token

        response = self.session.get(
            self.token_url, headers={"X-BG-RefreshID": refresh_token}
        )

        # On older versions of the API (2.4.2 and below) the header-based
        # refresh is not available; a 404 triggers a retry with the refresh
        # token placed in the URL instead.
        if response.status_code == 404:
            response = self.session.get(self.token_url + refresh_token)

        if response.ok:
            response_data = response.json()
            self._use_access_token(response_data["token"])

        return response
class BrewmasterRestClient(RestClient):
    """Deprecated alias of ``RestClient``.

    Constructing it emits a ``DeprecationWarning`` and then behaves exactly
    like ``RestClient``.
    """

    def __init__(self, *args, **kwargs):
        deprecation_notice = (
            "Call made to 'BrewmasterRestClient'. This name will be removed in version "
            "3.0, please use 'RestClient' instead."
        )
        # stacklevel=2 attributes the warning to the code creating the client
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)

        super(BrewmasterRestClient, self).__init__(*args, **kwargs)