# -*- coding: utf-8 -*-
import functools
import inspect
import json
import os
import sys
import types
from io import open
import requests
import six
import wrapt
try:
from lark import ParseError
except ImportError:
from lark.common import ParseError
from brewtils.choices import parse
from brewtils.errors import PluginParamError, _deprecate
from brewtils.models import Command, Parameter, Choices
if sys.version_info.major == 2:
from funcsigs import signature, Parameter as InspectParameter
else:
from inspect import signature, Parameter as InspectParameter
__all__ = [
"command",
"parameter",
"parameters",
"system",
]
# The wrapt module has a cool feature where you can disable wrapping a decorated function,
# instead just using the original function. This is pretty much exactly what we want - we
# aren't using decorators for their 'real' purpose of wrapping a function, we just want to add
# some metadata to the function object. So we'll disable the wrapping normally, but we need to
# test that enabling the wrapping would work.
_wrap_functions = False
[docs]def system(cls=None, bg_name=None, bg_version=None):
"""Class decorator that marks a class as a beer-garden System
Creates some properties on the class:
* ``_bg_name``: an optional system name
* ``_bg_version``: an optional system version
* ``_bg_commands``: holds all registered commands
* ``_current_request``: Reference to the currently executing request
Args:
cls: The class to decorated
bg_name: Optional plugin name
bg_version: Optional plugin version
Returns:
The decorated class
"""
if cls is None:
return functools.partial(system, bg_name=bg_name, bg_version=bg_version)
import brewtils.plugin
commands = []
for method_name in dir(cls):
method = getattr(cls, method_name)
method_command = getattr(method, "_command", None)
if method_command:
commands.append(method_command)
cls._bg_name = bg_name
cls._bg_version = bg_version
cls._bg_commands = commands
cls._current_request = property(
lambda self: brewtils.plugin.request_context.current_request
)
return cls
[docs]def command(
_wrapped=None,
command_type="ACTION",
output_type="STRING",
hidden=False,
schema=None,
form=None,
template=None,
icon_name=None,
description=None,
):
"""Decorator that marks a function as a beer-garden command
For example:
.. code-block:: python
@command(output_type='JSON')
def echo_json(self, message):
return message
Args:
_wrapped: The function to decorate. This is handled as a positional argument and
shouldn't be explicitly set.
command_type: The command type. Valid options are Command.COMMAND_TYPES.
output_type: The output type. Valid options are Command.OUTPUT_TYPES.
schema: A custom schema definition.
form: A custom form definition.
template: A custom template definition.
icon_name: The icon name. Should be either a FontAwesome or a Glyphicon name.
hidden: Status whether command is visible on the user interface.
description: The command description. Will override the function's docstring.
Returns:
The decorated function
"""
if _wrapped is None:
return functools.partial(
command,
command_type=command_type,
output_type=output_type,
schema=schema,
form=form,
hidden=hidden,
template=template,
icon_name=icon_name,
description=description,
)
generated_command = _generate_command_from_function(_wrapped)
generated_command.command_type = command_type
generated_command.output_type = output_type
generated_command.hidden = hidden
generated_command.icon_name = icon_name
if description:
generated_command.description = description
resolved_mod = _resolve_display_modifiers(
_wrapped, generated_command.name, schema=schema, form=form, template=template
)
generated_command.schema = resolved_mod["schema"]
generated_command.form = resolved_mod["form"]
generated_command.template = resolved_mod["template"]
func_command = getattr(_wrapped, "_command", None)
if func_command:
_update_func_command(func_command, generated_command)
else:
_wrapped._command = generated_command
@wrapt.decorator(enabled=_wrap_functions)
def wrapper(_double_wrapped, _, _args, _kwargs):
return _double_wrapped(*_args, **_kwargs)
return wrapper(_wrapped)
[docs]def parameter(
_wrapped=None,
key=None,
type=None,
multi=None,
display_name=None,
optional=None,
default=None,
description=None,
choices=None,
nullable=None,
maximum=None,
minimum=None,
regex=None,
is_kwarg=None,
model=None,
form_input_type=None,
):
"""Decorator that enables Parameter specifications for a beer-garden Command
This is intended to be used when more specification is desired for a Parameter.
For example::
@parameter(
key="message",
description="Message to echo",
optional=True,
type="String",
default="Hello, World!",
)
def echo(self, message):
return message
Args:
_wrapped: The function to decorate. This is handled as a positional argument and
shouldn't be explicitly set.
key: String specifying the parameter identifier. Must match an argument name of
the decorated function.
type: String indicating the type to use for this parameter.
multi: Boolean indicating if this parameter is a multi. See documentation for
discussion of what this means.
display_name: String that will be displayed as a label in the user interface.
optional: Boolean indicating if this parameter must be specified.
default: The value this parameter will be assigned if not overridden when
creating a request.
description: An additional string that will be displayed in the user interface.
choices: List or dictionary specifying allowed values. See documentation for
more information.
nullable: Boolean indicating if this parameter is allowed to be null.
maximum: Integer indicating the maximum value of the parameter.
minimum: Integer indicating the minimum value of the parameter.
regex: String describing a regular expression constraint on the parameter.
is_kwarg: Boolean indicating if this parameter is meant to be part of the
decorated function's kwargs.
model: Class to be used as a model for this parameter. Must be a Python type
object, not an instance.
form_input_type: Only used for string fields. Changes the form input field
(e.g. textarea)
Returns:
The decorated function
"""
if _wrapped is None:
return functools.partial(
parameter,
key=key,
type=type,
multi=multi,
display_name=display_name,
optional=optional,
default=default,
description=description,
choices=choices,
nullable=nullable,
maximum=maximum,
minimum=minimum,
regex=regex,
is_kwarg=is_kwarg,
model=model,
form_input_type=form_input_type,
)
# Create a command object if one isn't already associated
cmd = getattr(_wrapped, "_command", None)
if not cmd:
cmd = _generate_command_from_function(_wrapped)
_wrapped._command = cmd
# Every parameter needs a key, so stop that right here
if key is None:
raise PluginParamError(
"Found a parameter definition without a key for command '%s'" % cmd.name
)
# If the command doesn't already have a parameter with this key then the
# method doesn't have an explicit keyword argument with <key> as the name.
# That's only OK if this parameter is meant to be part of the **kwargs.
param = cmd.get_parameter_by_key(key)
if param is None:
if is_kwarg:
param = Parameter(key=key, optional=False)
cmd.parameters.append(param)
else:
raise PluginParamError(
"Parameter '%s' was not an explicit keyword argument for "
"command '%s' and was not marked as part of kwargs (should "
"is_kwarg be True?)" % (key, cmd.name)
)
# Next, fail if the param is_kwarg=True and the method doesn't have a **kwargs
if is_kwarg:
kwarg_declared = False
for p in signature(_wrapped).parameters.values():
if p.kind == InspectParameter.VAR_KEYWORD:
kwarg_declared = True
break
if not kwarg_declared:
raise PluginParamError(
"Parameter '%s' of command '%s' was declared as a kwarg argument "
"(is_kwarg=True) but the command method does not declare a **kwargs "
"argument" % (key, cmd.name)
)
# Update parameter definition with the plugin_param arguments
param.type = _format_type(param.type if type is None else type)
param.multi = param.multi if multi is None else multi
param.display_name = param.display_name if display_name is None else display_name
param.optional = param.optional if optional is None else optional
param.default = param.default if default is None else default
param.description = param.description if description is None else description
param.choices = param.choices if choices is None else choices
param.nullable = param.nullable if nullable is None else nullable
param.maximum = param.maximum if maximum is None else maximum
param.minimum = param.minimum if minimum is None else minimum
param.regex = param.regex if regex is None else regex
param.form_input_type = (
param.form_input_type if form_input_type is None else form_input_type
)
param.choices = _format_choices(param.choices)
# Type info is where type specific information goes. For now, this is specific
# to file types. See #289 for more details.
param.type_info = {}
if param.type == "Bytes":
param.type_info = {"storage": "gridfs"}
# Model is another special case - it requires its own handling
if model is not None:
param.type = "Dictionary"
param.parameters = _generate_nested_params(model.parameters)
# If the model is not nullable and does not have a default we will try
# to generate a one using the defaults defined on the model parameters
if not param.nullable and not param.default:
param.default = {}
for nested_param in param.parameters:
if nested_param.default:
param.default[nested_param.key] = nested_param.default
if param.type == "Base64":
# Nullifying default file parameters for safety
param.default = None
@wrapt.decorator(enabled=_wrap_functions)
def wrapper(_double_wrapped, _, _args, _kwargs):
return _double_wrapped(*_args, **_kwargs)
return wrapper(_wrapped)
[docs]def parameters(*args):
"""Specify multiple Parameter definitions at once
This can be useful for commands which have a large number of complicated
parameters but aren't good candidates for a Model.
.. code-block:: python
@parameter(**params[cmd1][param1])
@parameter(**params[cmd1][param2])
@parameter(**params[cmd1][param3])
def cmd1(self, **kwargs):
pass
Can become:
.. code-block:: python
@parameters(params[cmd1])
def cmd1(self, **kwargs):
pass
Args:
*args (iterable): Positional arguments
The first (and only) positional argument must be a list containing
dictionaries that describe parameters.
Returns:
func: The decorated function
"""
if len(args) == 1:
return functools.partial(parameters, args[0])
elif len(args) != 2:
raise PluginParamError("@parameters takes a single argument")
if not isinstance(args[1], types.FunctionType):
raise PluginParamError("@parameters must be applied to a function")
try:
for param in args[0]:
parameter(args[1], **param)
except TypeError:
raise PluginParamError("@parameters arg must be an iterable of dictionaries")
@wrapt.decorator(enabled=_wrap_functions)
def wrapper(_double_wrapped, _, _args, _kwargs):
return _double_wrapped(*_args, **_kwargs)
return wrapper(args[1])
def _update_func_command(func_command, generated_command):
"""Update the current function's Command"""
func_command.name = generated_command.name
func_command.description = generated_command.description
func_command.command_type = generated_command.command_type
func_command.output_type = generated_command.output_type
func_command.hidden = generated_command.hidden
func_command.schema = generated_command.schema
func_command.form = generated_command.form
func_command.template = generated_command.template
func_command.icon_name = generated_command.icon_name
def _generate_command_from_function(func):
"""Generate a Command from a function
Will use the first line of the function's docstring as the description.
"""
# Required for Python 2/3 compatibility
if hasattr(func, "func_name"):
command_name = func.func_name
else:
command_name = func.__name__
# Required for Python 2/3 compatibility
if hasattr(func, "func_doc"):
docstring = func.func_doc
else:
docstring = func.__doc__
return Command(
name=command_name,
description=docstring.split("\n")[0] if docstring else None,
parameters=_generate_params_from_function(func),
)
def _generate_params_from_function(func):
"""Generate Parameters from function arguments.
Will set the Parameter key, default value, and optional value.
"""
parameters_to_return = []
code = six.get_function_code(func)
function_arguments = list(code.co_varnames or [])[: code.co_argcount]
function_defaults = list(six.get_function_defaults(func) or [])
while len(function_defaults) != len(function_arguments):
function_defaults.insert(0, None)
for index, param_name in enumerate(function_arguments):
# Skip Self or Class reference
if index == 0 and isinstance(func, types.FunctionType):
continue
default = function_defaults[index]
optional = False if default is None else True
parameters_to_return.append(
Parameter(key=param_name, default=default, optional=optional)
)
return parameters_to_return
def _generate_nested_params(parameter_list):
"""Generate nested parameters from a list of Parameters
This function will take a list of Parameters and will return a new list of "real"
Parameters.
The main thing this does is ensure the choices specification is correct for all
Parameters in the tree.
"""
parameters_to_return = []
for param in parameter_list:
# This is already a Parameter. Only really need to interpret the choices
# definition and recurse down into nested Parameters
if isinstance(param, Parameter):
new_param = Parameter(
key=param.key,
type=param.type,
multi=param.multi,
display_name=param.display_name,
optional=param.optional,
default=param.default,
description=param.description,
choices=_format_choices(param.choices),
nullable=param.nullable,
maximum=param.maximum,
minimum=param.minimum,
regex=param.regex,
type_info=param.type_info,
)
if param.parameters:
new_param.type = "Dictionary"
new_param.parameters = _generate_nested_params(param.parameters)
parameters_to_return.append(new_param)
# This is a model class object. Needed for backwards compatibility
# See https://github.com/beer-garden/beer-garden/issues/354
elif hasattr(param, "parameters"):
_deprecate(
"Constructing a nested Parameters list using model class objects "
"is deprecated. Please pass the model's parameter list directly."
)
parameters_to_return += _generate_nested_params(param.parameters)
# No clue!
else:
raise PluginParamError("Unable to generate parameter from '%s'" % param)
return parameters_to_return
def _resolve_display_modifiers(
wrapped, command_name, schema=None, form=None, template=None
):
def _load_from_url(url):
response = requests.get(url)
if response.headers.get("content-type", "").lower() == "application/json":
return json.loads(response.text)
return response.text
def _load_from_path(path):
current_dir = os.path.dirname(inspect.getfile(wrapped))
file_path = os.path.abspath(os.path.join(current_dir, path))
with open(file_path, "r") as definition_file:
return definition_file.read()
resolved = {}
for key, value in {"schema": schema, "form": form, "template": template}.items():
if isinstance(value, six.string_types):
try:
if value.startswith("http"):
resolved[key] = _load_from_url(value)
elif value.startswith("/") or value.startswith("."):
loaded_value = _load_from_path(value)
resolved[key] = (
loaded_value if key == "template" else json.loads(loaded_value)
)
elif key == "template":
resolved[key] = value
else:
raise PluginParamError(
"%s specified for command '%s' was not a "
"definition, file path, or URL" % (key, command_name)
)
except Exception as ex:
raise PluginParamError(
"Error reading %s definition from '%s' for command "
"'%s': %s" % (key, value, command_name, ex)
)
elif value is None or (key in ["schema", "form"] and isinstance(value, dict)):
resolved[key] = value
elif key == "form" and isinstance(value, list):
resolved[key] = {"type": "fieldset", "items": value}
else:
raise PluginParamError(
"%s specified for command '%s' was not a definition, "
"file path, or URL" % (key, command_name)
)
return resolved
def _format_type(param_type):
if param_type == str:
return "String"
elif param_type == int:
return "Integer"
elif param_type == float:
return "Float"
elif param_type == bool:
return "Boolean"
elif param_type == dict:
return "Dictionary"
elif str(param_type).lower() == "file":
return "Bytes"
elif str(param_type).lower() == "datetime":
return "DateTime"
elif not param_type:
return "Any"
else:
return str(param_type).title()
def _format_choices(choices):
def determine_display(display_value):
if isinstance(display_value, six.string_types):
return "typeahead"
return "select" if len(display_value) <= 50 else "typeahead"
def determine_type(type_value):
if isinstance(type_value, six.string_types):
return "url" if type_value.startswith("http") else "command"
return "static"
if not choices:
return None
if isinstance(choices, dict):
if not choices.get("value"):
raise PluginParamError(
"No 'value' provided for choices. You must at least "
"provide valid values."
)
value = choices.get("value")
display = choices.get("display", determine_display(value))
choice_type = choices.get("type")
strict = choices.get("strict", True)
if choice_type is None:
choice_type = determine_type(value)
elif choice_type not in Choices.TYPES:
raise PluginParamError(
"Invalid choices type '%s' - Valid type options are %s"
% (choice_type, Choices.TYPES)
)
else:
if (
(
choice_type == "command"
and not isinstance(value, (six.string_types, dict))
)
or (choice_type == "url" and not isinstance(value, six.string_types))
or (choice_type == "static" and not isinstance(value, (list, dict)))
):
allowed_types = {
"command": "('string', 'dictionary')",
"url": "('string')",
"static": "('list', 'dictionary)",
}
raise PluginParamError(
"Invalid choices value type '%s' - Valid value types for "
"choice type '%s' are %s"
% (type(value), choice_type, allowed_types[choice_type])
)
if display not in Choices.DISPLAYS:
raise PluginParamError(
"Invalid choices display '%s' - Valid display options are %s"
% (display, Choices.DISPLAYS)
)
elif isinstance(choices, str):
value = choices
display = determine_display(value)
choice_type = determine_type(value)
strict = True
else:
try:
# Assume some sort of iterable
value = list(choices)
except TypeError:
raise PluginParamError(
"Invalid 'choices': must be a string, dictionary, or iterable."
)
display = determine_display(value)
choice_type = determine_type(value)
strict = True
# Now parse out type-specific aspects
unparsed_value = ""
try:
if choice_type == "command":
if isinstance(value, six.string_types):
unparsed_value = value
else:
unparsed_value = value["command"]
details = parse(unparsed_value, parse_as="func")
elif choice_type == "url":
unparsed_value = value
details = parse(unparsed_value, parse_as="url")
else:
if isinstance(value, dict):
unparsed_value = choices.get("key_reference")
if unparsed_value is None:
raise PluginParamError(
"Specifying a static choices dictionary requires a "
'"key_reference" field with a reference to another '
'parameter ("key_reference": "${param_key}")'
)
details = {"key_reference": parse(unparsed_value, parse_as="reference")}
else:
details = {}
except ParseError:
raise PluginParamError(
"Invalid choices definition - Unable to parse '%s'" % unparsed_value
)
return Choices(
type=choice_type, display=display, value=value, strict=strict, details=details
)
# Alias the old names for compatibility
def command_registrar(*args, **kwargs):
_deprecate(
"Looks like you're using the '@command_registrar' decorator. Heads up - this "
"name will be removed in version 4.0, please use '@system' instead. Thanks!"
)
return system(*args, **kwargs)
def register(*args, **kwargs):
_deprecate(
"Looks like you're using the '@register' decorator. Heads up - this name will "
"be removed in version 4.0, please use '@command' instead. Thanks!"
)
return command(*args, **kwargs)
def plugin_param(*args, **kwargs):
_deprecate(
"Looks like you're using the '@plugin_param' decorator. Heads up - this name "
"will be removed in version 4.0, please use '@parameter' instead. Thanks!"
)
return parameter(*args, **kwargs)