Module lib.model.change_request
Change Request Data Model
Expand source code
"""
Change Request Data Model
"""
import datetime
from sqlalchemy import MetaData, Table, Column, String, \
DateTime, Text, BigInteger, JSON, Integer
from lib.model import HasRelationships, register_database_model, \
find_model_by_name, DynamicViewModel
from lib.model.employee import Employee
from lib.repository.db import DatabaseRepository
@register_database_model # pylint: disable=too-many-ancestors
class ChangeRequest(DatabaseRepository, DynamicViewModel, HasRelationships):
"""
Tied directly to the 'change_request' table
"""
resource_uri = 'change_request'
field_validators = {
'id': 'notnull',
# 'phone_number': 'phone',
# 'emergency_contact_phone': 'phone',
}
field_optional_validators = {
}
field_casts = {
}
view_columns = {
# 'id': 'ID',
'author': 'Issued By',
'table_name': 'Data Type',
'row_id': 'ID affected',
'changes': 'Changes',
# 'reason': 'Reason',
'created_at': 'Created On',
}
def __init__(self, data):
DynamicViewModel.__init__(self, data)
DatabaseRepository.__init__(self)
self.load_relationships()
def to_dict(self):
return self.trim_relationships(DynamicViewModel.to_dict(self))
def load_relationships(self):
self.author = Employee.read(self.author_user_id)
try:
self.approved_by = Employee.read(self.approved_by_user_id)
except AttributeError:
self.approved_by = None
def relationship_fields(self):
return [
'author',
'approved_by'
]
def apply_to_db(self, approved_by: Employee):
"""
Only works for DatabaseRepository
:param approved_by:
:return:
"""
model_cls = find_model_by_name(self.table_name)
if not model_cls:
raise Exception("Table name given in the ChangeRequest does not exist!")
changes = ChangeRequest.deserialize_dates(self.changes)
if self.row_id: # is just a change?
model = model_cls.read(self.row_id)
for diff_type, field, values in changes:
if diff_type == 'change':
setattr(model, field, values[1])
model_cls.update(model)
else: # it must be an entire new object
model_raw = {}
for diff_type, field, values in changes:
if diff_type == 'add':
for couple in values:
model_raw[couple[0]] = couple[1]
model_cls.create(model_cls(model_raw))
self.approved_by_user_id = approved_by.id # pylint: disable=attribute-defined-outside-init
self.approved_at = datetime.datetime.now() # pylint: disable=attribute-defined-outside-init
self.changes = ChangeRequest.serialize_dates(self.changes)
ChangeRequest.update(self)
@classmethod
def new_empty(cls):
raise NotImplementedError
@classmethod
def prettify_changes(cls, changes, model_name=None) -> str:
"""
A utility for making changes prettier on the view
:param changes: dictdiffer result
:param model_name: model str
:return: prettified changes
"""
result = ""
for diff_type, field, values in changes: # pylint: disable=unused-variable
if diff_type == 'change':
if model_name:
model_cls = find_model_by_name(model_name)
result += f"{model_cls.view_columns[field]}: {values[0]} => {values[1]}\n"
else:
result += f"{field}: {values[0]} > {values[1]}\n"
else:
for couple in values:
if model_name:
model_cls = find_model_by_name(model_name)
if couple[0] in model_cls.view_columns:
result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n"
# else:
# result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n"
else:
result += f"{couple[0]}: {couple[1]}\n"
return result.rstrip()
@classmethod
def table(cls, metadata=MetaData()) -> Table:
return Table(cls.resource_uri, metadata,
Column('id', BigInteger().with_variant(Integer, "sqlite"), primary_key=True),
Column('author_user_id', BigInteger),
Column('table_name', String(64)),
Column('row_id', BigInteger, nullable=True),
Column('changes', JSON),
Column('reason', Text, nullable=True),
Column('approved_at', DateTime, nullable=True),
Column('created_at', DateTime),
Column('modified_at', DateTime),
Column('approved_by_user_id', BigInteger, nullable=True),
extend_existing=True
)
@staticmethod
def serialize_dates(changes):
for action, field, values in changes:
if action == 'add':
for i in range(len(values)):
couple = values[i]
mutable = list(couple)
if isinstance(couple[1], datetime.datetime):
mutable[1] = couple[1].isoformat()
values[i] = tuple(mutable)
return changes
@staticmethod
def deserialize_dates(changes):
for action, field, values in changes:
if action == 'add':
for i in range(len(values)):
couple = values[i]
mutable = list(couple)
try:
mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S")
values[i] = tuple(mutable)
except:
try:
mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S.%f")
values[i] = tuple(mutable)
except:
continue
return changes
Classes
class ChangeRequest (data)
-
Tied directly to the 'change_request' table
Expand source code
class ChangeRequest(DatabaseRepository, DynamicViewModel, HasRelationships): """ Tied directly to the 'change_request' table """ resource_uri = 'change_request' field_validators = { 'id': 'notnull', # 'phone_number': 'phone', # 'emergency_contact_phone': 'phone', } field_optional_validators = { } field_casts = { } view_columns = { # 'id': 'ID', 'author': 'Issued By', 'table_name': 'Data Type', 'row_id': 'ID affected', 'changes': 'Changes', # 'reason': 'Reason', 'created_at': 'Created On', } def __init__(self, data): DynamicViewModel.__init__(self, data) DatabaseRepository.__init__(self) self.load_relationships() def to_dict(self): return self.trim_relationships(DynamicViewModel.to_dict(self)) def load_relationships(self): self.author = Employee.read(self.author_user_id) try: self.approved_by = Employee.read(self.approved_by_user_id) except AttributeError: self.approved_by = None def relationship_fields(self): return [ 'author', 'approved_by' ] def apply_to_db(self, approved_by: Employee): """ Only works for DatabaseRepository :param approved_by: :return: """ model_cls = find_model_by_name(self.table_name) if not model_cls: raise Exception("Table name given in the ChangeRequest does not exist!") changes = ChangeRequest.deserialize_dates(self.changes) if self.row_id: # is just a change? model = model_cls.read(self.row_id) for diff_type, field, values in changes: if diff_type == 'change': setattr(model, field, values[1]) model_cls.update(model) else: # it must be an entire new object model_raw = {} for diff_type, field, values in changes: if diff_type == 'add': for couple in values: model_raw[couple[0]] = couple[1] model_cls.create(model_cls(model_raw)) self.approved_by_user_id = approved_by.id # pylint: disable=attribute-defined-outside-init self.approved_at = datetime.datetime.now() # pylint: disable=attribute-defined-outside-init self.changes = ChangeRequest.serialize_dates(self.changes) ChangeRequest.update(self) @classmethod def new_empty(cls): raise NotImplementedError @classmethod def prettify_changes(cls, changes, model_name=None) -> str: """ A utility for making changes prettier on the view :param changes: dictdiffer result :param model_name: model str :return: prettified changes """ result = "" for diff_type, field, values in changes: # pylint: disable=unused-variable if diff_type == 'change': if model_name: model_cls = find_model_by_name(model_name) result += f"{model_cls.view_columns[field]}: {values[0]} => {values[1]}\n" else: result += f"{field}: {values[0]} > {values[1]}\n" else: for couple in values: if model_name: model_cls = find_model_by_name(model_name) if couple[0] in model_cls.view_columns: result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n" # else: # result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n" else: result += f"{couple[0]}: {couple[1]}\n" return result.rstrip() @classmethod def table(cls, metadata=MetaData()) -> Table: return Table(cls.resource_uri, metadata, Column('id', BigInteger().with_variant(Integer, "sqlite"), primary_key=True), Column('author_user_id', BigInteger), Column('table_name', String(64)), Column('row_id', BigInteger, nullable=True), Column('changes', JSON), Column('reason', Text, nullable=True), Column('approved_at', DateTime, nullable=True), Column('created_at', DateTime), Column('modified_at', DateTime), Column('approved_by_user_id', BigInteger, nullable=True), extend_existing=True ) @staticmethod def serialize_dates(changes): for action, field, values in changes: if action == 'add': for i in range(len(values)): couple = values[i] mutable = list(couple) if isinstance(couple[1], datetime.datetime): mutable[1] = couple[1].isoformat() values[i] = tuple(mutable) return changes @staticmethod def deserialize_dates(changes): for action, field, values in changes: if action == 'add': for i in range(len(values)): couple = values[i] mutable = list(couple) try: mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S") values[i] = tuple(mutable) except: try: mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S.%f") values[i] = tuple(mutable) except: continue return changes
Ancestors
- DatabaseRepository
- Repository
- CanMutateData
- HasValidation
- DynamicViewModel
- DynamicModel
- abc.ABC
- HasRelationships
Class variables
var engine : sqlalchemy.engine.base.Engine
var metadata : sqlalchemy.sql.schema.MetaData
var url : str
var view_columns : dict
Static methods
def deserialize_dates(changes)
-
Expand source code
@staticmethod def deserialize_dates(changes): for action, field, values in changes: if action == 'add': for i in range(len(values)): couple = values[i] mutable = list(couple) try: mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S") values[i] = tuple(mutable) except: try: mutable[1] = datetime.datetime.strptime(couple[1], "%Y-%m-%dT%H:%M:%S.%f") values[i] = tuple(mutable) except: continue return changes
def prettify_changes(changes, model_name=None) ‑> str
-
A utility for making changes prettier on the view
:param changes: dictdiffer result :param model_name: model str :return: prettified changes
Expand source code
@classmethod def prettify_changes(cls, changes, model_name=None) -> str: """ A utility for making changes prettier on the view :param changes: dictdiffer result :param model_name: model str :return: prettified changes """ result = "" for diff_type, field, values in changes: # pylint: disable=unused-variable if diff_type == 'change': if model_name: model_cls = find_model_by_name(model_name) result += f"{model_cls.view_columns[field]}: {values[0]} => {values[1]}\n" else: result += f"{field}: {values[0]} > {values[1]}\n" else: for couple in values: if model_name: model_cls = find_model_by_name(model_name) if couple[0] in model_cls.view_columns: result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n" # else: # result += f"{model_cls.view_columns[couple[0]]}: {couple[1]}\n" else: result += f"{couple[0]}: {couple[1]}\n" return result.rstrip()
def serialize_dates(changes)
-
Expand source code
@staticmethod def serialize_dates(changes): for action, field, values in changes: if action == 'add': for i in range(len(values)): couple = values[i] mutable = list(couple) if isinstance(couple[1], datetime.datetime): mutable[1] = couple[1].isoformat() values[i] = tuple(mutable) return changes
Methods
def apply_to_db(self, approved_by: Employee)
-
Only works for DatabaseRepository :param approved_by: :return:
Expand source code
def apply_to_db(self, approved_by: Employee): """ Only works for DatabaseRepository :param approved_by: :return: """ model_cls = find_model_by_name(self.table_name) if not model_cls: raise Exception("Table name given in the ChangeRequest does not exist!") changes = ChangeRequest.deserialize_dates(self.changes) if self.row_id: # is just a change? model = model_cls.read(self.row_id) for diff_type, field, values in changes: if diff_type == 'change': setattr(model, field, values[1]) model_cls.update(model) else: # it must be an entire new object model_raw = {} for diff_type, field, values in changes: if diff_type == 'add': for couple in values: model_raw[couple[0]] = couple[1] model_cls.create(model_cls(model_raw)) self.approved_by_user_id = approved_by.id # pylint: disable=attribute-defined-outside-init self.approved_at = datetime.datetime.now() # pylint: disable=attribute-defined-outside-init self.changes = ChangeRequest.serialize_dates(self.changes) ChangeRequest.update(self)
Inherited members