# -*- coding: utf-8 -*-
import calendar
import datetime
import simplejson
import marshmallow
from marshmallow import Schema, post_dump, post_load, pre_load, fields
from marshmallow.utils import UTC
from marshmallow_polyfield import PolyField
__all__ = [
"SystemSchema",
"InstanceSchema",
"CommandSchema",
"ParameterSchema",
"RequestSchema",
"PatchSchema",
"LoggingConfigSchema",
"EventSchema",
"QueueSchema",
"PrincipalSchema",
"RoleSchema",
"RefreshTokenSchema",
"JobSchema",
"DateTriggerSchema",
"IntervalTriggerSchema",
"CronTriggerSchema",
]
class DateTime(fields.DateTime):
"""Class that adds methods for (de)serializing DateTime fields as an epoch"""
def __init__(self, format="epoch", **kwargs):
self.DATEFORMAT_SERIALIZATION_FUNCS["epoch"] = self.to_epoch
self.DATEFORMAT_DESERIALIZATION_FUNCS["epoch"] = self.from_epoch
super(DateTime, self).__init__(format=format, **kwargs)
@staticmethod
def to_epoch(dt, localtime=False):
if localtime and dt.tzinfo is not None:
localized = dt
else:
if dt.tzinfo is None:
localized = UTC.localize(dt)
else:
localized = dt.astimezone(UTC)
return (calendar.timegm(localized.timetuple()) * 1000) + int(
localized.microsecond / 1000
)
@staticmethod
def from_epoch(epoch):
# utcfromtimestamp will correctly parse milliseconds in Python 3,
# but in Python 2 we need to help it
seconds, millis = divmod(epoch, 1000)
return datetime.datetime.utcfromtimestamp(seconds).replace(
microsecond=millis * 1000
)
class BaseSchema(Schema):
class Meta:
version_nums = marshmallow.__version__.split(".")
if int(version_nums[0]) <= 2 and int(version_nums[1]) < 17:
json_module = simplejson
else:
render_module = simplejson
def __init__(self, strict=True, **kwargs):
super(BaseSchema, self).__init__(strict=strict, **kwargs)
@post_load
def make_object(self, data):
try:
model_class = self.context["models"][self.__class__.__name__]
except KeyError:
return data
return model_class(**data)
@classmethod
def get_attribute_names(cls):
return [
key
for key, value in cls._declared_fields.items()
if isinstance(value, fields.FieldABC)
]
class ChoicesSchema(BaseSchema):
type = fields.Str()
display = fields.Str()
value = fields.Raw(many=True)
strict = fields.Bool(default=False)
details = fields.Dict()
[docs]class ParameterSchema(BaseSchema):
key = fields.Str()
type = fields.Str(allow_none=True)
multi = fields.Bool(allow_none=True)
display_name = fields.Str(allow_none=True)
optional = fields.Bool(allow_none=True)
default = fields.Raw(allow_none=True)
description = fields.Str(allow_none=True)
choices = fields.Nested("ChoicesSchema", allow_none=True, many=False)
parameters = fields.Nested("self", many=True, allow_none=True)
nullable = fields.Bool(allow_none=True)
maximum = fields.Int(allow_none=True)
minimum = fields.Int(allow_none=True)
regex = fields.Str(allow_none=True)
form_input_type = fields.Str(allow_none=True)
[docs]class CommandSchema(BaseSchema):
id = fields.Str(allow_none=True)
name = fields.Str()
description = fields.Str(allow_none=True)
parameters = fields.Nested("ParameterSchema", many=True)
command_type = fields.Str(allow_none=True)
output_type = fields.Str(allow_none=True)
schema = fields.Dict(allow_none=True)
form = fields.Dict(allow_none=True)
template = fields.Str(allow_none=True)
icon_name = fields.Str(allow_none=True)
system = fields.Nested("SystemSchema", only=("id",), allow_none=True)
[docs]class InstanceSchema(BaseSchema):
id = fields.Str(allow_none=True)
name = fields.Str()
description = fields.Str(allow_none=True)
status = fields.Str(allow_none=True)
status_info = fields.Nested("StatusInfoSchema", allow_none=True)
queue_type = fields.Str(allow_none=True)
queue_info = fields.Dict(allow_none=True)
icon_name = fields.Str(allow_none=True)
metadata = fields.Dict(allow_none=True)
[docs]class SystemSchema(BaseSchema):
id = fields.Str(allow_none=True)
name = fields.Str()
description = fields.Str(allow_none=True)
version = fields.Str()
max_instances = fields.Integer(allow_none=True)
icon_name = fields.Str(allow_none=True)
instances = fields.Nested("InstanceSchema", many=True, allow_none=True)
commands = fields.Nested("CommandSchema", many=True)
display_name = fields.Str(allow_none=True)
metadata = fields.Dict(allow_none=True)
class RequestTemplateSchema(BaseSchema):
"""Used as a base class for request and a request template for jobs."""
system = fields.Str(allow_none=True)
system_version = fields.Str(allow_none=True)
instance_name = fields.Str(allow_none=True)
command = fields.Str(allow_none=True)
parameters = fields.Dict(allow_none=True)
comment = fields.Str(allow_none=True)
metadata = fields.Dict(allow_none=True)
[docs]class RequestSchema(RequestTemplateSchema):
id = fields.Str(allow_none=True)
parent = fields.Nested("self", exclude=("children",), allow_none=True)
children = fields.Nested(
"self", exclude=("parent", "children"), many=True, default=None, allow_none=True
)
output = fields.Str(allow_none=True)
output_type = fields.Str(allow_none=True)
status = fields.Str(allow_none=True)
command_type = fields.Str(allow_none=True)
error_class = fields.Str(allow_none=True)
created_at = DateTime(allow_none=True, format="epoch", example="1500065932000")
updated_at = DateTime(allow_none=True, format="epoch", example="1500065932000")
has_parent = fields.Bool(allow_none=True)
requester = fields.String(allow_none=True)
class StatusInfoSchema(BaseSchema):
heartbeat = DateTime(allow_none=True, format="epoch", example="1500065932000")
[docs]class PatchSchema(BaseSchema):
operation = fields.Str()
path = fields.Str(allow_none=True)
value = fields.Raw(allow_none=True)
[docs] @pre_load(pass_many=True)
def unwrap_envelope(self, data, many):
if isinstance(data, list):
return data
elif "operations" in data:
return data["operations"]
else:
return [data]
[docs] @post_dump(pass_many=True)
def wrap_envelope(self, data, many):
return {u"operations": data if many else [data]}
[docs]class LoggingConfigSchema(BaseSchema):
level = fields.Str(allow_none=True)
formatters = fields.Dict(allow_none=True)
handlers = fields.Dict(allow_none=True)
[docs]class EventSchema(BaseSchema):
name = fields.Str(allow_none=True)
payload = fields.Dict(allow_none=True)
error = fields.Bool(allow_none=True)
metadata = fields.Dict(allow_none=True)
timestamp = DateTime(allow_none=True, format="epoch", example="1500065932000")
[docs]class QueueSchema(BaseSchema):
name = fields.Str(allow_none=True)
system = fields.Str(allow_none=True)
version = fields.Str(allow_none=True)
instance = fields.Str(allow_none=True)
system_id = fields.Str(allow_none=True)
display = fields.Str(allow_none=True)
size = fields.Integer(allow_none=True)
[docs]class PrincipalSchema(BaseSchema):
id = fields.Str(allow_none=True)
username = fields.Str(allow_none=True)
roles = fields.Nested("RoleSchema", many=True, allow_none=True)
permissions = fields.List(fields.Str(), allow_none=True)
preferences = fields.Dict(allow_none=True)
metadata = fields.Dict(allow_none=True)
[docs]class RoleSchema(BaseSchema):
id = fields.Str(allow_none=True)
name = fields.Str(allow_none=True)
description = fields.Str(allow_none=True)
roles = fields.Nested("self", many=True, allow_none=True)
permissions = fields.List(fields.Str(), allow_none=True)
[docs]class RefreshTokenSchema(BaseSchema):
id = fields.Str(allow_none=True)
issued = DateTime(allow_none=True, format="epoch", example="1500065932000")
expires = DateTime(allow_none=True, format="epoch", example="1500065932000")
payload = fields.Dict()
[docs]class DateTriggerSchema(BaseSchema):
run_date = DateTime(allow_none=True, format="epoch", example="1500065932000")
timezone = fields.Str(allow_none=True)
[docs]class IntervalTriggerSchema(BaseSchema):
weeks = fields.Int(allow_none=True)
days = fields.Int(allow_none=True)
hours = fields.Int(allow_none=True)
minutes = fields.Int(allow_none=True)
seconds = fields.Int(allow_none=True)
start_date = DateTime(allow_none=True, format="epoch", example="1500065932000")
end_date = DateTime(allow_none=True, format="epoch", example="1500065932000")
timezone = fields.Str(allow_none=True)
jitter = fields.Int(allow_none=True)
reschedule_on_finish = fields.Bool(allow_none=True)
[docs]class CronTriggerSchema(BaseSchema):
year = fields.Str(allow_none=True)
month = fields.Str(allow_none=True)
day = fields.Str(allow_none=True)
week = fields.Str(allow_none=True)
day_of_week = fields.Str(allow_none=True)
hour = fields.Str(allow_none=True)
minute = fields.Str(allow_none=True)
second = fields.Str(allow_none=True)
start_date = DateTime(allow_none=True, format="epoch", example="1500065932000")
end_date = DateTime(allow_none=True, format="epoch", example="1500065932000")
timezone = fields.Str(allow_none=True)
jitter = fields.Int(allow_none=True)
TRIGGER_TYPE_TO_SCHEMA = {
"interval": IntervalTriggerSchema,
"date": DateTriggerSchema,
"cron": CronTriggerSchema,
}
def serialize_trigger_selector(_, obj):
try:
return TRIGGER_TYPE_TO_SCHEMA[obj.trigger_type]()
except KeyError:
pass
raise TypeError("Could not detect %s trigger type schema" % obj.trigger_type)
def deserialize_trigger_selector(_, data):
try:
return TRIGGER_TYPE_TO_SCHEMA[data["trigger_type"]]()
except KeyError:
pass
raise TypeError("Could not detect %s trigger type schema" % data["trigger_type"])
[docs]class JobSchema(BaseSchema):
id = fields.Str(allow_none=True)
name = fields.Str(allow_none=True)
trigger_type = fields.Str(allow_none=True)
trigger = PolyField(
allow_none=True,
serialization_schema_selector=serialize_trigger_selector,
deserialization_schema_selector=deserialize_trigger_selector,
)
request_template = fields.Nested("RequestTemplateSchema")
misfire_grace_time = fields.Int(allow_none=True)
coalesce = fields.Bool(allow_none=True)
next_run_time = DateTime(allow_none=True, format="epoch", example="1500065932000")
success_count = fields.Int(allow_none=True)
error_count = fields.Int(allow_none=True)
status = fields.Str(allow_none=True)
max_instances = fields.Int(allow_none=True)