Source code for brewtils.schemas

import calendar
import datetime

from marshmallow import Schema, post_dump, post_load, pre_load, fields
from marshmallow.utils import UTC


[docs]class DateTime(fields.DateTime): """Class that adds methods for (de)serializing DateTime fields as an epoch""" def __init__(self, format='epoch', **kwargs): self.DATEFORMAT_SERIALIZATION_FUNCS['epoch'] = self.to_epoch self.DATEFORMAT_DESERIALIZATION_FUNCS['epoch'] = self.from_epoch super(DateTime, self).__init__(format=format, **kwargs)
[docs] @staticmethod def to_epoch(dt, localtime=False): if localtime and dt.tzinfo is not None: localized = dt else: if dt.tzinfo is None: localized = UTC.localize(dt) else: localized = dt.astimezone(UTC) return (calendar.timegm(localized.timetuple()) * 1000) + int(localized.microsecond / 1000)
[docs] @staticmethod def from_epoch(epoch): # utcfromtimestamp will correctly parse milliseconds in Python 3, # but in Python 2 we need to help it seconds, millis = divmod(epoch, 1000) return datetime.datetime.utcfromtimestamp(seconds).replace(microsecond=millis*1000)
[docs]class BaseSchema(Schema): def __init__(self, strict=True, **kwargs): super(BaseSchema, self).__init__(strict=strict, **kwargs)
[docs] @post_load def make_object(self, data): try: model_class = self.context['models'][self.__class__.__name__] except KeyError: return data return model_class(**data)
[docs] @classmethod def get_attribute_names(cls): return [key for key, value in cls._declared_fields.items() if isinstance(value, fields.FieldABC)]
[docs]class ChoicesSchema(BaseSchema): type = fields.Str() display = fields.Str() value = fields.Raw(many=True) strict = fields.Bool(default=False) details = fields.Dict()
[docs]class ParameterSchema(BaseSchema): key = fields.Str() type = fields.Str(allow_none=True) multi = fields.Bool(allow_none=True) display_name = fields.Str(allow_none=True) optional = fields.Bool(allow_none=True) default = fields.Raw(allow_none=True) description = fields.Str(allow_none=True) choices = fields.Nested('ChoicesSchema', allow_none=True, many=False) parameters = fields.Nested('self', many=True, allow_none=True) nullable = fields.Bool(allow_none=True) maximum = fields.Int(allow_none=True) minimum = fields.Int(allow_none=True) regex = fields.Str(allow_none=True) form_input_type = fields.Str(allow_none=True)
[docs]class CommandSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) parameters = fields.Nested('ParameterSchema', many=True) command_type = fields.Str(allow_none=True) output_type = fields.Str(allow_none=True) schema = fields.Dict(allow_none=True) form = fields.Dict(allow_none=True) template = fields.Str(allow_none=True) icon_name = fields.Str(allow_none=True) system = fields.Nested('SystemSchema', only=('id', ), allow_none=True)
[docs]class InstanceSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) status = fields.Str(allow_none=True) status_info = fields.Nested('StatusInfoSchema', allow_none=True) queue_type = fields.Str(allow_none=True) queue_info = fields.Dict(allow_none=True) icon_name = fields.Str(allow_none=True) metadata = fields.Dict(allow_none=True)
[docs]class SystemSchema(BaseSchema): id = fields.Str(allow_none=True) name = fields.Str() description = fields.Str(allow_none=True) version = fields.Str() max_instances = fields.Integer(allow_none=True) icon_name = fields.Str(allow_none=True) instances = fields.Nested('InstanceSchema', many=True, allow_none=True) commands = fields.Nested('CommandSchema', many=True) display_name = fields.Str(allow_none=True) metadata = fields.Dict(allow_none=True)
[docs]class RequestSchema(BaseSchema): id = fields.Str(allow_none=True) system = fields.Str(allow_none=True) system_version = fields.Str(allow_none=True) instance_name = fields.Str(allow_none=True) command = fields.Str(allow_none=True) parent = fields.Nested('self', exclude=('children', ), allow_none=True) children = fields.Nested('self', exclude=('parent', 'children'), many=True, default=None, allow_none=True) parameters = fields.Dict(allow_none=True) comment = fields.Str(allow_none=True) output = fields.Str(allow_none=True) output_type = fields.Str(allow_none=True) status = fields.Str(allow_none=True) command_type = fields.Str(allow_none=True) error_class = fields.Str(allow_none=True) created_at = DateTime(allow_none=True, format='epoch', example='1500065932000') updated_at = DateTime(allow_none=True, format='epoch', example='1500065932000') metadata = fields.Dict(allow_none=True)
[docs]class StatusInfoSchema(BaseSchema): heartbeat = DateTime(allow_none=True, format='epoch', example='1500065932000')
[docs]class PatchSchema(BaseSchema): operation = fields.Str() path = fields.Str(allow_none=True) value = fields.Raw(allow_none=True)
[docs] @pre_load(pass_many=True) def unwrap_envelope(self, data, many): if isinstance(data, list): return data elif 'operations' in data: return data['operations'] else: return [data]
[docs] @post_dump(pass_many=True) def wrap_envelope(self, data, many): return {u'operations': data if many else [data]}
[docs]class LoggingConfigSchema(BaseSchema): level = fields.Str(allow_none=True) formatters = fields.Dict(allow_none=True) handlers = fields.Dict(allow_none=True)
[docs]class EventSchema(BaseSchema): name = fields.Str(allow_none=True) payload = fields.Dict(allow_none=True) error = fields.Bool(allow_none=True) metadata = fields.Dict(allow_none=True) timestamp = DateTime(allow_none=True, format='epoch', example='1500065932000')
[docs]class QueueSchema(BaseSchema): name = fields.Str(allow_none=True) system = fields.Str(allow_none=True) version = fields.Str(allow_none=True) instance = fields.Str(allow_none=True) system_id = fields.Str(allow_none=True) display = fields.Str(allow_none=True) size = fields.Integer(allow_none=True)