Source code for brewtils.queues

# -*- coding: utf-8 -*-

import ssl as pyssl

from pika import ConnectionParameters, PlainCredentials, SSLOptions
from pika import __version__ as pika_version

# True when the installed pika release reports a version string starting with
# "1.". The client code in this module branches on this flag when building
# SSLOptions and when deciding whether to pass the "ssl" connection parameter.
PIKA_ONE = pika_version.startswith("1.")

class PikaClient(object):
    """Base class for connecting to RabbitMQ using Pika

    Args:
        host: RabbitMQ host
        port: RabbitMQ port
        user: RabbitMQ user
        password: RabbitMQ password
        connection_attempts: Maximum number of retry attempts
        heartbeat: Time between RabbitMQ heartbeats
        heartbeat_interval: DEPRECATED, use heartbeat
        virtual_host: RabbitMQ virtual host
        exchange: Default exchange that will be used
        ssl: SSL Options. A dict; the keys read here are ``enabled``,
            ``ca_cert`` and ``ca_verify``
        blocked_connection_timeout: If not None, the value is a non-negative
            timeout, in seconds, for the connection to remain blocked
            (triggered by Connection.Blocked from broker); if the timeout
            expires before connection becomes unblocked, the connection will
            be torn down, triggering the adapter-specific mechanism for
            informing client app about the closed connection (e.g.,
            on_close_callback or ConnectionClosed exception) with
            `reason_code` of `InternalCloseReasons.BLOCKED_CONNECTION_TIMEOUT`.
    """

    def __init__(
        self,
        host="localhost",
        port=5672,
        user="guest",
        password="guest",
        connection_attempts=3,
        heartbeat_interval=3600,
        virtual_host="/",
        exchange="beer_garden",
        ssl=None,
        blocked_connection_timeout=None,
        **kwargs
    ):
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._connection_attempts = connection_attempts
        # "heartbeat" (passed through kwargs) takes precedence over the
        # deprecated "heartbeat_interval" argument.
        self._heartbeat = kwargs.get("heartbeat", heartbeat_interval)
        self._blocked_connection_timeout = blocked_connection_timeout
        self._virtual_host = virtual_host
        self._exchange = exchange

        ssl = ssl or {}
        self._ssl_enabled = ssl.get("enabled", False)

        if not self._ssl_enabled:
            self._ssl_options = None
        elif PIKA_ONE:
            # With PIKA_ONE set, SSLOptions is built from an SSLContext.
            ssl_context = pyssl.create_default_context(cafile=ssl.get("ca_cert", None))
            if ssl.get("ca_verify"):
                ssl_context.verify_mode = pyssl.CERT_REQUIRED
            else:
                # check_hostname must be disabled before verify_mode can be
                # lowered to CERT_NONE.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = pyssl.CERT_NONE
            self._ssl_options = SSLOptions(ssl_context, server_hostname=self._host)
        else:
            # Otherwise SSLOptions takes the individual settings as keywords.
            mode = pyssl.CERT_REQUIRED if ssl.get("ca_verify") else pyssl.CERT_NONE
            self._ssl_options = SSLOptions(
                cafile=ssl.get("ca_cert", None),
                verify_mode=mode,
                server_hostname=self._host,
            )

        # Save the 'normal' params so they don't need to be reconstructed
        self._conn_params = self.connection_parameters()

    @property
    def connection_url(self):
        """str: Connection URL for this client's connection information

        The URL has the form ``amqp[s]://user:password@host:port/vhost``; the
        default virtual host ``/`` is rendered as an empty path segment. Note
        that the password is embedded in the returned string.
        """
        virtual_host = self._conn_params.virtual_host
        if virtual_host == "/":
            virtual_host = ""

        return "amqp%s://%s:%s@%s:%s/%s" % (
            "s" if self._ssl_enabled else "",
            self._conn_params.credentials.username,
            self._conn_params.credentials.password,
            self._conn_params.host,
            self._conn_params.port,
            virtual_host,
        )

    def connection_parameters(self, **kwargs):
        """Get ``ConnectionParameters`` associated with this client

        Will construct a ``ConnectionParameters`` object using parameters
        passed at initialization as defaults. Any parameters passed in kwargs
        will override initialization parameters.

        Recognized overrides are ``user``, ``password``, ``host``, ``port``,
        ``ssl_options``, ``virtual_host``, ``connection_attempts``,
        ``heartbeat`` (or ``heartbeat_interval``, which is only consulted when
        ``heartbeat`` is absent), ``blocked_connection_timeout`` and, when
        ``PIKA_ONE`` is false, ``ssl_enabled``.

        Args:
            **kwargs: Overrides for specific parameters

        Returns:
            :obj:`pika.ConnectionParameters`: ConnectionParameters object
        """
        credentials = PlainCredentials(
            username=kwargs.get("user", self._user),
            password=kwargs.get("password", self._password),
        )

        conn_params = {
            "host": kwargs.get("host", self._host),
            "port": kwargs.get("port", self._port),
            "ssl_options": kwargs.get("ssl_options", self._ssl_options),
            "virtual_host": kwargs.get("virtual_host", self._virtual_host),
            "connection_attempts": kwargs.get(
                "connection_attempts", self._connection_attempts
            ),
            "heartbeat": kwargs.get(
                "heartbeat", kwargs.get("heartbeat_interval", self._heartbeat)
            ),
            "blocked_connection_timeout": kwargs.get(
                "blocked_connection_timeout", self._blocked_connection_timeout
            ),
            "credentials": credentials,
        }

        # The separate "ssl" flag is only passed when PIKA_ONE is false.
        if not PIKA_ONE:
            conn_params["ssl"] = kwargs.get("ssl_enabled", self._ssl_enabled)

        return ConnectionParameters(**conn_params)