# -*- coding: utf-8 -*-
import functools
import json
from base64 import b64encode
from typing import Any, List
import brewtils.plugin
import requests.exceptions
import urllib3
from brewtils.errors import _deprecate
from brewtils.rest import normalize_url_prefix
from brewtils.specification import _CONNECTION_SPEC
from requests import Response, Session
from requests.adapters import HTTPAdapter
from requests.utils import quote
from yapconf import YapconfSpec
[docs]def enable_auth(method):
"""Decorate methods with this to enable using authentication"""
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
original_response = method(self, *args, **kwargs)
if original_response.status_code != 401:
return original_response
# Try to use credentials
if (self.username and self.password) or self.client_cert:
credential_response = self.get_tokens()
if credential_response.ok:
return method(self, *args, **kwargs)
# Authenticate and retry failed; just return the original response
return original_response
return wrapper
[docs]class TimeoutAdapter(HTTPAdapter):
"""Transport adapter with a default request timeout"""
def __init__(self, **kwargs):
self.timeout = kwargs.pop("timeout", None)
super(TimeoutAdapter, self).__init__(**kwargs)
[docs] def send(self, *args, **kwargs):
"""Sends PreparedRequest object with specified timeout."""
kwargs["timeout"] = kwargs.get("timeout") or self.timeout
return super(TimeoutAdapter, self).send(*args, **kwargs)
[docs]class RestClient(object):
"""HTTP client for communicating with Beer-garden.
The is the low-level client responsible for making the actual REST calls. Other
clients (e.g. :py:class:`brewtils.rest.easy_client.EasyClient`) build on this by
providing useful abstractions.
Args:
bg_host (str): Beer-garden hostname
bg_port (int): Beer-garden port
bg_url_prefix (str): URL path that will be used as a prefix when communicating
with Beer-garden. Useful if Beer-garden is running on a URL other than '/'.
ssl_enabled (bool): Whether to use SSL for Beer-garden communication
ca_cert (str): Path to certificate file containing the certificate of the
authority that issued the Beer-garden server certificate
ca_verify (bool): Whether to verify Beer-garden server certificate
client_cert (str): Path to client certificate to use when communicating with
Beer-garden
api_version (int): Beer-garden API version to use
client_timeout (int): Max time to wait for Beer-garden server response
username (str): Username for Beer-garden authentication
password (str): Password for Beer-garden authentication
access_token (str): Access token for Beer-garden authentication
refresh_token (deprecated): Refresh token for Beer-garden authentication
"""
# Latest API version currently released
LATEST_VERSION = 1
JSON_HEADERS = {"Content-type": "application/json", "Accept": "text/plain"}
def __init__(self, *args, **kwargs):
self._config = self._load_config(args, kwargs)
self.bg_host = self._config.bg_host
self.bg_port = self._config.bg_port
self.bg_prefix = self._config.bg_url_prefix
self.api_version = self._config.api_version
self.username = self._config.username
self.password = self._config.password
self.access_token = self._config.access_token
self.refresh_token = self._config.refresh_token
self.client_cert = self._config.client_cert
self.client_key = self._config.client_key
# Configure the session to use when making requests
self.session = Session()
if self._config.proxy:
if self._config.ssl_enabled:
self.session.proxies.update({"https": self._config.proxy})
else:
self.session.proxies.update({"http": self._config.proxy})
# This is what Requests is expecting
if self._config.client_key:
self.session.cert = (self._config.client_cert, self._config.client_key)
else:
self.session.cert = self._config.client_cert
if not self._config.ca_verify:
urllib3.disable_warnings()
self.session.verify = False
elif self._config.ca_cert:
self.session.verify = self._config.ca_cert
client_timeout = self._config.client_timeout
if client_timeout == -1:
client_timeout = None
# Having two is kind of strange to me, but this is what Requests does
self.session.mount("https://", TimeoutAdapter(timeout=client_timeout))
self.session.mount("http://", TimeoutAdapter(timeout=client_timeout))
# Configure the beer-garden URLs
self.base_url = "%s://%s:%s%s" % (
"https" if self._config.ssl_enabled else "http",
self.bg_host,
self.bg_port,
normalize_url_prefix(self.bg_prefix),
)
self.version_url = self.base_url + "version"
self.config_url = self.base_url + "config"
if self.api_version == 1:
self.garden_url = self.base_url + "api/v1/gardens/"
self.system_url = self.base_url + "api/v1/systems/"
self.instance_url = self.base_url + "api/v1/instances/"
self.command_url = self.base_url + "api/v1/commands/"
self.request_url = self.base_url + "api/v1/requests/"
self.queue_url = self.base_url + "api/v1/queues/"
self.logging_url = self.base_url + "api/v1/logging/"
self.job_url = self.base_url + "api/v1/jobs/"
self.job_export_url = self.base_url + "api/v1/export/jobs/"
self.job_import_url = self.base_url + "api/v1/import/jobs/"
self.token_url = self.base_url + "api/v1/token/"
self.user_url = self.base_url + "api/v1/users/"
self.admin_url = self.base_url + "api/v1/admin/"
self.forward_url = self.base_url + "api/v1/forward"
# Deprecated
self.logging_config_url = self.base_url + "api/v1/config/logging/"
# Beta
self.event_url = self.base_url + "api/vbeta/events/"
self.chunk_url = self.base_url + "api/vbeta/chunks/"
self.file_url = self.base_url + "api/vbeta/file/"
else:
raise ValueError("Invalid Beer-garden API version: %s" % self.api_version)
@staticmethod
def _load_config(args, kwargs):
"""Load a config based on the CONNECTION section of the Brewtils Specification
This will load a configuration with the following source precedence:
1. kwargs
2. kwargs with "old" names ("host", "port", "url_prefix")
3. host and port passed as positional arguments
4. the global configuration (brewtils.plugin.CONFIG)
Args:
args (deprecated): host and port
kwargs: Standard connection arguments to be used
Returns:
The resolved configuration object
"""
spec = YapconfSpec(_CONNECTION_SPEC)
renamed = {}
for key in ["host", "port", "url_prefix"]:
if kwargs.get(key):
renamed["bg_" + key] = kwargs.get(key)
positional = {}
if len(args) > 0:
_deprecate(
"Heads up - passing bg_host as a positional argument is deprecated "
"and will be removed in version 4.0",
stacklevel=kwargs.get("stacklevel", 3),
)
positional["bg_host"] = args[0]
if len(args) > 1:
_deprecate(
"Heads up - passing bg_port as a positional argument is deprecated "
"and will be removed in version 4.0",
stacklevel=kwargs.get("stacklevel", 3),
)
positional["bg_port"] = args[1]
return spec.load_config(*[kwargs, renamed, positional, brewtils.plugin.CONFIG])
[docs] def can_connect(self, **kwargs):
# type: (**Any) -> bool
"""Determine if a connection to the Beer-garden server is possible
Args:
**kwargs: Keyword arguments to pass to Requests session call
Returns:
A bool indicating if the connection attempt was successful. Will
return False only if a ConnectionError is raised during the attempt.
Any other exception will be re-raised.
Raises:
requests.exceptions.RequestException:
The connection attempt resulted in an exception that indicates
something other than a basic connection error. For example,
an error with certificate verification.
"""
try:
self.session.get(self.config_url, **kwargs)
except requests.exceptions.ConnectionError as ex:
if type(ex) == requests.exceptions.ConnectionError:
return False
raise
return True
[docs] @enable_auth
def get_version(self, **kwargs):
# type: (**Any) -> Response
"""Perform a GET to the version URL
Args:
**kwargs (deprecated): Unused. Accepted for compatibility.
Returns:
Requests Response object
"""
if kwargs:
_deprecate(
"Keyword arguments for get_version are no longer used and will be "
"removed in a future release."
)
return self.session.get(self.version_url)
[docs] @enable_auth
def get_config(self, **kwargs):
# type: (**Any) -> Response
"""Perform a GET to the config URL
Args:
**kwargs (deprecated): Unused. Accepted for compatibility.
Returns:
Requests Response object
"""
if kwargs:
_deprecate(
"Keyword arguments for get_config are no longer used and will be "
"removed in a future release."
)
return self.session.get(self.config_url)
[docs] @enable_auth
def get_logging_config(self, **kwargs):
# type: (**Any) -> Response
"""Perform a GET to the logging config URL
Args:
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.logging_url, params=kwargs)
[docs] @enable_auth
def get_garden(self, garden_name, **kwargs):
# type: (str, **Any) -> Response
"""Performs a GET on the Garden URL
Args:
garden_name: Name of garden to retreive
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
# quote will URL encode the Garden name
return self.session.get(self.garden_url + quote(garden_name), params=kwargs)
[docs] @enable_auth
def get_gardens(self, **kwargs):
# type: (**Any) -> Response
"""Preform a GET on the Garden URL
This fetches all gardens.
Returns:
Requests Response object
"""
return self.session.get(self.garden_url, params=kwargs)
[docs] @enable_auth
def post_gardens(self, payload):
# type: (str) -> Response
"""Performs a POST on the Garden URL
Args:
payload: New Garden definition
Returns:
Requests Response object
"""
return self.session.post(
self.garden_url, data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def delete_garden(self, garden_name):
# type: (str) -> Response
"""Performs a DELETE on a Garden URL
Args:
garden_name: Name of Garden to delete
Returns:
Requests Response object
"""
# quote will URL encode the Garden name
return self.session.delete(self.garden_url + quote(garden_name))
[docs] @enable_auth
def patch_garden(self, garden_name, payload):
# type: (str, str) -> Response
"""Perform a PATCH on a Garden URL
Args:
garden_name: Garden name
payload: Serialized patch operation
Returns:
Request Response object
"""
return self.session.patch(
self.garden_url + quote(garden_name),
data=payload,
headers=self.JSON_HEADERS,
)
[docs] @enable_auth
def get_systems(self, **kwargs):
# type: (**Any) -> Response
"""Perform a GET on the System collection URL
Args:
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.system_url, params=kwargs)
[docs] @enable_auth
def get_system(self, system_id, **kwargs):
# type: (str, **Any) -> Response
"""Performs a GET on the System URL
Args:
system_id: System ID
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.system_url + system_id, params=kwargs)
[docs] @enable_auth
def post_systems(self, payload):
# type: (str) -> Response
"""Performs a POST on the System URL
Args:
payload: New System definition
Returns:
Requests Response object
"""
return self.session.post(
self.system_url, data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def patch_system(self, system_id, payload):
# type: (str, str) -> Response
"""Performs a PATCH on a System URL
Args:
system_id: System ID
payload: Serialized PatchOperation
Returns:
Requests Response object
"""
return self.session.patch(
self.system_url + str(system_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def delete_system(self, system_id):
# type: (str) -> Response
"""Performs a DELETE on a System URL
Args:
system_id: System ID
Returns:
Requests Response object
"""
return self.session.delete(self.system_url + system_id)
[docs] @enable_auth
def get_instance(self, instance_id):
# type: (str) -> Response
"""Performs a GET on the Instance URL
Args:
instance_id: Instance ID
Returns:
Requests Response object
"""
return self.session.get(self.instance_url + instance_id)
[docs] @enable_auth
def patch_instance(self, instance_id, payload):
# type: (str, str) -> Response
"""Performs a PATCH on the instance URL
Args:
instance_id: Instance ID
payload: Serialized PatchOperation
Returns:
Requests Response object
"""
return self.session.patch(
self.instance_url + str(instance_id),
data=payload,
headers=self.JSON_HEADERS,
)
[docs] @enable_auth
def delete_instance(self, instance_id):
# type: (str) -> Response
"""Performs a DELETE on an Instance URL
Args:
instance_id: Instance ID
Returns:
Requests Response object
"""
return self.session.delete(self.instance_url + instance_id)
[docs] @enable_auth
def get_commands(self):
# type: () -> Response
"""Performs a GET on the Commands URL
Returns:
Requests Response object
"""
return self.session.get(self.command_url)
[docs] @enable_auth
def get_command(self, command_id):
# type: (str) -> Response
"""Performs a GET on the Command URL
Args:
command_id: Command ID
Returns:
Requests Response object
"""
return self.session.get(self.command_url + command_id)
[docs] @enable_auth
def get_requests(self, **kwargs):
# type: (**Any) -> Response
"""Performs a GET on the Requests URL
Args:
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.request_url, params=kwargs)
[docs] @enable_auth
def get_request(self, request_id):
# type: (str) -> Response
"""Performs a GET on the Request URL
Args:
request_id: Request ID
Returns:
Requests Response object
"""
return self.session.get(self.request_url + request_id)
[docs] @enable_auth
def post_requests(self, payload, **kwargs):
# type: (str, **Any) -> Response
"""Performs a POST on the Request URL
Args:
payload: New Request definition
**kwargs: Extra request parameters
Keyword Args:
blocking: Wait for request to complete
timeout: Maximum seconds to wait
Returns:
Requests Response object
"""
return self.session.post(
self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs
)
[docs] @enable_auth
def patch_request(self, request_id, payload):
# type: (str, str) -> Response
"""Performs a PATCH on the Request URL
Args:
request_id: Request ID
payload: Serialized PatchOperation
Returns:
Requests Response object
"""
return self.session.patch(
self.request_url + str(request_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def post_event(self, payload, publishers=None):
# type: (str, List[str]) -> Response
"""Performs a POST on the event URL
Args:
payload: Serialized new event definition
publishers: Array of publishers to use
Returns:
Requests Response object
"""
return self.session.post(
self.event_url,
data=payload,
headers=self.JSON_HEADERS,
params={"publisher": publishers} if publishers else None,
)
[docs] @enable_auth
def get_queues(self):
# type: () -> Response
"""Performs a GET on the Queues URL
Returns:
Requests Response object
"""
return self.session.get(self.queue_url)
[docs] @enable_auth
def delete_queues(self):
# type: () -> Response
"""Performs a DELETE on the Queues URL
Returns:
Requests Response object
"""
return self.session.delete(self.queue_url)
[docs] @enable_auth
def delete_queue(self, queue_name):
# type: (str) -> Response
"""Performs a DELETE on a specific Queue URL
Args:
queue_name: Queue name
Returns:
Requests Response object
"""
return self.session.delete(self.queue_url + quote(queue_name))
[docs] @enable_auth
def get_jobs(self, **kwargs):
# type: (**Any) -> Response
"""Performs a GET on the Jobs URL.
Args:
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.job_url, params=kwargs)
[docs] @enable_auth
def get_job(self, job_id):
# type: (str) -> Response
"""Performs a GET on the Job URL
Args:
job_id: Job ID
Returns:
Requests Response object
"""
return self.session.get(self.job_url + job_id)
[docs] @enable_auth
def post_jobs(self, payload):
# type: (str) -> Response
"""Performs a POST on the Job URL
Args:
payload: New Job definition
Returns:
Requests Response object
"""
return self.session.post(self.job_url, data=payload, headers=self.JSON_HEADERS)
[docs] def post_execute_job(self, job_id, reset_interval=False):
# type: (str, bool) -> Response
"""Performs a POST on the Job Execute URL
Args:
job_id: The ID of the Job
reset_interval: Sets interval of job to restart now
Returns:
Request Response object
"""
url_params = {}
if reset_interval:
url_params["reset_interval"] = "True"
return self.session.post(
self.job_url + job_id + "/execute",
headers=self.JSON_HEADERS,
params=url_params,
)
[docs] @enable_auth
def post_export_jobs(self, payload):
# type: (str) -> Response
"""Perform a POST on the Job export URL.
Args:
payload: Serialized list of Jobs
Returns:
Requests Response object
"""
return self.session.post(
self.job_export_url, data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def post_import_jobs(self, payload):
# type: (str) -> Response
"""Perform a POST on the Job import URL.
Args:
payload: Serialized list of job definitions
Returns:
Requests Response object
"""
return self.session.post(
self.job_import_url, data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def patch_job(self, job_id, payload):
# type: (str, str) -> Response
"""Performs a PATCH on the Job URL
Args:
job_id: Job ID
payload: Serialized PatchOperation
Returns:
Requests Response object
"""
return self.session.patch(
self.job_url + str(job_id), data=payload, headers=self.JSON_HEADERS
)
[docs] @enable_auth
def delete_job(self, job_id):
# type: (str) -> Response
"""Performs a DELETE on a Job URL
Args:
job_id: Job ID
Returns:
Requests Response object
"""
return self.session.delete(self.job_url + job_id)
[docs] @enable_auth
def get_file(self, file_id, **kwargs):
# type: (str, **Any) -> Response
"""Performs a GET on the specific File URL
Args:
file_id: File ID
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.file_url + file_id, **kwargs)
[docs] @enable_auth
def post_file(self, data):
# type: (bytes) -> Response
"""Performs a PUT on the file URL
Args:
data: Data bytes
Returns:
A Requests Response object
"""
return self.session.post(self.file_url, data=data)
[docs] @enable_auth
def delete_file(self, file_id):
# type: (str) -> Response
"""Performs a DELETE on the specific File URL
Args:
file_id: File ID
Returns:
Requests Response object
"""
return self.session.delete(self.file_url + file_id)
[docs] @enable_auth
def get_chunked_file(self, file_id, **kwargs):
# type: (str, **Any) -> Response
"""Performs a GET on the specific File URL
Args:
file_id: File ID
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.get(self.chunk_url + "?file_id=" + file_id, **kwargs)
[docs] @enable_auth
def delete_chunked_file(self, file_id, **kwargs):
# type: (str, **Any) -> Response
"""Performs a GET on the specific File URL
Args:
file_id: File ID
**kwargs: Query parameters to be used in the GET request
Returns:
Requests Response object
"""
return self.session.delete(self.chunk_url + "?file_id=" + file_id, **kwargs)
[docs] @enable_auth
def post_chunked_file(self, fd, file_params, current_position=0):
"""Performs a POST on the file URL.
Args:
fd: A file descriptor
file_params: Metadata about the file
current_position: The current cursor position for the file object
Returns:
A Requests Response object
"""
# This is here in case we have not authenticated yet. Without this
# code, it is possible for us to perform the POST, which will call
# read on each of the files, that method fails with a 4XX, we then
# authenticate and try again, only to post an empty file.
fd.seek(current_position)
# Establish a top-level file handle first
result = self.session.get(self.chunk_url + "id/", params=file_params)
if not result.ok:
raise RuntimeError("Could not request file ID for file %s" % fd.name)
file_id = result.json()["details"]["file_id"]
offset = 0
retry = 0
# Break up the file into chunks and send them
while True:
current_cursor = fd.tell()
data = fd.read(file_params["chunk_size"])
if not data:
break
if type(data) != bytes:
data = bytes(data, "utf-8")
data = b64encode(data)
chunk_result = self.session.post(
self.chunk_url + "?file_id=" + file_id,
json={"data": data, "offset": offset},
)
# Allow the system to try to resend the chunk a couple of
# times before giving up.
if chunk_result.ok:
offset += 1
retry = 0
elif retry < 3:
fd.seek(current_cursor)
retry += 1
else:
raise RuntimeError(
"Could not send chunk %s, ran out of retries" % offset
)
return result
[docs] @enable_auth
def post_forward(self, payload, **kwargs):
# type: (str, **Any) -> Response
"""Performs a POST on the Forward URL
Args:
payload: The operation to be executed
**kwargs: Keyword arguments to pass to Requests session call
Returns:
The API response
"""
return self.session.post(
self.forward_url, data=payload, headers=self.JSON_HEADERS, params=kwargs
)
[docs] @enable_auth
def get_user(self, user_identifier):
# type: (str) -> Response
"""Performs a GET on the specific User URL
Args:
user_identifier: User ID or username
Returns:
Requests Response object
"""
return self.session.get(self.user_url + user_identifier)
[docs] @enable_auth
def patch_admin(self, payload):
# type: (str) -> Response
"""Performs a PATCH on the admin URL
Args:
payload: Serialized PatchOperation
Returns:
Requests Response object
"""
return self.session.patch(
self.admin_url, data=payload, headers=self.JSON_HEADERS
)
[docs] def get_tokens(self, username=None, password=None):
# type: (str, str) -> Response
"""Use a username and password to get access and refresh tokens
Args:
username: Beergarden username
password: Beergarden password
Returns:
Requests Response object
"""
response = self.session.post(
self.token_url,
headers=self.JSON_HEADERS,
data=json.dumps(
{
"username": username or self.username,
"password": password or self.password,
}
),
)
if response.ok:
response_data = response.json()
self.access_token = response_data["access"]
self.session.headers["Authorization"] = "Bearer " + self.access_token
return response