Source code for brewtils.plugin

# -*- coding: utf-8 -*-

import json
import logging
import logging.config
import os
import sys
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor

import six
from requests import ConnectionError as RequestsConnectionError

import brewtils
from brewtils.errors import (
from brewtils.log import DEFAULT_LOGGING_CONFIG
from brewtils.models import Instance, Request, System
from brewtils.request_consumer import RequestConsumer
from import EasyClient
from brewtils.schema_parser import SchemaParser

request_context = threading.local()

[docs]class Plugin(object): """A beer-garden Plugin. This class represents a beer-garden Plugin - a continuously-running process that can receive and process Requests. To work, a Plugin needs a Client instance - an instance of a class defining which Requests this plugin can accept and process. The easiest way to define a ``Client`` is by annotating a class with the ``@system`` decorator. When creating a Plugin you can pass certain keyword arguments to let the Plugin know how to communicate with the beer-garden instance. These are: - ``bg_host`` - ``bg_port`` - ``ssl_enabled`` - ``ca_cert`` - ``client_cert`` - ``bg_url_prefix`` A Plugin also needs some identifying data. You can either pass parameters to the Plugin or pass a fully defined System object (but not both). Note that some fields are optional:: Plugin( name="Test", version="1.0.0", instance_name="default", description="A Test", ) or:: the_system = System( name="Test", version="1.0.0", instance_name="default, description="A Test", ) Plugin(system=the_system) If passing parameters directly note that these fields are required: name Environment variable ``BG_NAME`` will be used if not specified version Environment variable ``BG_VERSION`` will be used if not specified instance_name Environment variable ``BG_INSTANCE_NAME`` will be used if not specified. 'default' will be used if not specified and loading from environment variable was unsuccessful And these fields are optional: - description (Will use docstring summary line from Client if unspecified) - icon_name - metadata - display_name Plugins service requests using a :py:class:`concurrent.futures.ThreadPoolExecutor`. The maximum number of threads available is controlled by the max_concurrent argument (the 'multithreaded' argument has been deprecated). .. warning:: The default value for ``max_concurrent`` is 1. This means that a Plugin that invokes a Command on itself in the course of processing a Request will deadlock! If you intend to do this, please set ``max_concurrent`` to a value that makes sense and be aware that Requests are processed in separate thread contexts! :param client: Instance of a class annotated with @system. :param str bg_host: Hostname of a beer-garden. :param int bg_port: Port beer-garden is listening on. :param bool ssl_enabled: Whether to use SSL for beer-garden communication. :param ca_cert: Certificate that issued the server certificate used by the beer-garden server. :param client_cert: Certificate used by the server making the connection to beer-garden. :param system: The system definition. :param name: The system name. :param description: The system description. :param version: The system version. :param icon_name: The system icon name. :param str instance_name: The name of the instance. :param logger: A logger that will be used by the Plugin. :type logger: :py:class:`logging.Logger`. :param parser: The parser to use when communicating with beer-garden. :type parser: :py:class:`brewtils.schema_parser.SchemaParser`. :param bool multithreaded: DEPRECATED Process requests in a separate thread. :param int worker_shutdown_timeout: Time to wait during shutdown to finish processing. :param dict metadata: Metadata specific to this plugin. :param int max_concurrent: Maximum number of requests to process concurrently. :param str bg_url_prefix: URL Prefix beer-garden is on. :param str display_name: The display name to use for the system. :param int max_attempts: Number of times to attempt updating the request before giving up (default -1 aka never). :param int max_timeout: Maximum amount of time to wait before retrying to update a request. :param int starting_timeout: Initial time to wait before the first retry. :param int mq_max_attempts: Number of times to attempt reconnection to message queue before giving up (default -1 aka never). :param int mq_max_timeout: Maximum amount of time to wait before retrying to connect to message queue. :param int mq_starting_timeout: Initial time to wait before the first message queue connection retry. :param int max_instances: Max number of instances allowed for the system. :param bool ca_verify: Verify server certificate when making a request. :param str username: The username for Beergarden authentication :param str password: The password for Beergarden authentication :param access_token: Access token for Beergarden authentication :param refresh_token: Refresh token for Beergarden authentication """ def __init__( self, client, bg_host=None, bg_port=None, ssl_enabled=None, ca_cert=None, client_cert=None, system=None, name=None, description=None, version=None, icon_name=None, instance_name=None, logger=None, parser=None, multithreaded=None, metadata=None, max_concurrent=None, bg_url_prefix=None, **kwargs ): # If a logger is specified or the logging module already has additional # handlers then we assume that logging has already been configured if logger or len(logging.getLogger(__name__).root.handlers) > 0: self.logger = logger or logging.getLogger(__name__) self._custom_logger = True else: logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) self.logger = logging.getLogger(__name__) self._custom_logger = False connection_parameters = brewtils.get_connection_info( bg_host=bg_host, bg_port=bg_port, ssl_enabled=ssl_enabled, ca_cert=ca_cert, client_cert=client_cert, url_prefix=bg_url_prefix or kwargs.get("url_prefix", None), ca_verify=kwargs.get("ca_verify", None), username=kwargs.get("username", None), password=kwargs.get("password", None), client_timeout=kwargs.get("client_timeout", None), ) self.bg_host = connection_parameters["bg_host"] self.bg_port = connection_parameters["bg_port"] self.ssl_enabled = connection_parameters["ssl_enabled"] self.ca_cert = connection_parameters["ca_cert"] self.client_cert = connection_parameters["client_cert"] self.bg_url_prefix = connection_parameters["url_prefix"] self.ca_verify = connection_parameters["ca_verify"] self.max_attempts = kwargs.get("max_attempts", -1) self.max_timeout = kwargs.get("max_timeout", 30) self.starting_timeout = kwargs.get("starting_timeout", 5) self._mq_max_attempts = kwargs.get("mq_max_attempts", -1) self._mq_max_timeout = kwargs.get("mq_max_timeout", 30) self._mq_starting_timeout = kwargs.get("mq_starting_timeout", 5) self._mq_retry_attempt = 0 self._mq_timeout = self._mq_starting_timeout self.max_concurrent = self._setup_max_concurrent(multithreaded, max_concurrent) self.instance_name = instance_name or os.environ.get( "BG_INSTANCE_NAME", "default" ) self.metadata = metadata or {} self.instance = None self.queue_connection_params = None self.admin_consumer = None self.request_consumer = None self.connection_poll_thread = None self.client = client self.shutdown_event = threading.Event() self.parser = parser or SchemaParser() self.system = self._setup_system( client, self.instance_name, system, name, description, version, icon_name, self.metadata, kwargs.get("display_name", None), kwargs.get("max_instances", None), ) self.unique_name = "%s[%s]-%s" % (, self.instance_name, self.system.version, ) # Tightly manage when we're in an 'error' state, aka Brew-view is down self.brew_view_error_condition = threading.Condition() self.brew_view_down = False self.pool = ThreadPoolExecutor(max_workers=self.max_concurrent) self.admin_pool = ThreadPoolExecutor(max_workers=1) self.bm_client = EasyClient( logger=self.logger, parser=self.parser, **connection_parameters )
[docs] def run(self): # Let Beergarden know about our system and instance self._initialize() self.logger.debug("Creating and starting admin queue consumer") self.admin_consumer = self._create_admin_consumer() self.admin_consumer.start() self.logger.debug("Creating and starting request queue consumer") self.request_consumer = self._create_standard_consumer() self.request_consumer.start() self.logger.debug("Creating and starting connection poll thread") self.connection_poll_thread = self._create_connection_poll_thread() self.connection_poll_thread.start()"Plugin %s has started", self.unique_name) try: while not self.shutdown_event.wait(0.1): self._check_connection_poll_thread() self._check_consumers() except KeyboardInterrupt: self.logger.debug("Received KeyboardInterrupt - shutting down") except Exception as ex: self.logger.error("Event loop terminated unexpectedly - shutting down") self.logger.exception(ex) self.logger.debug("About to shut down plugin %s", self.unique_name) self._shutdown()"Plugin %s has terminated", self.unique_name)
[docs] def process_message(self, target, request, headers): """Process a message. Intended to be run on an Executor. :param target: The object to invoke received commands on. (self or self.client) :param request: The parsed Request object :param headers: Dictionary of headers from the `brewtils.request_consumer.RequestConsumer` :return: None """ request.status = "IN_PROGRESS" self._update_request(request, headers) try: # Set request context so this request will be the parent of any # generated requests and update status We also need the host/port of # the current plugin. We currently don't support parent/child # requests across different servers. request_context.current_request = request request_context.bg_host = self.bg_host request_context.bg_port = self.bg_port output = self._invoke_command(target, request) except Exception as ex: self.logger.log( getattr(ex, "_bg_error_log_level", logging.ERROR), "Plugin %s raised an exception while processing request %s: %s", self.unique_name, str(request), ex, exc_info=not getattr(ex, "_bg_suppress_stacktrace", False), ) request.status = "ERROR" request.output = self._format_error_output(request, ex) request.error_class = type(ex).__name__ else: request.status = "SUCCESS" request.output = self._format_output(output) self._update_request(request, headers)
[docs] def process_request_message(self, message, headers): """Processes a message from a RequestConsumer :param message: A valid string representation of a `brewtils.models.Request` :param headers: A dictionary of headers from the `brewtils.request_consumer.RequestConsumer` :return: A `concurrent.futures.Future` """ request = self._pre_process(message) # This message has already been processed, all it needs to do is update if request.status in Request.COMPLETED_STATUSES: return self.pool.submit(self._update_request, request, headers) else: return self.pool.submit(self.process_message, self.client, request, headers)
[docs] def process_admin_message(self, message, headers): # Admin requests won't have a system, so don't verify it request = self._pre_process(message, verify_system=False) return self.admin_pool.submit(self.process_message, self, request, headers)
def _pre_process(self, message, verify_system=True): if self.shutdown_event.is_set(): raise RequestProcessingError( "Unable to process message - currently shutting down" ) try: request = self.parser.parse_request(message, from_string=True) except Exception as ex: self.logger.exception( "Unable to parse message body: {0}. Exception: {1}".format(message, ex) ) raise DiscardMessageException("Error parsing message body") if ( verify_system and request.command_type and request.command_type.upper() != "EPHEMERAL" and request.system.upper() != ): raise DiscardMessageException( "Received message for a different system {0}".format( request.system.upper() ) ) return request def _initialize(self): self.logger.debug("Initializing plugin %s", self.unique_name) # TODO: We should use self.bm_client.upsert_system once it is supported existing_system = self.bm_client.find_unique_system(, version=self.system.version ) if existing_system: if not existing_system.has_instance(self.instance_name): if len(existing_system.instances) < existing_system.max_instances: existing_system.instances.append(Instance(name=self.instance_name)) self.bm_client.create_system(existing_system) else: raise PluginValidationError( 'Unable to create plugin with instance name "%s": ' 'System "%s[%s]" has an instance limit of %d and ' "existing instances %s" % ( self.instance_name,, existing_system.version, existing_system.max_instances, ", ".join(existing_system.instance_names), ) ) # We always update in case the metadata has changed. self.system = self.bm_client.update_system(, new_commands=self.system.commands, metadata=self.system.metadata, description=self.system.description, display_name=self.system.display_name, icon_name=self.system.icon_name, ) else: self.system = self.bm_client.create_system(self.system) # Sanity check to make sure an instance with this name was registered if self.system.has_instance(self.instance_name): instance_id = self.system.get_instance(self.instance_name).id else: raise PluginValidationError( 'Unable to find registered instance with name "%s"' % self.instance_name ) self.instance = self.bm_client.initialize_instance(instance_id) self.queue_connection_params = self.instance.queue_info.get("connection", None) if self.queue_connection_params and self.queue_connection_params.get( "ssl", None ): self.queue_connection_params["ssl"].update( { "ca_cert": self.ca_cert, "ca_verify": self.ca_verify, "client_cert": self.client_cert, } ) self.logger.debug("Plugin %s is initialized", self.unique_name) def _shutdown(self): self.shutdown_event.set() self.logger.debug("About to stop message consuming") self.request_consumer.stop_consuming() self.admin_consumer.stop_consuming() self.logger.debug("About to wake sleeping request processing threads") with self.brew_view_error_condition: self.brew_view_error_condition.notify_all() self.logger.debug("Shutting down request processing pool") self.pool.shutdown(wait=True) self.logger.debug("Shutting down admin processing pool") self.admin_pool.shutdown(wait=True) self.logger.debug("Attempting to stop request queue consumer") self.request_consumer.stop() self.request_consumer.join() self.logger.debug("Attempting to stop admin queue consumer") self.admin_consumer.stop() self.admin_consumer.join() try: self.bm_client.update_instance_status(, "STOPPED") except Exception: self.logger.warning( "Unable to notify Beer-garden that this plugin is STOPPED, so this " "plugin's status may be incorrect in Beer-garden" ) self.logger.debug("Successfully shutdown plugin {0}".format(self.unique_name)) def _create_standard_consumer(self): return RequestConsumer( thread_name="Request Consumer", connection_info=self.queue_connection_params, amqp_url=self.instance.queue_info.get("url", None), queue_name=self.instance.queue_info["request"]["name"], on_message_callback=self.process_request_message, panic_event=self.shutdown_event, max_concurrent=self.max_concurrent, ) def _create_admin_consumer(self): return RequestConsumer( thread_name="Admin Consumer", connection_info=self.queue_connection_params, amqp_url=self.instance.queue_info.get("url", None), queue_name=self.instance.queue_info["admin"]["name"], on_message_callback=self.process_admin_message, panic_event=self.shutdown_event, max_concurrent=1, logger=logging.getLogger("brewtils.admin_consumer"), ) def _create_connection_poll_thread(self): connection_poll_thread = threading.Thread(target=self._connection_poll) connection_poll_thread.daemon = True return connection_poll_thread def _invoke_command(self, target, request): """Invoke the function named in request.command. :param target: The object to search for the function implementation. Will be self or self.client. :param request: The request to process :raise RequestProcessingError: The specified target does not define a callable implementation of request.command :return: The output of the function invocation """ if not callable(getattr(target, request.command, None)): raise RequestProcessingError( "Could not find an implementation of command '%s'" % request.command ) # It's kinda weird that we need to add the object arg only if we're # trying to call a function on self. In both cases the function object # is bound... think it has something to do with our decorators args = [self] if target is self else [] return getattr(target, request.command)(*args, **request.parameters) def _update_request(self, request, headers): """Sends a Request update to beer-garden Ephemeral requests do not get updated, so we simply skip them. If brew-view appears to be down, it will wait for brew-view to come back up before updating. If this is the final attempt to update, we will attempt a known, good request to give some information to the user. If this attempt fails then we simply discard the message :param request: The request to update :param headers: A dictionary of headers from `brewtils.request_consumer.RequestConsumer` :raise RepublishMessageException: The Request update failed (any reason) :return: None """ if request.is_ephemeral: sys.stdout.flush() return with self.brew_view_error_condition: self._wait_for_brew_view_if_down(request) try: if not self._should_be_final_attempt(headers): self._wait_if_not_first_attempt(headers) self.bm_client.update_request(, status=request.status, output=request.output, error_class=request.error_class, ) else: self.bm_client.update_request(, status="ERROR", output="We tried to update the request, but it failed " "too many times. Please check the plugin logs " "to figure out why the request update failed. " "It is possible for this request to have " "succeeded, but we cannot update beer-garden " "with that information.", error_class=BGGivesUpError.__name__, ) except Exception as ex: self._handle_request_update_failure(request, headers, ex) finally: sys.stdout.flush() def _wait_if_not_first_attempt(self, headers): if headers.get("retry_attempt", 0) > 0: time_to_sleep = min( headers.get("time_to_wait", self.starting_timeout), self.max_timeout ) self.shutdown_event.wait(time_to_sleep) def _handle_request_update_failure(self, request, headers, exc): # If brew-view is down, we always want to try again # Yes, even if it is the 'final_attempt' if isinstance(exc, (RequestsConnectionError, RestConnectionError)): self.brew_view_down = True self.logger.error( "Error updating request status: {0} exception: {1}".format(, exc ) ) raise RepublishRequestException(request, headers) elif isinstance(exc, TooLargeError): self.logger.error( "Error updating request {0} - the request exceeds the 16MB size " "limitation. The status of this request will be marked as ERROR, but " "it's possible the request actually completed successfully. If this " "happens often please contact the plugin developer.".format( ) raise RepublishRequestException( Request(, status="ERROR", output="Request size greater than 16MB", error_class=BGGivesUpError.__name__, ), headers, ) elif isinstance(exc, RestClientError): message = ( "Error updating request {0} and it is a client error. Probable " "cause is that this request is already updated. In which case, " "ignore this message. If request {0} did not complete, please " "file an issue. Discarding request to avoid an infinte loop. " "exception: {1}".format(, exc) ) self.logger.error(message) raise DiscardMessageException(message) # Time to discard the message because we've given up elif self._should_be_final_attempt(headers): message = ( "Could not update request {0} even with a known good status, " "output and error_class. We have reached the final attempt and " "will now discard the message. Attempted to process this " "message {1} times".format(, headers["retry_attempt"]) ) self.logger.error(message) raise DiscardMessageException(message) else: self._update_retry_attempt_information(headers) self.logger.exception( "Error updating request (Attempt #{0}: request: {1} exception: " "{2}".format(headers.get("retry_attempt", 0),, exc) ) raise RepublishRequestException(request, headers) def _update_retry_attempt_information(self, headers): headers["retry_attempt"] = headers.get("retry_attempt", 0) + 1 headers["time_to_wait"] = min( headers.get("time_to_wait", self.starting_timeout // 2) * 2, self.max_timeout, ) def _should_be_final_attempt(self, headers): if self.max_attempts <= 0: return False return self.max_attempts <= headers.get("retry_attempt", 0) def _wait_for_brew_view_if_down(self, request): if self.brew_view_down and not self.shutdown_event.is_set(): self.logger.warning( "Currently unable to communicate with Brew-view, about to wait " "until connection is reestablished to update request %s",, ) self.brew_view_error_condition.wait() def _check_connection_poll_thread(self): """Ensure the connection poll thread is alive""" if not self.connection_poll_thread.isAlive(): self.logger.warning( "Looks like connection poll thread has died - attempting to restart" ) self.connection_poll_thread = self._create_connection_poll_thread() self.connection_poll_thread.start() def _check_consumers(self): """Ensure the RequestConsumers are both alive""" if self.admin_consumer.is_connected() and self.request_consumer.is_connected(): if self._mq_retry_attempt != 0:"Admin and request consumer connections OK") self._mq_retry_attempt = 0 self._mq_timeout = self._mq_starting_timeout else: if 0 < self._mq_max_attempts < self._mq_retry_attempt: self.logger.warning("Max consumer connection failures, shutting down") self.shutdown_event.set() return if not self.admin_consumer.is_connected(): self.logger.warning("Looks like admin consumer has died") if not self.request_consumer.is_connected(): self.logger.warning("Looks like request consumer has died") self.logger.warning("Waiting %i seconds before restart", self._mq_timeout) self.shutdown_event.wait(self._mq_timeout) if not self.admin_consumer.is_connected(): self.admin_consumer = self._create_admin_consumer() self.admin_consumer.start() if not self.request_consumer.is_connected(): self.request_consumer = self._create_standard_consumer() self.request_consumer.start() self._mq_timeout = min(self._mq_timeout * 2, self._mq_max_timeout) self._mq_retry_attempt += 1 def _start(self, request): """Handle start message by marking this instance as running. :param request: The start message :return: Success output message """ self.instance = self.bm_client.update_instance_status(, "RUNNING" ) return "Successfully started plugin" def _stop(self, request): """Handle stop message by marking this instance as stopped. :param request: The stop message :return: Success output message """ self.shutdown_event.set() return "Successfully stopped plugin" def _status(self, request): """Handle status message by sending a heartbeat. :param request: The status message :return: None """ with self.brew_view_error_condition: if not self.brew_view_down: try: self.bm_client.instance_heartbeat( except (RequestsConnectionError, RestConnectionError): self.brew_view_down = True raise def _setup_max_concurrent(self, multithreaded, max_concurrent): """Determine correct max_concurrent value. Will be unnecessary when multithreaded flag is removed.""" if multithreaded is not None: warnings.warn( "Keyword argument 'multithreaded' is deprecated and will be " "removed in version 3.0, please use 'max_concurrent' instead.", DeprecationWarning, stacklevel=2, ) # Both multithreaded and max_concurrent kwargs explicitly set # check for mutually exclusive settings if max_concurrent is not None: if multithreaded is True and max_concurrent == 1: self.logger.warning( "Plugin created with multithreaded=True and " "max_concurrent=1, ignoring 'multithreaded' argument" ) elif multithreaded is False and max_concurrent > 1: self.logger.warning( "Plugin created with multithreaded=False and " "max_concurrent>1, ignoring 'multithreaded' argument" ) return max_concurrent else: return 5 if multithreaded else 1 else: if max_concurrent is None: warnings.warn( "Heads up - in 3.0 the default plugin behavior is changing " "from handling requests one at a time to handling multiple " "at once. If this plugin needs to maintain the old " "behavior just set max_concurrent=1 when creating the " "plugin.", PendingDeprecationWarning, stacklevel=2, ) return 1 return max_concurrent def _setup_system( self, client, inst_name, system, name, description, version, icon_name, metadata, display_name, max_instances, ): if system: if ( name or description or version or icon_name or display_name or max_instances ): raise ValidationError( "Sorry, you can't specify a system as well as system " "creation helper keywords (name, description, version, " "max_instances, display_name, and icon_name)" ) if client._bg_name or client._bg_version: raise ValidationError( "Sorry, you can't specify a system as well as system " "info in the @system decorator (bg_name, bg_version)" ) if not system.instances: raise ValidationError( "Explicit system definition requires explicit instance " "definition (use instances=[Instance(name='default')] for " "default behavior)" ) if not system.max_instances: system.max_instances = len(system.instances) else: name = name or os.environ.get("BG_NAME", None) or client._bg_name version = ( version or os.environ.get("BG_VERSION", None) or client._bg_version ) if client.__doc__ and not description: description = self.client.__doc__.split("\n")[0] system = System( name=name, description=description, version=version, icon_name=icon_name, commands=client._commands, max_instances=max_instances or 1, instances=[Instance(name=inst_name)], metadata=metadata, display_name=display_name, ) return system def _connection_poll(self): """Periodically attempt to re-connect to beer-garden""" while not self.shutdown_event.wait(5): with self.brew_view_error_condition: if self.brew_view_down: try: self.bm_client.get_version() except Exception: self.logger.debug("Attempt to reconnect to Brew-view failed") else: "Brew-view connection reestablished, about to " "notify any waiting requests" ) self.brew_view_down = False self.brew_view_error_condition.notify_all() @staticmethod def _format_error_output(request, exc): if request.is_json: return parse_exception_as_json(exc) else: return str(exc) @staticmethod def _format_output(output): """Formats output from Plugins to prevent validation errors""" if isinstance(output, six.string_types): return output try: return json.dumps(output) except (TypeError, ValueError): return str(output)
# Alias old name PluginBase = Plugin
[docs]class RemotePlugin(Plugin): pass