Source code for brewtils.schemas

# -*- coding: utf-8 -*-

import calendar
import datetime

import simplejson
import marshmallow
from marshmallow import Schema, post_dump, post_load, pre_load, fields
from marshmallow.utils import UTC
from marshmallow_polyfield import PolyField

__all__ = [
    "SystemSchema",
    "InstanceSchema",
    "CommandSchema",
    "ParameterSchema",
    "RequestSchema",
    "PatchSchema",
    "LoggingConfigSchema",
    "EventSchema",
    "QueueSchema",
    "PrincipalSchema",
    "RoleSchema",
    "RefreshTokenSchema",
    "JobSchema",
    "DateTriggerSchema",
    "IntervalTriggerSchema",
    "CronTriggerSchema",
]


class DateTime(fields.DateTime):
    """Class that adds methods for (de)serializing DateTime fields as an epoch"""

    def __init__(self, format="epoch", **kwargs):
        self.DATEFORMAT_SERIALIZATION_FUNCS["epoch"] = self.to_epoch
        self.DATEFORMAT_DESERIALIZATION_FUNCS["epoch"] = self.from_epoch
        super(DateTime, self).__init__(format=format, **kwargs)

    @staticmethod
    def to_epoch(dt, localtime=False):
        if localtime and dt.tzinfo is not None:
            localized = dt
        else:
            if dt.tzinfo is None:
                localized = UTC.localize(dt)
            else:
                localized = dt.astimezone(UTC)
        return (calendar.timegm(localized.timetuple()) * 1000) + int(
            localized.microsecond / 1000
        )

    @staticmethod
    def from_epoch(epoch):
        # utcfromtimestamp will correctly parse milliseconds in Python 3,
        # but in Python 2 we need to help it
        seconds, millis = divmod(epoch, 1000)
        return datetime.datetime.utcfromtimestamp(seconds).replace(
            microsecond=millis * 1000
        )


class BaseSchema(Schema):
    class Meta:
        version_nums = marshmallow.__version__.split(".")
        if int(version_nums[0]) <= 2 and int(version_nums[1]) < 17:
            json_module = simplejson
        else:
            render_module = simplejson

    def __init__(self, strict=True, **kwargs):
        super(BaseSchema, self).__init__(strict=strict, **kwargs)

    @post_load
    def make_object(self, data):
        try:
            model_class = self.context["models"][self.__class__.__name__]
        except KeyError:
            return data

        return model_class(**data)

    @classmethod
    def get_attribute_names(cls):
        return [
            key
            for key, value in cls._declared_fields.items()
            if isinstance(value, fields.FieldABC)
        ]


class ChoicesSchema(BaseSchema):

    type = fields.Str()
    display = fields.Str()
    value = fields.Raw(many=True)
    strict = fields.Bool(default=False)
    details = fields.Dict()


