Module lib.repository.db
Database Repository with helper methods for setup
Expand source code
"""
Database Repository with helper methods for setup
"""
import datetime
from abc import abstractmethod
from sqlalchemy import create_engine, MetaData, select, Table
from sqlalchemy.engine import Engine
from lib.model import database_models
from lib.repository import Repository
from lib.repository.validator import HasValidation
def database_setup(config):
"""
This statically configures the ORM and database schema
:param config: dict
:return: None
"""
DatabaseRepository.engine = create_engine(config['DB_URL'], echo=False)
DatabaseRepository.url = config['DB_URL']
DatabaseRepository.metadata = MetaData(DatabaseRepository.engine)
for model in database_models:
model.table(metadata=DatabaseRepository.metadata)
DatabaseRepository.metadata.create_all()
class DatabaseRepository(Repository, HasValidation):
"""
A SQL implementation of the Repository class
"""
url: str
metadata: MetaData
engine: Engine
def __init__(self, has_timestamps=True):
Repository.__init__(self)
self._has_timestamps = has_timestamps
if has_timestamps and not getattr(self, 'created_at', None):
self.created_at = datetime.datetime.now()
if has_timestamps and not getattr(self, 'modified_at', None):
self.modified_at = datetime.datetime.now()
@classmethod
def _open_connection(cls):
"""
Used by CRUD methods. This opens a connection with the database
:return: connection instance
"""
return DatabaseRepository.engine.connect()
@abstractmethod
def to_dict(self):
"""
Used by CRUD methods. The model needs to define what is to be stored in the database
:return: dict
"""
raise NotImplementedError
@classmethod
@abstractmethod
def table(cls, metadata=MetaData()) -> Table:
"""
Table definition for the Repository
WARNING: The id field must be named 'id'
:param metadata: sqlalchemy MetaData object
:return: sqlalchemy Table object
"""
raise NotImplementedError
@classmethod
def create(cls, model):
"""
Performs an INSERT
:param model: instance
:param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column
:return: model instance
"""
cls.on_create(model)
connection = cls._open_connection()
statement = cls.table(DatabaseRepository.metadata).insert().values(**model.to_dict())
result = connection.execute(statement)
setattr(model, cls.id_attr, result.inserted_primary_key[0])
connection.close()
inserted = cls.read(getattr(model, cls.id_attr))
return inserted
@classmethod
def read(cls, model_id):
"""
Performs a SELECT * WHERE id = model_id given
:param model_id: ID of model
:param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column
:return: model
"""
connection = cls._open_connection()
table = cls.table(DatabaseRepository.metadata)
statement = select([table]).where(table.c[cls.id_attr] == model_id)
result = connection.execute(statement)
# Converts the result proxy into a dictionary and an array of values only
# data, values = {}, []
data = {}
rowcount = 0
for rowproxy in result:
# rowproxy.items() returns an array like [(key0, value0), (key1, value1)]
for column, value in rowproxy.items():
# build up the dictionary
data = {**data, **{column: value}}
# values.append(data)
rowcount += 1
if rowcount > 0:
model = cls(data) # pylint: disable=too-many-function-args
else:
model = None
connection.close()
cls.after_read(model)
return model
@classmethod
def read_by(cls, filters=None, as_dict=False): # pylint: disable=arguments-differ
"""
Performs a SELECT * WHERE filterKey (=) filterValue given
:param filters: dictionary of fields and their filters
Example:
filters = {
'first_name': 'john_doe', # first_name = 'john_doe'
'age': [('>', 5)], # age > 5
'number_of_corn': [('>=', 5), ('<', 10)] # 5 >= number_of_corn < 10
}
:param as_dict: Return results as dictionary with the `id_attr` as the key
:return: all rows that fit filter criteria
"""
if filters is None:
return cls.read_all()
connection = cls._open_connection()
table = cls.table(DatabaseRepository.metadata)
statement = select([table])
for column, value in filters.items():
if isinstance(value, list):
statement = DatabaseRepository._convert_comparator(statement, table, column, value)
else:
statement = statement.where(table.c[column] == value)
result = connection.execute(statement)
model_dicts = result.fetchall() if result is not None else None
connection.close()
if as_dict:
models = {}
if model_dicts is not None:
for raw in model_dicts:
models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args
cls.after_read_many(models_read=models.values())
return models
models = []
if model_dicts is not None:
for raw in model_dicts:
models.append(cls(dict(raw))) # pylint: disable=too-many-function-args
cls.after_read_many(models_read=models)
return models
@classmethod
def read_all(cls, as_dict=False): # pylint: disable=arguments-differ
"""
Performs a SELECT * on the entire table
:param as_dict: Return results as dictionary with the `id_attr` as the key
:return: all rows
"""
connection = cls._open_connection()
table = cls.table(DatabaseRepository.metadata)
statement = select([table])
result = connection.execute(statement)
model_dicts = result.fetchall() if result is not None else None
connection.close()
if as_dict:
models = {}
if model_dicts is not None:
for raw in model_dicts:
if raw[0] == -1:
continue
models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args
cls.after_read_many(models_read=models.values())
return models
models = []
if model_dicts is not None:
for raw in model_dicts:
if raw[0] == -1:
continue
models.append(cls(dict(raw))) # pylint: disable=too-many-function-args
cls.after_read_many(models_read=models)
return models
@classmethod
def update(cls, model):
"""
Performs an UPDATE <values> WHERE id = model_id query
:param model: instance
:param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column
:return: model instance
"""
cls.on_update(model)
if model._has_timestamps: # pylint: disable=protected-access
model.modified_at = datetime.datetime.now()
connection = cls._open_connection()
table = cls.table(DatabaseRepository.metadata)
statement = table.update().where(table.c[cls.id_attr] == model.id).values(**model.to_dict())
connection.execute(statement)
connection.close()
return model
@classmethod
def destroy(cls, model_id):
"""
Performs a DELETE WHERE id = model_id query
:param model_id: id only
:param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column
:return: None
"""
cls.on_destroy(model_id)
connection = cls._open_connection()
table = cls.table(DatabaseRepository.metadata)
statement = table.delete().where(table.c[cls.id_attr] == model_id)
connection.execute(statement)
connection.close()
@classmethod
def run_statement(cls, statement):
"""
Performs any statement given. Note that timestamps are
NOT updated here nor are Layers called
:param statement: can be created like the following:
table = self.table(DatabaseRepository.metadata)
statement = table.delete().where(...)
:return: all rows of result set
"""
connection = cls._open_connection()
result = connection.execute(statement)
results = result.fetchall() if result is not None else None
connection.close()
return results
@classmethod
def _convert_comparator(cls, statement, table, field, expressions):
"""
Converts expressions like [('>', 5)] into x > 5 in a SQLAlchemy statement
:param statement: statement to build upon
:param expressions: an array of couples, the first being the comparator
[(comparator, value})
comparators should be strings
:return: statement built
"""
for expression in expressions:
if len(expression) != 2:
raise ValueError('DB Expressions should only have 1 comparator and 1 value')
if expression[0] == '=':
statement = statement.where(table.c[field] == expression[1])
elif expression[0] == '!=':
statement = statement.where(table.c[field] != expression[1])
elif expression[0] == '>=':
statement = statement.where(table.c[field] >= expression[1])
elif expression[0] == '<=':
statement = statement.where(table.c[field] <= expression[1])
elif expression[0] == '>':
statement = statement.where(table.c[field] > expression[1])
elif expression[0] == '<':
statement = statement.where(table.c[field] < expression[1])
elif expression[0] == 'like':
statement = statement.where(table.c[field].like(expression[1]))
else:
raise NotImplementedError('Invalid comparator given in DB Expression')
return statement
Functions
def database_setup(config)
-
This statically configures the ORM and database schema :param config: dict :return: None
Expand source code
def database_setup(config): """ This statically configures the ORM and database schema :param config: dict :return: None """ DatabaseRepository.engine = create_engine(config['DB_URL'], echo=False) DatabaseRepository.url = config['DB_URL'] DatabaseRepository.metadata = MetaData(DatabaseRepository.engine) for model in database_models: model.table(metadata=DatabaseRepository.metadata) DatabaseRepository.metadata.create_all()
Classes
class DatabaseRepository (has_timestamps=True)
-
A SQL implementation of the Repository class
Expand source code
class DatabaseRepository(Repository, HasValidation): """ A SQL implementation of the Repository class """ url: str metadata: MetaData engine: Engine def __init__(self, has_timestamps=True): Repository.__init__(self) self._has_timestamps = has_timestamps if has_timestamps and not getattr(self, 'created_at', None): self.created_at = datetime.datetime.now() if has_timestamps and not getattr(self, 'modified_at', None): self.modified_at = datetime.datetime.now() @classmethod def _open_connection(cls): """ Used by CRUD methods. This opens a connection with the database :return: connection instance """ return DatabaseRepository.engine.connect() @abstractmethod def to_dict(self): """ Used by CRUD methods. The model needs to define what is to be stored in the database :return: dict """ raise NotImplementedError @classmethod @abstractmethod def table(cls, metadata=MetaData()) -> Table: """ Table definition for the Repository WARNING: The id field must be named 'id' :param metadata: sqlalchemy MetaData object :return: sqlalchemy Table object """ raise NotImplementedError @classmethod def create(cls, model): """ Performs an INSERT :param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance """ cls.on_create(model) connection = cls._open_connection() statement = cls.table(DatabaseRepository.metadata).insert().values(**model.to_dict()) result = connection.execute(statement) setattr(model, cls.id_attr, result.inserted_primary_key[0]) connection.close() inserted = cls.read(getattr(model, cls.id_attr)) return inserted @classmethod def read(cls, model_id): """ Performs a SELECT * WHERE id = model_id given :param model_id: ID of model :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model """ connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]).where(table.c[cls.id_attr] == model_id) result = connection.execute(statement) # Converts the result proxy into a dictionary and an array of values only # data, values = {}, [] data = {} rowcount = 0 for rowproxy in result: # rowproxy.items() returns an array like [(key0, value0), (key1, value1)] for column, value in rowproxy.items(): # build up the dictionary data = {**data, **{column: value}} # values.append(data) rowcount += 1 if rowcount > 0: model = cls(data) # pylint: disable=too-many-function-args else: model = None connection.close() cls.after_read(model) return model @classmethod def read_by(cls, filters=None, as_dict=False): # pylint: disable=arguments-differ """ Performs a SELECT * WHERE filterKey (=) filterValue given :param filters: dictionary of fields and their filters Example: filters = { 'first_name': 'john_doe', # first_name = 'john_doe' 'age': [('>', 5)], # age > 5 'number_of_corn': [('>=', 5), ('<', 10)] # 5 >= number_of_corn < 10 } :param as_dict: Return results as dictionary with the `id_attr` as the key :return: all rows that fit filter criteria """ if filters is None: return cls.read_all() connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]) for column, value in filters.items(): if isinstance(value, list): statement = DatabaseRepository._convert_comparator(statement, table, column, value) else: statement = statement.where(table.c[column] == value) result = connection.execute(statement) model_dicts = result.fetchall() if result is not None else None connection.close() if as_dict: models = {} if model_dicts is not None: for raw in model_dicts: models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models.values()) return models models = [] if model_dicts is not None: for raw in model_dicts: models.append(cls(dict(raw))) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models) return models @classmethod def read_all(cls, as_dict=False): # pylint: disable=arguments-differ """ Performs a SELECT * on the entire table :param as_dict: Return results as dictionary with the `id_attr` as the key :return: all rows """ connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]) result = connection.execute(statement) model_dicts = result.fetchall() if result is not None else None connection.close() if as_dict: models = {} if model_dicts is not None: for raw in model_dicts: if raw[0] == -1: continue models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models.values()) return models models = [] if model_dicts is not None: for raw in model_dicts: if raw[0] == -1: continue models.append(cls(dict(raw))) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models) return models @classmethod def update(cls, model): """ Performs an UPDATE <values> WHERE id = model_id query :param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance """ cls.on_update(model) if model._has_timestamps: # pylint: disable=protected-access model.modified_at = datetime.datetime.now() connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = table.update().where(table.c[cls.id_attr] == model.id).values(**model.to_dict()) connection.execute(statement) connection.close() return model @classmethod def destroy(cls, model_id): """ Performs a DELETE WHERE id = model_id query :param model_id: id only :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: None """ cls.on_destroy(model_id) connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = table.delete().where(table.c[cls.id_attr] == model_id) connection.execute(statement) connection.close() @classmethod def run_statement(cls, statement): """ Performs any statement given. Note that timestamps are NOT updated here nor are Layers called :param statement: can be created like the following: table = self.table(DatabaseRepository.metadata) statement = table.delete().where(...) :return: all rows of result set """ connection = cls._open_connection() result = connection.execute(statement) results = result.fetchall() if result is not None else None connection.close() return results @classmethod def _convert_comparator(cls, statement, table, field, expressions): """ Converts expressions like [('>', 5)] into x > 5 in a SQLAlchemy statement :param statement: statement to build upon :param expressions: an array of couples, the first being the comparator [(comparator, value}) comparators should be strings :return: statement built """ for expression in expressions: if len(expression) != 2: raise ValueError('DB Expressions should only have 1 comparator and 1 value') if expression[0] == '=': statement = statement.where(table.c[field] == expression[1]) elif expression[0] == '!=': statement = statement.where(table.c[field] != expression[1]) elif expression[0] == '>=': statement = statement.where(table.c[field] >= expression[1]) elif expression[0] == '<=': statement = statement.where(table.c[field] <= expression[1]) elif expression[0] == '>': statement = statement.where(table.c[field] > expression[1]) elif expression[0] == '<': statement = statement.where(table.c[field] < expression[1]) elif expression[0] == 'like': statement = statement.where(table.c[field].like(expression[1])) else: raise NotImplementedError('Invalid comparator given in DB Expression') return statement
Ancestors
Subclasses
Class variables
var engine : sqlalchemy.engine.base.Engine
var metadata : sqlalchemy.sql.schema.MetaData
var url : str
Static methods
def create(model)
-
Performs an INSERT
:param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance
Expand source code
@classmethod def create(cls, model): """ Performs an INSERT :param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance """ cls.on_create(model) connection = cls._open_connection() statement = cls.table(DatabaseRepository.metadata).insert().values(**model.to_dict()) result = connection.execute(statement) setattr(model, cls.id_attr, result.inserted_primary_key[0]) connection.close() inserted = cls.read(getattr(model, cls.id_attr)) return inserted
def destroy(model_id)
-
Performs a DELETE WHERE id = model_id query
:param model_id: id only :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: None
Expand source code
@classmethod def destroy(cls, model_id): """ Performs a DELETE WHERE id = model_id query :param model_id: id only :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: None """ cls.on_destroy(model_id) connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = table.delete().where(table.c[cls.id_attr] == model_id) connection.execute(statement) connection.close()
def read(model_id)
-
Performs a SELECT * WHERE id = model_id given
:param model_id: ID of model :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model
Expand source code
@classmethod def read(cls, model_id): """ Performs a SELECT * WHERE id = model_id given :param model_id: ID of model :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model """ connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]).where(table.c[cls.id_attr] == model_id) result = connection.execute(statement) # Converts the result proxy into a dictionary and an array of values only # data, values = {}, [] data = {} rowcount = 0 for rowproxy in result: # rowproxy.items() returns an array like [(key0, value0), (key1, value1)] for column, value in rowproxy.items(): # build up the dictionary data = {**data, **{column: value}} # values.append(data) rowcount += 1 if rowcount > 0: model = cls(data) # pylint: disable=too-many-function-args else: model = None connection.close() cls.after_read(model) return model
def read_all(as_dict=False)
-
Performs a SELECT * on the entire table
:param as_dict: Return results as dictionary with the
id_attr
as the key :return: all rowsExpand source code
@classmethod def read_all(cls, as_dict=False): # pylint: disable=arguments-differ """ Performs a SELECT * on the entire table :param as_dict: Return results as dictionary with the `id_attr` as the key :return: all rows """ connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]) result = connection.execute(statement) model_dicts = result.fetchall() if result is not None else None connection.close() if as_dict: models = {} if model_dicts is not None: for raw in model_dicts: if raw[0] == -1: continue models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models.values()) return models models = [] if model_dicts is not None: for raw in model_dicts: if raw[0] == -1: continue models.append(cls(dict(raw))) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models) return models
def read_by(filters=None, as_dict=False)
-
Performs a SELECT * WHERE filterKey (=) filterValue given
:param filters: dictionary of fields and their filters Example: filters = { 'first_name': 'john_doe', # first_name = 'john_doe' 'age': [('>', 5)], # age > 5 'number_of_corn': [('>=', 5), ('<', 10)] # 5 >= number_of_corn < 10 } :param as_dict: Return results as dictionary with the
id_attr
as the key :return: all rows that fit filter criteriaExpand source code
@classmethod def read_by(cls, filters=None, as_dict=False): # pylint: disable=arguments-differ """ Performs a SELECT * WHERE filterKey (=) filterValue given :param filters: dictionary of fields and their filters Example: filters = { 'first_name': 'john_doe', # first_name = 'john_doe' 'age': [('>', 5)], # age > 5 'number_of_corn': [('>=', 5), ('<', 10)] # 5 >= number_of_corn < 10 } :param as_dict: Return results as dictionary with the `id_attr` as the key :return: all rows that fit filter criteria """ if filters is None: return cls.read_all() connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = select([table]) for column, value in filters.items(): if isinstance(value, list): statement = DatabaseRepository._convert_comparator(statement, table, column, value) else: statement = statement.where(table.c[column] == value) result = connection.execute(statement) model_dicts = result.fetchall() if result is not None else None connection.close() if as_dict: models = {} if model_dicts is not None: for raw in model_dicts: models[raw[cls.id_attr]] = cls(dict(raw)) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models.values()) return models models = [] if model_dicts is not None: for raw in model_dicts: models.append(cls(dict(raw))) # pylint: disable=too-many-function-args cls.after_read_many(models_read=models) return models
def run_statement(statement)
-
Performs any statement given. Note that timestamps are NOT updated here nor are Layers called
:param statement: can be created like the following: table = self.table(DatabaseRepository.metadata) statement = table.delete().where(…) :return: all rows of result set
Expand source code
@classmethod def run_statement(cls, statement): """ Performs any statement given. Note that timestamps are NOT updated here nor are Layers called :param statement: can be created like the following: table = self.table(DatabaseRepository.metadata) statement = table.delete().where(...) :return: all rows of result set """ connection = cls._open_connection() result = connection.execute(statement) results = result.fetchall() if result is not None else None connection.close() return results
def table(metadata=MetaData(bind=None)) ‑> sqlalchemy.sql.schema.Table
-
Table definition for the Repository
WARNING: The id field must be named 'id'
:param metadata: sqlalchemy MetaData object :return: sqlalchemy Table object
Expand source code
@classmethod @abstractmethod def table(cls, metadata=MetaData()) -> Table: """ Table definition for the Repository WARNING: The id field must be named 'id' :param metadata: sqlalchemy MetaData object :return: sqlalchemy Table object """ raise NotImplementedError
def update(model)
-
Performs an UPDATE
WHERE id = model_id query :param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance
Expand source code
@classmethod def update(cls, model): """ Performs an UPDATE <values> WHERE id = model_id query :param model: instance :param id_attr: Optional. Name of ID attribute (default is 'id'). Matches to column :return: model instance """ cls.on_update(model) if model._has_timestamps: # pylint: disable=protected-access model.modified_at = datetime.datetime.now() connection = cls._open_connection() table = cls.table(DatabaseRepository.metadata) statement = table.update().where(table.c[cls.id_attr] == model.id).values(**model.to_dict()) connection.execute(statement) connection.close() return model
Methods
def to_dict(self)
-
Used by CRUD methods. The model needs to define what is to be stored in the database :return: dict
Expand source code
@abstractmethod def to_dict(self): """ Used by CRUD methods. The model needs to define what is to be stored in the database :return: dict """ raise NotImplementedError
Inherited members