# -*- coding: utf-8 -*-
import functools
import json
import logging
import warnings
from datetime import datetime
import jwt
import urllib3
from requests import Session
from requests.adapters import HTTPAdapter
from brewtils.rest import normalize_url_prefix
[docs]def enable_auth(method):
"""Decorate methods with this to enable using authentication"""
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
# Proactively refresh access token, if possible
try:
if self.access_token and self.refresh_token:
now = datetime.utcnow()
decoded = jwt.decode(self.access_token, verify=False)
issued = datetime.utcfromtimestamp(int(decoded["iat"]))
expires = datetime.utcfromtimestamp(int(decoded["exp"]))
# Try to refresh there's less than 10% time remaining
if (expires - now) < (0.1 * (expires - issued)):
self.refresh()
except Exception:
pass
original_response = method(self, *args, **kwargs)
if original_response.status_code != 401:
return original_response
# Try to use the refresh token
if self.refresh_token:
refresh_response = self.refresh()
if refresh_response.ok:
return method(self, *args, **kwargs)
# Try to use credentials
if self.username and self.password:
credential_response = self.get_tokens()
if credential_response.ok:
return method(self, *args, **kwargs)
# Nothing worked, just return the original response
return original_response
return wrapper
[docs]class TimeoutAdapter(HTTPAdapter):
"""Transport adapter with a default request timeout"""
def __init__(self, **kwargs):
self.timeout = kwargs.pop("timeout", None)
super(TimeoutAdapter, self).__init__(**kwargs)
[docs] def send(self, *args, **kwargs):
kwargs["timeout"] = kwargs.get("timeout") or self.timeout
return super(TimeoutAdapter, self).send(*args, **kwargs)
[docs]class RestClient(object):
"""Simple Rest Client for communicating to with beer-garden.
The is the low-level client responsible for making the actual REST calls. Other clients
(e.g. :py:class:`brewtils.rest.easy_client.EasyClient`) build on this by providing useful
abstractions.
:param bg_host: beer-garden REST API hostname.
:param bg_port: beer-garden REST API port.
:param ssl_enabled: Flag indicating whether to use HTTPS when communicating with beer-garden.
:param api_version: The beer-garden REST API version. Will default to the latest version.
:param logger: The logger to use. If None one will be created.
:param ca_cert: beer-garden REST API server CA certificate.
:param client_cert: The client certificate to use when making requests.
:param url_prefix: beer-garden REST API Url Prefix.
:param ca_verify: Flag indicating whether to verify server certificate when making a request.
:param username: Username for Beergarden authentication
:param password: Password for Beergarden authentication
:param access_token: Access token for Beergarden authentication
:param refresh_token: Refresh token for Beergarden authentication
:param client_timeout: Max time to will wait for server response
"""
# The Latest Version Currently released
LATEST_VERSION = 1
JSON_HEADERS = {"Content-type": "application/json", "Accept": "text/plain"}
def __init__(
self,
bg_host=None,
bg_port=None,
ssl_enabled=False,
api_version=None,
logger=None,
ca_cert=None,
client_cert=None,
url_prefix=None,
ca_verify=True,
**kwargs
):
bg_host = bg_host or kwargs.get("host")
if not bg_host:
raise ValueError('Missing keyword argument "bg_host"')
bg_port = bg_port or kwargs.get("port")
if not bg_port:
raise ValueError('Missing keyword argument "bg_port"')
self.logger = logger or logging.getLogger(__name__)
# Configure the session to use when making requests
self.session = Session()
timeout = kwargs.get("client_timeout", None)
if timeout == -1:
timeout = None
# Having two is kind of strange to me, but this is what Requests does
self.session.mount("https://", TimeoutAdapter(timeout=timeout))
self.session.mount("http://", TimeoutAdapter(timeout=timeout))
if not ca_verify:
urllib3.disable_warnings()
self.session.verify = False
elif ca_cert:
self.session.verify = ca_cert
if client_cert:
self.session.cert = client_cert
self.username = kwargs.get("username", None)
self.password = kwargs.get("password", None)
self.access_token = kwargs.get("access_token", None)
self.refresh_token = kwargs.get("refresh_token", None)
# Configure the beer-garden URLs
scheme = "https" if ssl_enabled else "http"
self.base_url = "%s://%s:%s%s" % (
scheme,
bg_host,
bg_port,
normalize_url_prefix(url_prefix),
)
self.version_url = self.base_url + "version"
self.config_url = self.base_url + "config"
api_version = api_version or self.LATEST_VERSION
if api_version == 1:
self.system_url = self.base_url + "api/v1/systems/"
self.instance_url = self.base_url + "api/v1/instances/"
self.command_url = self.base_url + "api/v1/commands/"
self.request_url = self.base_url + "api/v1/requests/"
self.queue_url = self.base_url + "api/v1/queues/"
self.logging_config_url = self.base_url + "api/v1/config/logging/"
self.job_url = self.base_url + "api/v1/jobs/"
self.token_url = self.base_url + "api/v1/tokens/"
self.user_url = self.base_url + "api/v1/users/"
self.event_url = self.base_url + "api/vbeta/events/"
else:
raise ValueError("Invalid beer-garden API version: %s" % api_version)
[docs] @enable_auth
def get_version(self, **kwargs):
"""Perform a GET to the version URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.version_url, params=kwargs)
[docs] @enable_auth
def get_config(self, **kwargs):
"""Perform a GET to the config URL
:param kwargs: Passed to underlying Requests method
:return: The request response
"""
return self.session.get(self.config_url, **kwargs)
[docs] @enable_auth
def get_logging_config(self, **kwargs):
"""Perform a GET to the logging config URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.logging_config_url, params=kwargs)
[docs] @enable_auth
def get_systems(self, **kwargs):
"""Perform a GET on the System collection URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.system_url, params=kwargs)
[docs] @enable_auth
def get_system(self, system_id, **kwargs):
"""Performs a GET on the System URL
:param system_id: ID of system
:param kwargs: Parameters to be used in the GET request
:return: Response to the request
"""
return self.session.get(self.system_url + system_id, params=kwargs)
[docs] @enable_auth
def post_systems(self, payload):
"""Performs a POST on the System URL
:param payload: New request definition
:return: Response to the request
"""
return self.session.post(
self.system_url, data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def patch_system(self, system_id, payload):
"""Performs a PATCH on a System URL
:param system_id: ID of system
:param payload: The update specification
:return: Response
"""
return self.session.patch(
self.system_url + str(system_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def delete_system(self, system_id):
"""Performs a DELETE on a System URL
:param system_id: The ID of the system to remove
:return: Response to the request
"""
return self.session.delete(self.system_url + system_id)
[docs] @enable_auth
def get_instance(self, instance_id):
"""Performs a GET on the Instance URL
:param instance_id: ID of instance
:return: Response to the request
"""
return self.session.get(self.instance_url + instance_id)
[docs] @enable_auth
def patch_instance(self, instance_id, payload):
"""Performs a PATCH on the instance URL
:param instance_id: ID of instance
:param payload: The update specification
:return: Response
"""
return self.session.patch(
self.instance_url + str(instance_id),
data=payload,
headers=self.JSON_HEADERS,
)
[docs] @enable_auth
def delete_instance(self, instance_id):
"""Performs a DELETE on an Instance URL
:param instance_id: The ID of the instance to remove
:return: Response to the request
"""
return self.session.delete(self.instance_url + instance_id)
[docs] @enable_auth
def get_commands(self):
"""Performs a GET on the Commands URL"""
return self.session.get(self.command_url)
[docs] @enable_auth
def get_command(self, command_id):
"""Performs a GET on the Command URL
:param command_id: ID of command
:return: Response to the request
"""
return self.session.get(self.command_url + command_id)
[docs] @enable_auth
def get_requests(self, **kwargs):
"""Performs a GET on the Requests URL
:param kwargs: Parameters to be used in the GET request
:return: Response to the request
"""
return self.session.get(self.request_url, params=kwargs)
[docs] @enable_auth
def get_request(self, request_id):
"""Performs a GET on the Request URL
:param request_id: ID of request
:return: Response to the request
"""
return self.session.get(self.request_url + request_id)
[docs] @enable_auth
def post_requests(self, payload, **kwargs):
"""Performs a POST on the Request URL
Args:
payload: New request definition
kwargs: Extra request parameters
Keyword Args:
blocking: Wait for request to complete
timeout: Maximum seconds to wait
Returns:
Response to the request
"""
return self.session.post(
self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs
)
[docs] @enable_auth
def patch_request(self, request_id, payload):
"""Performs a PATCH on the Request URL
:param request_id: ID of request
:param payload: New request definition
:return: Response to the request
"""
return self.session.patch(
self.request_url + str(request_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def post_event(self, payload, publishers=None):
"""Performs a POST on the event URL
:param payload: New event definition
:param publishers: Array of publishers to use
:return: Response to the request
"""
return self.session.post(
self.event_url,
data=payload,
headers=self.JSON_HEADERS,
params={"publisher": publishers} if publishers else None,
)
[docs] @enable_auth
def get_queues(self):
"""Performs a GET on the Queues URL
:return: Response to the request
"""
return self.session.get(self.queue_url)
[docs] @enable_auth
def delete_queues(self):
"""Performs a DELETE on the Queues URL
:return: Response to the request
"""
return self.session.delete(self.queue_url)
[docs] @enable_auth
def delete_queue(self, queue_name):
"""Performs a DELETE on a specific Queue URL
:return: Response to the request
"""
return self.session.delete(self.queue_url + queue_name)
[docs] @enable_auth
def get_jobs(self, **kwargs):
"""Performs a GET on the Jobs URL.
Returns: Response to the request
"""
return self.session.get(self.job_url, params=kwargs)
[docs] @enable_auth
def get_job(self, job_id):
"""Performs a GET on the Job URL
:param job_id: ID of job
:return: Response to the request
"""
return self.session.get(self.job_url + job_id)
[docs] @enable_auth
def post_jobs(self, payload):
"""Performs a POST on the Job URL
:param payload: New job definition
:return: Response to the request
"""
return self.session.post(self.job_url, data=payload, headers=self.JSON_HEADERS)
[docs] @enable_auth
def patch_job(self, job_id, payload):
"""Performs a PATCH on the Job URL
:param job_id: ID of request
:param payload: New job definition
:return: Response to the request
"""
return self.session.patch(
self.job_url + str(job_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def delete_job(self, job_id):
"""Performs a DELETE on a Job URL
:param job_id: The ID of the job to remove
:return: Response to the request
"""
return self.session.delete(self.job_url + job_id)
[docs] @enable_auth
def get_user(self, user_identifier):
"""Performs a GET on the specific User URL
:return: Response to the request
:param user_identifier: ID or username of User
"""
return self.session.get(self.user_url + user_identifier)
[docs] def get_tokens(self, username=None, password=None):
"""Use a username and password to get access and refresh tokens
Args:
username: Beergarden username
password: Beergarden password
Returns:
Response object
"""
response = self.session.post(
self.token_url,
headers=self.JSON_HEADERS,
data=json.dumps(
{
"username": username or self.username,
"password": password or self.password,
}
),
)
if response.ok:
response_data = response.json()
self.access_token = response_data["token"]
self.refresh_token = response_data["refresh"]
self.session.headers["Authorization"] = "Bearer " + self.access_token
return response
[docs] def refresh(self, refresh_token=None):
"""Use a refresh token to obtain a new access token
Args:
refresh_token: Refresh token to use
Returns:
Response object
"""
refresh_token = refresh_token or self.refresh_token
response = self.session.get(
self.token_url, headers={"X-BG-RefreshID": refresh_token}
)
# On older versions of the API (2.4.2 and below) the new refresh token
# is not available.
if response.status_code == 404:
response = self.session.get(self.token_url + refresh_token)
if response.ok:
response_data = response.json()
self.access_token = response_data["token"]
self.session.headers["Authorization"] = "Bearer " + self.access_token
return response
[docs]class BrewmasterRestClient(RestClient):
def __init__(self, *args, **kwargs):
warnings.warn(
"Call made to 'BrewmasterRestClient'. This name will be removed in version "
"3.0, please use 'RestClient' instead.",
DeprecationWarning,
stacklevel=2,
)
super(BrewmasterRestClient, self).__init__(*args, **kwargs)