[docs]class ParameterSchema(BaseSchema): key = fields.Str() type = fields.Str(allow_none=True) multi = fields.Bool(allow_none=True) display_name = fields.Str(allow_none=True) optional = fields.Bool(allow_none=True) default = fields.Raw(allow_none=True) description = fields.Str(allow_none=True) choices = fields.Nested("ChoicesSchema", allow_none=True, many=False) parameters = fields.Nested("self", many=True, allow_none=True) nullable = fields.Bool(allow_none=True) maximum = fields.Int(allow_none=True) minimum = fields.Int(allow_none=True) regex = fields.Str(allow_none=True) form_input_type = fields.Str(allow_none=True)
[docs]class CommandSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) parameters = fields.Nested("ParameterSchema", many=True) command_type = fields.Str(allow_none=True) output_type = fields.Str(allow_none=True) schema = fields.Dict(allow_none=True) form = fields.Dict(allow_none=True) template = fields.Str(allow_none=True) icon_name = fields.Str(allow_none=True) system = fields.Nested("SystemSchema", only=("id",), allow_none=True)
[docs]class InstanceSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) status = fields.Str(allow_none=True) status_info = fields.Nested("StatusInfoSchema", allow_none=True) queue_type = fields.Str(allow_none=True) queue_info = fields.Dict(allow_none=True) icon_name = fields.Str(allow_none=True) metadata = fields.Dict(allow_none=True)
[docs]class SystemSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) version = fields.Str() max_instances = fields.Integer(allow_none=True) icon_name = fields.Str(allow_none=True) instances = fields.Nested("InstanceSchema", many=True, allow_none=True) commands = fields.Nested("CommandSchema", many=True) display_name = fields.Str(allow_none=True) metadata = fields.Dict(allow_none=True)
class RequestTemplateSchema(BaseSchema): """Used as a base class for request and a request template for jobs.""" system = fields.Str(allow_none=True) system_version = fields.Str(allow_none=True) instance_name = fields.Str(allow_none=True) command = fields.Str(allow_none=True) parameters = fields.Dict(allow_none=True) comment = fields.Str(allow_none=True) metadata = fields.Dict(allow_none=True)
[docs]class RequestSchema(RequestTemplateSchema): id = fields.Str(allow_none=True) parent = fields.Nested("self", exclude=("children",), allow_none=True) children = fields.Nested( "self", exclude=("parent", "children"), many=True, default=None, allow_none=True ) output = fields.Str(allow_none=True) output_type = fields.Str(allow_none=True) status = fields.Str(allow_none=True) command_type = fields.Str(allow_none=True) error_class = fields.Str(allow_none=True) created_at = DateTime(allow_none=True, format="epoch", example="1500065932000") updated_at = DateTime(allow_none=True, format="epoch", example="1500065932000") has_parent = fields.Bool(allow_none=True) requester = fields.String(allow_none=True)
class StatusInfoSchema(BaseSchema): heartbeat = DateTime(allow_none=True, format="epoch", example="1500065932000")
[docs]class PatchSchema(BaseSchema): operation = fields.Str() path = fields.Str(allow_none=True) value = fields.Raw(allow_none=True)
[docs] @pre_load(pass_many=True) def unwrap_envelope(self, data, many): if isinstance(data, list): return data elif "operations" in data: return data["operations"] else: return [data]
[docs] @post_dump(pass_many=True) def wrap_envelope(self, data, many): return {u"operations": data if many else [data]}
[docs]class LoggingConfigSchema(BaseSchema): level = fields.Str(allow_none=True) formatters = fields.Dict(allow_none=True) handlers = fields.Dict(allow_none=True)
[docs]class EventSchema(BaseSchema): name = fields.Str(allow_none=True) payload = fields.Dict(allow_none=True) error = fields.Bool(allow_none=True) metadata = fields.Dict(allow_none=True) timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000")
[docs]class QueueSchema(BaseSchema): name = fields.Str(allow_none=True) system = fields.Str(allow_none=True) version = fields.Str(allow_none=True) instance = fields.Str(allow_none=True) system_id = fields.Str(allow_none=True) display = fields.Str(allow_none=True) size = fields.Integer(allow_none=True)
[docs]class PrincipalSchema(BaseSchema): id = fields.Str(allow_none=True) username = fields.Str(allow_none=True) roles = fields.Nested("RoleSchema", many=True, allow_none=True) permissions = fields.List(fields.Str(), allow_none=True) preferences = fields.Dict(allow_none=True) metadata = fields.Dict(allow_none=True)
[docs]class RoleSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str(allow_none=True) description = fields.Str(allow_none=True) roles = fields.Nested("self", many=True, allow_none=True) permissions = fields.List(fields.Str(), allow_none=True)
[docs]class RefreshTokenSchema(BaseSchema): id = fields.Str(allow_none=True) issued = DateTime(allow_none=True, format="epoch", example="1500065932000") expires = DateTime(allow_none=True, format="epoch", example="1500065932000") payload = fields.Dict()
[docs]class DateTriggerSchema(BaseSchema): run_date = DateTime(allow_none=True, format="epoch", example="1500065932000") timezone = fields.Str(allow_none=True)
[docs]class IntervalTriggerSchema(BaseSchema): weeks = fields.Int(allow_none=True) days = fields.Int(allow_none=True) hours = fields.Int(allow_none=True) minutes = fields.Int(allow_none=True) seconds = fields.Int(allow_none=True) start_date = DateTime(allow_none=True, format="epoch", example="1500065932000") end_date = DateTime(allow_none=True, format="epoch", example="1500065932000") timezone = fields.Str(allow_none=True) jitter = fields.Int(allow_none=True) reschedule_on_finish = fields.Bool(allow_none=True)
[docs]class CronTriggerSchema(BaseSchema): year = fields.Str(allow_none=True) month = fields.Str(allow_none=True) day = fields.Str(allow_none=True) week = fields.Str(allow_none=True) day_of_week = fields.Str(allow_none=True) hour = fields.Str(allow_none=True) minute = fields.Str(allow_none=True) second = fields.Str(allow_none=True) start_date = DateTime(allow_none=True, format="epoch", example="1500065932000") end_date = DateTime(allow_none=True, format="epoch", example="1500065932000") timezone = fields.Str(allow_none=True) jitter = fields.Int(allow_none=True)
TRIGGER_TYPE_TO_SCHEMA = { "interval": IntervalTriggerSchema, "date": DateTriggerSchema, "cron": CronTriggerSchema, } def serialize_trigger_selector(_, obj): try: return TRIGGER_TYPE_TO_SCHEMA[obj.trigger_type]() except KeyError: pass raise TypeError("Could not detect %s trigger type schema" % obj.trigger_type) def deserialize_trigger_selector(_, data): try: return TRIGGER_TYPE_TO_SCHEMA[data["trigger_type"]]() except KeyError: pass raise TypeError("Could not detect %s trigger type schema" % data["trigger_type"])
[docs]class JobSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str(allow_none=True) trigger_type = fields.Str(allow_none=True) trigger = PolyField( allow_none=True, serialization_schema_selector=serialize_trigger_selector, deserialization_schema_selector=deserialize_trigger_selector, ) request_template = fields.Nested("RequestTemplateSchema") misfire_grace_time = fields.Int(allow_none=True) coalesce = fields.Bool(allow_none=True) next_run_time = DateTime(allow_none=True, format="epoch", example="1500065932000") success_count = fields.Int(allow_none=True) error_count = fields.Int(allow_none=True) status = fields.Str(allow_none=True) max_instances = fields.Int(allow_none=True)