Source code for brewtils.request_consumer

# -*- coding: utf-8 -*-
import logging
import threading
from functools import partial

from pika import BlockingConnection, URLParameters, BasicProperties, SelectConnection

from brewtils.errors import DiscardMessageException, RepublishRequestException
from brewtils.queues import PikaClient, PIKA_ONE
from brewtils.schema_parser import SchemaParser


[docs]class RequestConsumer(threading.Thread): """RabbitMQ message consumer This consumer is designed to be fault-tolerant - if RabbitMQ closes the connection the consumer will attempt to reopen it. There are limited reasons why the connection may be closed from the broker side and usually indicates permission related issues or socket timeouts. Unexpected channel closures can indicate a problem with a command that was issued. Args: amqp_url: (str) The AMQP url to connect to queue_name: (str) The name of the queue to connect to on_message_callback (func): function called to invoke message processing. Must return a Future. panic_event (threading.Event): Event to be set on a catastrophic failure logger (logging.Logger): A configured Logger thread_name (str): Name to use for this thread max_concurrent: (int) Maximum requests to process concurrently """ def __init__( self, amqp_url=None, queue_name=None, on_message_callback=None, panic_event=None, logger=None, thread_name=None, **kwargs ): self._connection = None self._channel = None self._consumer_tag = None self._queue_name = queue_name self._on_message_callback = on_message_callback self._panic_event = panic_event self._max_concurrent = kwargs.get("max_concurrent", 1) self.logger = logger or logging.getLogger(__name__) if "connection_info" in kwargs: params = kwargs["connection_info"] # Default to one attempt as the Plugin implements its own retry logic params["connection_attempts"] = params.get("connection_attempts", 1) self._connection_parameters = PikaClient(**params).connection_parameters() else: self._connection_parameters = URLParameters(amqp_url) super(RequestConsumer, self).__init__(name=thread_name)
[docs] def run(self): """Run the consumer Creates a connection to RabbitMQ and starts the IOLoop. The IOLoop will block and allow the SelectConnection to operate. This means that to stop the RequestConsumer we just need to stop the IOLoop. Returns: None """ self._connection = self.open_connection() self._connection.ioloop.start()
[docs] def stop(self): """Cleanly shutdown It's a good idea to call stop_consuming before this to prevent new messages from being processed during shutdown. This sets the shutdown_event to let callbacks know that this is an orderly (requested) shutdown. It then schedules a channel close on the IOLoop - the channel's on_close callback will close the connection, and the connection's on_close callback will terminate the IOLoop which will end the RequestConsumer. Returns: None """ self.logger.debug("Stopping request consumer") self._connection.ioloop.add_callback_threadsafe(partial(self._connection.close))
[docs] def is_connected(self): """Determine if the underlying connection is open Returns: True if the connection exists and is open, False otherwise """ return self._connection and self._connection.is_open
[docs] def on_message(self, channel, basic_deliver, properties, body): """Invoked when a message is delivered from the queueing service Invoked by pika when a message is delivered from RabbitMQ. The channel is passed for your convenience. The basic_deliver object that is passed in carries the exchange, routing key, delivery tag and a redelivered flag for the message. the properties passed in is an instance of BasicProperties with the message properties and the body is the message that was sent. Args: channel (pika.channel.Channel): The channel object basic_deliver (pika.Spec.Basic.Deliver): basic_deliver method properties (pika.Spec.BasicProperties): Message properties body (bytes): The message body """ self.logger.debug( "Received message #%s from %s on channel %s: %s", basic_deliver.delivery_tag, properties.app_id, channel.channel_number, body, ) # Pika gives us bytes, but we want a string to be ok too try: body = body.decode() except AttributeError: pass try: future = self._on_message_callback(body, properties.headers) callback = partial(self.on_message_callback_complete, basic_deliver) future.add_done_callback(callback) except DiscardMessageException: self.logger.debug( "Nacking message %s, not attempting to requeue", basic_deliver.delivery_tag, ) self._channel.basic_nack(basic_deliver.delivery_tag, requeue=False) except Exception as ex: self.logger.exception( "Exception while trying to schedule message %s, about to nack " "and requeue: %s", basic_deliver.delivery_tag, ex, ) self._channel.basic_nack(basic_deliver.delivery_tag, requeue=True)
[docs] def on_message_callback_complete(self, basic_deliver, future): """Invoked when the future returned by _on_message_callback completes. This method will be invoked from the threadpool context. It's only purpose is to schedule the final processing steps to take place on the connection's ioloop. Args: basic_deliver: future: Completed future Returns: None """ self._connection.ioloop.add_callback_threadsafe( partial(self.finish_message, basic_deliver, future) )
[docs] def finish_message(self, basic_deliver, future): """Finish processing a message This should be invoked as the final part of message processing. It's responsible for acking / nacking messages back to the broker. The main complexity here depends on whether the request processing future has an exception: - If there is no exception it acks the message - If there is an exception: - If the exception is an instance of DiscardMessageException it nacks the message and does not requeue it - If the exception is an instance of RepublishRequestException it will construct an entirely new BlockingConnection, use that to publish a new message, and then ack the original message - If the exception is not an instance of either the panic_event is set and the consumer will self-destruct Also, if there's ever an error acking a message the panic_event is set and the consumer will self-destruct. Args: basic_deliver: future: Completed future Returns: None """ delivery_tag = basic_deliver.delivery_tag if not future.exception(): try: self.logger.debug("Acking message %s", delivery_tag) self._channel.basic_ack(delivery_tag) except Exception as ex: self.logger.exception( "Error acking message %s, about to shut down: %s", delivery_tag, ex ) self._panic_event.set() else: real_ex = future.exception() if isinstance(real_ex, RepublishRequestException): try: with BlockingConnection(self._connection_parameters) as c: headers = real_ex.headers headers.update({"request_id": real_ex.request.id}) props = BasicProperties( app_id="beer-garden", content_type="text/plain", headers=headers, priority=1, ) c.channel().basic_publish( exchange=basic_deliver.exchange, properties=props, routing_key=basic_deliver.routing_key, body=SchemaParser.serialize_request(real_ex.request), ) self._channel.basic_ack(delivery_tag) except Exception as ex: self.logger.exception( "Error republishing message %s, about to shut down: %s", delivery_tag, ex, ) self._panic_event.set() elif isinstance(real_ex, DiscardMessageException): self.logger.info( "Nacking message %s, not attempting to requeue", delivery_tag ) self._channel.basic_nack(delivery_tag, requeue=False) else: # If request processing throws anything else we terminate self.logger.exception( "Unexpected exception during request %s processing, about " "to shut down: %s", delivery_tag, real_ex, exc_info=False, ) self._panic_event.set()
[docs] def open_connection(self): """Opens a connection to RabbitMQ This method immediately returns the connection object. However, whether the connection was successful is not know until a callback is invoked (either on_open_callback or on_open_error_callback). Returns: The SelectConnection object """ return SelectConnection( parameters=self._connection_parameters, on_open_callback=self.on_connection_open, on_close_callback=self.on_connection_closed, on_open_error_callback=self.on_connection_closed, )
[docs] def on_connection_open(self, connection): """Connection open success callback This method is called by pika once the connection to RabbitMQ has been established. The only thing this actually does is call the open_channel method. Args: connection: The connection object Returns: None """ self.logger.debug("Connection opened: %s", connection) self.open_channel()
[docs] def on_connection_closed(self, connection, *args): """Connection closed callback This method is invoked by pika when the connection to RabbitMQ is closed. If the connection is closed we terminate its IOLoop to stop the RequestConsumer. In the case of an unexpected connection closure we'll wait 5 seconds before terminating with the expectation that the plugin will attempt to restart the consumer once it's dead. Args: connection: The connection args: Tuple of arguments describing why the connection closed pika < 1: reply_code: Numeric code indicating close reason reply_text: String describing close reason pika >= 1: exc: Exception describing close Returns: None """ self.logger.debug("Connection %s closed: %s", connection, args) self._connection.ioloop.stop()
[docs] def open_channel(self): """Open a channel""" self.logger.debug("Opening a new channel") self._connection.channel(on_open_callback=self.on_channel_open)
[docs] def on_channel_open(self, channel): """Channel open success callback This will add a close callback (on_channel_closed) the channel and will call start_consuming to begin receiving messages. Args: channel: The opened channel object Returns: None """ self.logger.debug("Channel opened: %s", channel) self._channel = channel self._channel.add_on_close_callback(self.on_channel_closed) self.start_consuming()
[docs] def on_channel_closed(self, channel, *args): """Channel closed callback This method is invoked by pika when the channel is closed. Channels are usually closed as a result of something that violates the protocol, such as attempting to re-declare an exchange or queue with different parameters. This indicates that something has gone wrong, so just close the connection (if it's still open) to reset. Args: channel: The channel args: Tuple of arguments describing why the channel closed pika < 1: reply_code: Numeric code indicating close reason reply_text: String describing close reason pika >= 1: exc: Exception describing close Returns: None """ self.logger.debug("Channel %i closed: %s", channel, args) if self._connection.is_open: self._connection.close()
[docs] def start_consuming(self): """Begin consuming messages The RabbitMQ prefetch is set to the maximum number of concurrent consumers. This ensures that messages remain in RabbitMQ until a consuming thread is available to process them. An on_cancel_callback is registered so that the consumer is notified if it is canceled by the broker. Returns: None """ self.logger.debug("Issuing consumer related RPC commands") self._channel.basic_qos(prefetch_count=self._max_concurrent) self._channel.add_on_cancel_callback(self.on_consumer_cancelled) consume_kwargs = {"queue": self._queue_name} if PIKA_ONE: consume_kwargs["on_message_callback"] = self.on_message else: consume_kwargs["consumer_callback"] = self.on_message self._consumer_tag = self._channel.basic_consume(**consume_kwargs)
[docs] def stop_consuming(self): """Stop consuming messages Sends a Basic.Cancel command to the broker, which causes the broker to stop sending the consumer messages. Returns: None """ if self._channel: self.logger.debug("Stopping message consuming on channel %i", self._channel) self._connection.ioloop.add_callback_threadsafe( partial( self._channel.basic_cancel, consumer_tag=self._consumer_tag, callback=lambda *args: None, ) )
[docs] def on_consumer_cancelled(self, method_frame): """Consumer cancelled callback This is only invoked if the consumer is cancelled by the broker. Since that effectively ends the request consuming we close the channel to start the process of terminating the RequestConsumer. Args: method_frame (pika.frame.Method): The Basic.Cancel frame Returns: None """ self.logger.debug("Consumer was cancelled: %r", method_frame) if self._channel: self._connection.close()