import logging
import warnings
import urllib3
from requests import Session
from brewtils.rest import normalize_url_prefix
[docs]class RestClient(object):
"""Simple Rest Client for communicating to with beer-garden.
The is the low-level client responsible for making the actual REST calls. Other clients
(e.g. :py:class:`brewtils.rest.easy_client.EasyClient`) build on this by providing useful
abstractions.
:param host: beer-garden REST API hostname.
:param port: beer-garden REST API port.
:param ssl_enabled: Flag indicating whether to use HTTPS when communicating with beer-garden.
:param api_version: The beer-garden REST API version. Will default to the latest version.
:param logger: The logger to use. If None one will be created.
:param ca_cert: beer-garden REST API server CA certificate.
:param client_cert: The client certificate to use when making requests.
:param url_prefix: beer-garden REST API Url Prefix.
:param ca_verify: Flag indicating whether to verify server certificate when making a request.
"""
# The Latest Version Currently released
LATEST_VERSION = 1
JSON_HEADERS = {'Content-type': 'application/json', 'Accept': 'text/plain'}
def __init__(self, host, port, ssl_enabled=False, api_version=None, logger=None, ca_cert=None,
client_cert=None, url_prefix=None, ca_verify=True):
self.logger = logger or logging.getLogger(__name__)
# Configure the session to use when making requests
self.session = Session()
if not ca_verify:
urllib3.disable_warnings()
self.session.verify = False
elif ca_cert:
self.session.verify = ca_cert
if client_cert:
self.session.cert = client_cert
# Configure the beer-garden URLs
scheme = 'https' if ssl_enabled else 'http'
base_url = '%s://%s:%s%s' % (scheme, host, port, normalize_url_prefix(url_prefix))
self.version_url = base_url + 'version'
self.config_url = base_url + 'config'
api_version = api_version or self.LATEST_VERSION
if api_version == 1:
self.system_url = base_url + 'api/v1/systems/'
self.instance_url = base_url + 'api/v1/instances/'
self.command_url = base_url + 'api/v1/commands/'
self.request_url = base_url + 'api/v1/requests/'
self.queue_url = base_url + 'api/v1/queues/'
self.logging_config_url = base_url + 'api/v1/config/logging/'
self.event_url = base_url + 'api/vbeta/events/'
else:
raise ValueError("Invalid beer-garden API version: %s" % api_version)
[docs] def get_version(self, **kwargs):
"""Perform a GET to the version URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.version_url, params=kwargs)
[docs] def get_logging_config(self, **kwargs):
"""Perform a GET to the logging config URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.logging_config_url, params=kwargs)
[docs] def get_systems(self, **kwargs):
"""Perform a GET on the System collection URL
:param kwargs: Parameters to be used in the GET request
:return: The request response
"""
return self.session.get(self.system_url, params=kwargs)
[docs] def get_system(self, system_id, **kwargs):
"""Performs a GET on the System URL
:param system_id: ID of system
:param kwargs: Parameters to be used in the GET request
:return: Response to the request
"""
return self.session.get(self.system_url + system_id, params=kwargs)
[docs] def post_systems(self, payload):
"""Performs a POST on the System URL
:param payload: New request definition
:return: Response to the request
"""
return self.session.post(self.system_url, data=payload, headers=self.JSON_HEADERS)
[docs] def patch_system(self, system_id, payload):
"""Performs a PATCH on a System URL
:param system_id: ID of system
:param payload: The update specification
:return: Response
"""
return self.session.patch(self.system_url + str(system_id),
data=payload, headers=self.JSON_HEADERS)
[docs] def delete_system(self, system_id):
"""Performs a DELETE on a System URL
:param system_id: The ID of the system to remove
:return: Response to the request
"""
return self.session.delete(self.system_url + system_id)
[docs] def patch_instance(self, instance_id, payload):
"""Performs a PATCH on the instance URL
:param instance_id: ID of instance
:param payload: The update specification
:return: Response
"""
return self.session.patch(self.instance_url + str(instance_id),
data=payload, headers=self.JSON_HEADERS)
[docs] def get_commands(self):
"""Performs a GET on the Commands URL"""
return self.session.get(self.command_url)
[docs] def get_command(self, command_id):
"""Performs a GET on the Command URL
:param command_id: ID of command
:return: Response to the request
"""
return self.session.get(self.command_url + command_id)
[docs] def get_requests(self, **kwargs):
"""Performs a GET on the Requests URL
:param kwargs: Parameters to be used in the GET request
:return: Response to the request
"""
return self.session.get(self.request_url, params=kwargs)
[docs] def get_request(self, request_id):
"""Performs a GET on the Request URL
:param request_id: ID of request
:return: Response to the request
"""
return self.session.get(self.request_url + request_id)
[docs] def post_requests(self, payload):
"""Performs a POST on the Request URL
:param payload: New request definition
:return: Response to the request
"""
return self.session.post(self.request_url, data=payload, headers=self.JSON_HEADERS)
[docs] def patch_request(self, request_id, payload):
"""Performs a PATCH on the Request URL
:param request_id: ID of request
:param payload: New request definition
:return: Response to the request
"""
return self.session.patch(self.request_url + str(request_id),
data=payload, headers=self.JSON_HEADERS)
[docs] def post_event(self, payload, publishers=None):
"""Performs a POST on the event URL
:param payload: New event definition
:param publishers: Array of publishers to use
:return: Response to the request
"""
return self.session.post(self.event_url, data=payload, headers=self.JSON_HEADERS,
params={'publisher': publishers} if publishers else None)
[docs] def get_queues(self):
"""Performs a GET on the Queues URL
:return: Response to the request
"""
return self.session.get(self.queue_url)
[docs] def delete_queues(self):
"""Performs a DELETE on the Queues URL
:return: Response to the request
"""
return self.session.delete(self.queue_url)
[docs] def delete_queue(self, queue_name):
"""Performs a DELETE on a specific Queue URL
:return: Response to the request
"""
return self.session.delete(self.queue_url + queue_name)
[docs]class BrewmasterRestClient(RestClient):
def __init__(self, *args, **kwargs):
warnings.warn("Call made to 'BrewmasterRestClient'. This name will be removed in version "
"3.0, please use 'RestClient' instead.", DeprecationWarning, stacklevel=2)
super(BrewmasterRestClient, self).__init__(*args, **kwargs)