# -*- coding: utf-8 -*-
import abc
import json
import logging
import sys
import threading
from concurrent.futures.thread import ThreadPoolExecutor
import six
from requests import ConnectionError as RequestsConnectionError
import brewtils.plugin
from brewtils.errors import (
BGGivesUpError,
DiscardMessageException,
RepublishRequestException,
RequestProcessingError,
RestClientError,
RestConnectionError,
TooLargeError,
parse_exception_as_json,
)
from brewtils.models import Request
from brewtils.schema_parser import SchemaParser
[docs]class RequestProcessor(object):
"""Class responsible for coordinating Request processing
The RequestProcessor is responsible for the following:
- Defining on_message_received callback that will be invoked by the PikaConsumer
- Parsing the request
- Invoking the command on the target
- Formatting the output
- Reporting request updates to Beergarden (using a RequestUpdater)
Args:
target: Incoming requests will be invoked on this object
updater: RequestUpdater that will be used for updating requests
validation_funcs: List of functions that will called before invoking a command
logger: A logger
plugin_name: The Plugin's unique name
max_workers: Max number of threads to use in the executor pool
"""
def __init__(
self,
target,
updater,
consumer,
validation_funcs=None,
logger=None,
plugin_name=None,
max_workers=None,
resolver=None,
system=None,
):
self.logger = logger or logging.getLogger(__name__)
self.consumer = consumer
self.consumer.on_message_callback = self.on_message_received
self._target = target
self._updater = updater
self._plugin_name = plugin_name
self._validation_funcs = validation_funcs or []
self._pool = ThreadPoolExecutor(max_workers=max_workers)
self._resolver = resolver
self._system = system
[docs] def on_message_received(self, message, headers):
"""Callback function that will be invoked for received messages
This will attempt to parse the message and then run the parsed Request through
all validation functions that this RequestProcessor knows about.
If the request parses cleanly and passes validation it will be submitted to this
RequestProcessor's ThreadPoolExecutor for processing.
Args:
message: The message string
headers: The header dictionary
Returns:
A future that will complete when processing finishes
Raises:
DiscardMessageException: The request failed to parse correctly
RequestProcessException: Validation failures should raise a subclass of this
"""
request = self._parse(message)
for func in self._validation_funcs:
func(request)
# This message has already been processed, all it needs to do is update
if request.status in Request.COMPLETED_STATUSES:
return self._pool.submit(self._updater.update_request, request, headers)
else:
return self._pool.submit(
self.process_message, self._target, request, headers
)
[docs] def process_message(self, target, request, headers):
"""Process a message. Intended to be run on an Executor.
Will set the status to IN_PROGRESS, invoke the command, and set the final
status / output / error_class.
Args:
target: The object to invoke received commands on
request: The parsed Request
headers: Dictionary of headers from the `PikaConsumer`
Returns:
None
"""
request.status = "IN_PROGRESS"
self._updater.update_request(request, headers)
try:
# Set request context so this request will be the parent of any
# generated requests and update status We also need the host/port of
# the current plugin. We currently don't support parent/child
# requests across different servers.
brewtils.plugin.request_context.current_request = request
output = self._invoke_command(target, request, headers)
except Exception as exc:
self._handle_invoke_failure(request, exc)
else:
self._handle_invoke_success(request, output)
self._updater.update_request(request, headers)
[docs] def startup(self):
"""Start the RequestProcessor"""
self.consumer.start()
[docs] def shutdown(self):
"""Stop the RequestProcessor"""
self.logger.debug("Shutting down consumer")
self.consumer.stop_consuming()
# Finish all current actions
self._pool.shutdown(wait=True)
self.consumer.stop()
self.consumer.join()
# Give the updater a chance to shutdown
self._updater.shutdown()
def _handle_invoke_success(self, request, output):
request.status = "SUCCESS"
request.output = self._format_output(output)
def _handle_invoke_failure(self, request, exc):
self.logger.log(
getattr(exc, "_bg_error_log_level", logging.ERROR),
"Plugin %s raised an exception while processing request %s: %s",
self._plugin_name,
str(request),
exc,
exc_info=not getattr(exc, "_bg_suppress_stacktrace", False),
)
request.status = "ERROR"
request.output = self._format_error_output(request, exc)
request.error_class = type(exc).__name__
def _parse(self, message):
"""Parse a message using the standard SchemaParser
Args:
message: The raw (json) message body
Returns:
A Request model
Raises:
DiscardMessageException: The request failed to parse correctly
"""
try:
return SchemaParser.parse_request(message, from_string=True)
except Exception as ex:
self.logger.exception(
"Unable to parse message body: {0}. Exception: {1}".format(message, ex)
)
raise DiscardMessageException("Error parsing message body")
def _invoke_command(self, target, request, headers):
"""Invoke the function named in request.command
Args:
target: The object to search for the function implementation.
request: The request to process
headers: The headers for this request
Returns:
The output of the function call
Raises:
RequestProcessingError: The specified target does not define a
callable implementation of request.command
"""
if not callable(getattr(target, request.command, None)):
raise RequestProcessingError(
"Could not find an implementation of command '%s'" % request.command
)
# Get the command to use the parameter definitions when resolving
command = None
if self._system:
command = self._system.get_command_by_name(request.command)
# Now resolve parameters, if necessary
if request.is_ephemeral or not command:
parameters = request.parameters or {}
else:
parameters = self._resolver.resolve(
request.parameters,
definitions=command.parameters,
upload=False,
)
return getattr(target, request.command)(**parameters)
@staticmethod
def _format_error_output(request, exc):
if request.is_json:
return parse_exception_as_json(exc)
else:
return str(exc)
@staticmethod
def _format_output(output):
if isinstance(output, six.string_types):
return output
try:
return json.dumps(output)
except (TypeError, ValueError):
return str(output)
[docs]class AdminProcessor(RequestProcessor):
"""RequestProcessor with slightly modified process method"""
[docs] def process_message(self, target, request, headers):
"""Process a message. Intended to be run on an Executor.
Will invoke the command and set the final status / output / error_class.
Will NOT set the status to IN_PROGRESS or set the request context.
Args:
target: The object to invoke received commands on
request: The parsed Request
headers: Dictionary of headers from the `PikaConsumer`
Returns:
None
"""
try:
output = self._invoke_command(target, request, headers)
except Exception as exc:
self._handle_invoke_failure(request, exc)
else:
self._handle_invoke_success(request, output)
self._updater.update_request(request, headers)
[docs]@six.add_metaclass(abc.ABCMeta)
class RequestConsumer(threading.Thread):
"""Base class for consumers
Classes deriving from this are expected to provide a concrete implementation for a
specific queue type.
After the consumer is created it will be passed to a ``RequestProcessor``. The
processor will then set the ``on_message_callback`` property of the consumer to the
correct method.
This means when the consumer receives a message it should invoke its own
``_on_message_callback`` method with the message body and headers as parameters::
self._on_message_callback(body, properties.headers)
"""
def __init__(self, *args, **kwargs):
super(RequestConsumer, self).__init__(*args, **kwargs)
self._on_message_callback = None
[docs] def stop_consuming(self):
pass
@property
def on_message_callback(self):
return self._on_message_callback
@on_message_callback.setter
def on_message_callback(self, new_callback):
self._on_message_callback = new_callback
[docs] @staticmethod
def create(connection_type=None, **kwargs):
"""Factory method for consumer creation
Currently the only supported connection_type is "rabbitmq", which will return
an instance of ``brewtils.pika.PikaConsumer``.
Args:
connection_type (str): String describing connection type
kwargs: Keyword arguments to be passed to the Consumer initializer
Returns:
Concrete instance of RequestConsumer
Raises:
ValueError: The specified connection_type does not map to a consumer class
"""
if connection_type == "rabbitmq":
from brewtils.pika import PikaConsumer
return PikaConsumer(**kwargs)
raise ValueError("Unknown connection type '%s'" % connection_type)
[docs]@six.add_metaclass(abc.ABCMeta)
class RequestUpdater(object):
[docs] @abc.abstractmethod
def update_request(self, request, headers):
pass
[docs] @abc.abstractmethod
def shutdown(self):
pass
[docs]class NoopUpdater(RequestUpdater):
"""RequestUpdater implementation that explicitly does not update."""
def __init__(self, *args, **kwargs):
pass
[docs] def update_request(self, request, headers):
pass
[docs] def shutdown(self):
pass
[docs]class HTTPRequestUpdater(RequestUpdater):
"""RequestUpdater implementation based around an EasyClient.
Args:
ez_client: EasyClient to use for communication
shutdown_event: `threading.Event` to allow for timely shutdowns
logger: A logger
Keyword Args:
logger: A logger
max_attempts: Max number of unsuccessful updates before discarding the message
max_timeout: Maximum amount of time (seconds) to wait between update attempts
starting_timeout: Starting time to wait (seconds) between update attempts
"""
def __init__(self, ez_client, shutdown_event, **kwargs):
self.logger = kwargs.get("logger", logging.getLogger(__name__))
self._ez_client = ez_client
self._shutdown_event = shutdown_event
self.max_attempts = kwargs.get("max_attempts", -1)
self.max_timeout = kwargs.get("max_timeout", 30)
self.starting_timeout = kwargs.get("starting_timeout", 5)
# Tightly manage when we're in an 'error' state, aka beergarden is down
self.beergarden_error_condition = threading.Condition()
self.beergarden_down = False
self.logger.debug("Creating and starting connection poll thread")
self.connection_poll_thread = self._create_connection_poll_thread()
self.connection_poll_thread.start()
[docs] def shutdown(self):
self.logger.debug("Shutting down, about to wake any sleeping updater threads")
with self.beergarden_error_condition:
self.beergarden_error_condition.notify_all()
[docs] def update_request(self, request, headers):
"""Sends a Request update to beer-garden
Ephemeral requests do not get updated, so we simply skip them.
If beergarden appears to be down, it will wait for beergarden to come back
up before updating.
If this is the final attempt to update, we will attempt a known, good
request to give some information to the user. If this attempt fails
then we simply discard the message.
Args:
request: The request to update
headers: A dictionary of headers from the `PikaConsumer`
Returns:
None
Raises:
RepublishMessageException: The Request update failed (any reason)
"""
if request.is_ephemeral:
sys.stdout.flush()
return
with self.beergarden_error_condition:
self._wait_for_beergarden_if_down(request)
try:
if not self._should_be_final_attempt(headers):
self._wait_if_not_first_attempt(headers)
self._ez_client.update_request(
request.id,
status=request.status,
output=request.output,
error_class=request.error_class,
)
else:
self._ez_client.update_request(
request.id,
status="ERROR",
output="We tried to update the request, but it failed too many "
"times. Please check the plugin logs to figure out why the "
"request update failed. It is possible for this request to "
"have succeeded, but we cannot update beer-garden with that "
"information.",
error_class="BGGivesUpError",
)
except Exception as ex:
self._handle_request_update_failure(request, headers, ex)
finally:
sys.stdout.flush()
def _wait_if_not_first_attempt(self, headers):
if headers.get("retry_attempt", 0) > 0:
time_to_sleep = min(
headers.get("time_to_wait", self.starting_timeout), self.max_timeout
)
self._shutdown_event.wait(time_to_sleep)
def _handle_request_update_failure(self, request, headers, exc):
# If beergarden is down, we always want to try again
# Yes, even if it is the 'final_attempt'
if isinstance(exc, (RequestsConnectionError, RestConnectionError)):
self.beergarden_down = True
self.logger.error(
"Error updating request status: {0} exception: {1}".format(
request.id, exc
)
)
raise RepublishRequestException(request, headers)
elif isinstance(exc, TooLargeError):
self.logger.error(
"Error updating request {0} - the request exceeds the 16MB size "
"limitation. The status of this request will be marked as ERROR, but "
"it's possible the request actually completed successfully. If this "
"happens often please contact the plugin developer.".format(request.id)
)
request.status = "ERROR"
request.output = "Request size greater than 16MB"
request.error_class = BGGivesUpError.__name__
raise RepublishRequestException(request, headers)
elif isinstance(exc, RestClientError):
message = (
"Error updating request {0} and it is a client error. Probable "
"cause is that this request is already updated. In which case, "
"ignore this message. If request {0} did not complete, please "
"file an issue. Discarding request to avoid an infinite loop. "
"exception: {1}".format(request.id, exc)
)
self.logger.error(message)
raise DiscardMessageException(message)
# Time to discard the message because we've given up
elif self._should_be_final_attempt(headers):
message = (
"Could not update request {0} even with a known good status, "
"output and error_class. We have reached the final attempt and "
"will now discard the message. Attempted to process this "
"message {1} times".format(request.id, headers["retry_attempt"])
)
self.logger.error(message)
raise DiscardMessageException(message)
else:
self._update_retry_attempt_information(headers)
self.logger.exception(
"Error updating request (Attempt #{0}: request: {1} exception: "
"{2}".format(headers.get("retry_attempt", 0), request.id, exc)
)
raise RepublishRequestException(request, headers)
def _update_retry_attempt_information(self, headers):
headers["retry_attempt"] = headers.get("retry_attempt", 0) + 1
headers["time_to_wait"] = min(
headers.get("time_to_wait", self.starting_timeout // 2) * 2,
self.max_timeout,
)
def _should_be_final_attempt(self, headers):
if self.max_attempts <= 0:
return False
return self.max_attempts <= headers.get("retry_attempt", 0)
def _wait_for_beergarden_if_down(self, request):
if self.beergarden_down and not self._shutdown_event.is_set():
self.logger.warning(
"Currently unable to communicate with beergarden, about to wait "
"until connection is reestablished to update request %s",
request.id,
)
self.beergarden_error_condition.wait()
def _create_connection_poll_thread(self):
connection_poll_thread = threading.Thread(target=self._connection_poll)
connection_poll_thread.daemon = True
return connection_poll_thread
def _connection_poll(self):
"""Periodically attempt to re-connect to beer-garden"""
while not self._shutdown_event.wait(5):
try:
with self.beergarden_error_condition:
if self.beergarden_down:
try:
self._ez_client.get_version()
except Exception:
self.logger.debug("Beergarden reconnection attempt failure")
else:
self.logger.info(
"Beergarden connection reestablished, about to "
"notify any waiting requests"
)
self.beergarden_down = False
self.beergarden_error_condition.notify_all()
except Exception as ex:
self.logger.exception("Exception in connection poll thread: %s", ex)