Module lib.layer.security

Roles and Security Layer

Expand source code
"""
Roles and Security Layer
"""

import dictdiffer

from lib.layer import Layer
from lib.model.change_request import ChangeRequest

CAN_CREATE = 'can_create'  # falsy. Base rule is to reject
CANT_READ = 'cant_read'  # truthy. Base rule is to allow. Deletes fields during read
CAN_UPDATE = 'can_update'  # falsy. Base rule is to reject
CAN_DESTROY = 'can_delete'  # falsy. Base rule is to reject
CAN_APPROVE = 'can_approve'  # falsy. Base rule is to reject
CAN = 'can'

ROLES = {
    "Admin": {
        CAN_CREATE: [
            "*"
        ],
        CAN_UPDATE: {
            "*"
        },
        CAN_DESTROY: [
            "*"
        ],
        CAN_APPROVE: {
            "*"
        }
    },
    "Accounting": {
        CAN_UPDATE: {
            "employee": [
                "salary",
                "hourly_rate",
                "commission_rate",
                "bank_routing",
                "bank_account",
                "classification_id",
                "paymethod_id",
                "role"
            ],
            "timesheet": [
                "*"
            ],
            "receipt": [
                "*"
            ]
        },
        CAN_CREATE: [
            "timesheet",
            "receipt"
        ],
        CAN: [
            "payroll",
        ]
    },
    "Reporter": {
        CANT_READ: {
            "employee": [
                "role",
                "social_security_number",
                "address_line1",
                "address_line2",
                "city",
                "state",
                "zipcode",
                "salary",
                "hourly_rate",
                "commission_rate",
                "bank_routing",
                "bank_account",
                "paymethod_id",
                "payment_method",  # view model field of the above
            ]
        },
        CAN_CREATE: [
            "timesheet",
            "receipt"
        ],
        CAN_UPDATE: {
            "timesheet": {
                "*"
            },
            "receipt": {
                "*"
            }
        }
    },
    "Viewer": {
        CANT_READ: {
            "employee": [
                "id",
                "role",
                "social_security_number",
                "start_date",
                "date_of_birth",
                "address_line1",
                "address_line2",
                "city",
                "state",
                "zipcode",
                "salary",
                "hourly_rate",
                "commission_rate",
                "bank_routing",
                "bank_account",
                "classification_id",
                "paymethod_id",
                "classification",  # view model field of the above
                "payment_method",  # view model field of the above
                'date_left',
                'notes'
            ]
        }
    }
}


class SecurityException(Exception):
    """
    Thrown when some security policy is violated
    """


class ChangeRequestException(Exception):
    """
    Thrown when a change request is entered instead of changing the data source directly
    """

    def __init__(self, message, *args, request=None):
        super().__init__(message, *args)
        self.request = request


class SecurityLayer(Layer):
    """
    Enforces role-based policies as outlined in ROLES
    """

    def __init__(self, user):
        super().__init__()

        if user.role not in ROLES:
            raise ValueError(f'No role programmed for role "{user.role}"')

        self.user_role_name = user.role
        self.user_role = ROLES[user.role]
        self.user = user

    @staticmethod
    def _get_model_name_from_repo_cls(repo_cls):
        return repo_cls.__name__.lower()

    def on_create(self, repo_cls, new_model):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if model_name == 'changerequest':
            return

        if CAN_CREATE not in self.user_role:
            raise SecurityException(f'Creating {model_name} records is not allowed')

        if '*' in self.user_role[CAN_CREATE]:
            return

        if model_name not in self.user_role[CAN_CREATE]:
            raise SecurityException(f'Creating {model_name} records is not allowed')

        changes = list(dictdiffer.diff({}, new_model.to_dict()))

        # everyone who is not an Admin must request
        request = ChangeRequest({
            'author_user_id': self.user.id,
            'table_name': repo_cls.resource_uri,
            'row_id': None,
            'changes': ChangeRequest.serialize_dates(changes),
            'reason': 'No reason given'
        })
        request = ChangeRequest.create(request)
        raise ChangeRequestException('Request created successfully', request)

    def on_read_one(self, repo_cls, model):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CANT_READ not in self.user_role:
            return

        if '*' in self.user_role[CANT_READ]:
            raise SecurityException(f'Cannot read this {model_name}! '
                                    f'Insufficient permission.')

        if model_name in self.user_role[CANT_READ]:
            for field in self.user_role[CANT_READ][model_name]:
                if field == '*':
                    raise SecurityException(f'Cannot read this {model_name}! '
                                            f'Insufficient permission.')

                if hasattr(model, field):
                    delattr(model, field)

    def on_read_many(self, repo_cls, models):
        raise NotImplementedError('lib.repository._call_middlewares currently '
                                  'calls on_read_one iteratively')

    def on_update(self, repo_cls, updated_model, id_attr='id'):  # pylint: disable=too-many-branches
        """
        Uses dictdiffer to find the differences in the dictionary versions of the
        old and new model.

        If the following dictionaries were given:

        a_dict = {
            'a': 'foo',
            'b': 'bar',
            'd': 'barfoo'
        }

        b_dict = {
            'a': 'foo',
            'b': 'BAR',
            'c': 'foobar'
        }

        The dictdiffer tool would return the following:

        difference = list(dictdiffer.diff(a_dict, b_dict))
        print(difference)
            [
                ('change', 'b', ('bar', 'BAR')),
                ('add', '', [('c', 'foobar')]),
                ('remove', '', [('d', 'barfoo')])
            ]
        """
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CAN_UPDATE not in self.user_role:
            raise SecurityException(f'Updating {model_name} records is not allowed')

        if '*' in self.user_role[CAN_UPDATE]:
            return

        if model_name not in self.user_role[CAN_UPDATE]:
            raise SecurityException(f'Updating {model_name} records is not allowed')

        # self.user_role[CAN_UPDATE][model_name] exists
        # now to look at the nitty-gritty of the changes...

        old_model = repo_cls.read(getattr(updated_model, repo_cls.id_attr))

        changes = list(dictdiffer.diff(old_model.to_dict(), updated_model.to_dict()))
        # print(changes)

        for action, field, values in changes:
            if action == 'add':
                print('Warning: attempt to add a field made. Was this desired?')
                for couple in values:
                    for key, ignored in couple:  # pylint: disable=unused-variable
                        if key not in self.user_role[CAN_UPDATE][model_name]:
                            raise SecurityException(f'Updating the {key} field in '
                                                    f'{model_name} records is not allowed')
            elif action == 'remove':
                for couple in values:
                    for key, ignored in couple:  # pylint: disable=unused-variable
                        if key not in self.user_role[CAN_UPDATE][model_name]:
                            raise SecurityException(f'Updating the {key} field in '
                                                    f'{model_name} records is not allowed')
            elif action == 'change':
                if field not in self.user_role[CAN_UPDATE][model_name]:
                    raise SecurityException(f'Updating the {field} field in '
                                            f'{model_name} records is not allowed')

        # everyone who is not an Admin must request
        request = ChangeRequest({
            'author_user_id': self.user.id,
            'table_name': repo_cls.resource_uri,
            'row_id': getattr(updated_model, id_attr),
            'changes': ChangeRequest.serialize_dates(changes),
            'reason': 'No reason given'
        })
        request = ChangeRequest.create(request)
        raise ChangeRequestException('Request created successfully', request)

    def on_destroy(self, repo_cls, model_id, id_attr='id'):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CAN_DESTROY not in self.user_role:
            raise SecurityException(f'Destroying {model_name} records is not allowed')

        if '*' in self.user_role[CAN_DESTROY]:
            return

        if model_name not in self.user_role[CAN_DESTROY]:
            raise SecurityException(f'Destroying {model_name} records is not allowed')

    def can_create(self, resource_uri):
        """
        Checks if user can create X

        :param resource_uri: X
        :return: True if can create X
        """
        if self.user_role and CAN_CREATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
            return True

        return self.user_role \
               and CAN_CREATE in self.user_role \
               and resource_uri in self.user_role[CAN_CREATE]

    def can_read(self, resource_uri, field):
        """
        Calculates if a role can access a certain field

        :param role: Role string
        :param resource_uri: URI str
        :param field: field name
        :return: bool if can read
        """

        return not (self.user_role
                    and CANT_READ in self.user_role
                    and resource_uri in self.user_role[CANT_READ]
                    and field in self.user_role[CANT_READ][resource_uri])

    def can_update(self, resource_uri):
        """
        Checks if user can update X

        :param resource_uri: X
        :return: True if can update X
        """
        if self.user_role and CAN_UPDATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
            return True

        return self.user_role \
               and CAN_UPDATE in self.user_role \
               and resource_uri in self.user_role[CAN_UPDATE]

    def can_(self, custom_operation: str):
        """
        Checks if an arbitrary operation can be done with this role

        :param custom_operation: operation string
        :return: bool if can do X
        """
        if self.user_role_name == 'Admin':
            return True

        return self.user_role and CAN in self.user_role \
               and custom_operation in self.user_role[CAN]

Classes

class ChangeRequestException (message, *args, request=None)

Thrown when a change request is entered instead of changing the data source directly

Expand source code
class ChangeRequestException(Exception):
    """
    Thrown when a change request is entered instead of changing the data source directly
    """

    def __init__(self, message, *args, request=None):
        super().__init__(message, *args)
        self.request = request

Ancestors

  • builtins.Exception
  • builtins.BaseException
class SecurityException (*args, **kwargs)

Thrown when some security policy is violated

Expand source code
class SecurityException(Exception):
    """
    Thrown when some security policy is violated
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class SecurityLayer (user)

Enforces role-based policies as outlined in ROLES

Expand source code
class SecurityLayer(Layer):
    """
    Enforces role-based policies as outlined in ROLES
    """

    def __init__(self, user):
        super().__init__()

        if user.role not in ROLES:
            raise ValueError(f'No role programmed for role "{user.role}"')

        self.user_role_name = user.role
        self.user_role = ROLES[user.role]
        self.user = user

    @staticmethod
    def _get_model_name_from_repo_cls(repo_cls):
        return repo_cls.__name__.lower()

    def on_create(self, repo_cls, new_model):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if model_name == 'changerequest':
            return

        if CAN_CREATE not in self.user_role:
            raise SecurityException(f'Creating {model_name} records is not allowed')

        if '*' in self.user_role[CAN_CREATE]:
            return

        if model_name not in self.user_role[CAN_CREATE]:
            raise SecurityException(f'Creating {model_name} records is not allowed')

        changes = list(dictdiffer.diff({}, new_model.to_dict()))

        # everyone who is not an Admin must request
        request = ChangeRequest({
            'author_user_id': self.user.id,
            'table_name': repo_cls.resource_uri,
            'row_id': None,
            'changes': ChangeRequest.serialize_dates(changes),
            'reason': 'No reason given'
        })
        request = ChangeRequest.create(request)
        raise ChangeRequestException('Request created successfully', request)

    def on_read_one(self, repo_cls, model):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CANT_READ not in self.user_role:
            return

        if '*' in self.user_role[CANT_READ]:
            raise SecurityException(f'Cannot read this {model_name}! '
                                    f'Insufficient permission.')

        if model_name in self.user_role[CANT_READ]:
            for field in self.user_role[CANT_READ][model_name]:
                if field == '*':
                    raise SecurityException(f'Cannot read this {model_name}! '
                                            f'Insufficient permission.')

                if hasattr(model, field):
                    delattr(model, field)

    def on_read_many(self, repo_cls, models):
        raise NotImplementedError('lib.repository._call_middlewares currently '
                                  'calls on_read_one iteratively')

    def on_update(self, repo_cls, updated_model, id_attr='id'):  # pylint: disable=too-many-branches
        """
        Uses dictdiffer to find the differences in the dictionary versions of the
        old and new model.

        If the following dictionaries were given:

        a_dict = {
            'a': 'foo',
            'b': 'bar',
            'd': 'barfoo'
        }

        b_dict = {
            'a': 'foo',
            'b': 'BAR',
            'c': 'foobar'
        }

        The dictdiffer tool would return the following:

        difference = list(dictdiffer.diff(a_dict, b_dict))
        print(difference)
            [
                ('change', 'b', ('bar', 'BAR')),
                ('add', '', [('c', 'foobar')]),
                ('remove', '', [('d', 'barfoo')])
            ]
        """
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CAN_UPDATE not in self.user_role:
            raise SecurityException(f'Updating {model_name} records is not allowed')

        if '*' in self.user_role[CAN_UPDATE]:
            return

        if model_name not in self.user_role[CAN_UPDATE]:
            raise SecurityException(f'Updating {model_name} records is not allowed')

        # self.user_role[CAN_UPDATE][model_name] exists
        # now to look at the nitty-gritty of the changes...

        old_model = repo_cls.read(getattr(updated_model, repo_cls.id_attr))

        changes = list(dictdiffer.diff(old_model.to_dict(), updated_model.to_dict()))
        # print(changes)

        for action, field, values in changes:
            if action == 'add':
                print('Warning: attempt to add a field made. Was this desired?')
                for couple in values:
                    for key, ignored in couple:  # pylint: disable=unused-variable
                        if key not in self.user_role[CAN_UPDATE][model_name]:
                            raise SecurityException(f'Updating the {key} field in '
                                                    f'{model_name} records is not allowed')
            elif action == 'remove':
                for couple in values:
                    for key, ignored in couple:  # pylint: disable=unused-variable
                        if key not in self.user_role[CAN_UPDATE][model_name]:
                            raise SecurityException(f'Updating the {key} field in '
                                                    f'{model_name} records is not allowed')
            elif action == 'change':
                if field not in self.user_role[CAN_UPDATE][model_name]:
                    raise SecurityException(f'Updating the {field} field in '
                                            f'{model_name} records is not allowed')

        # everyone who is not an Admin must request
        request = ChangeRequest({
            'author_user_id': self.user.id,
            'table_name': repo_cls.resource_uri,
            'row_id': getattr(updated_model, id_attr),
            'changes': ChangeRequest.serialize_dates(changes),
            'reason': 'No reason given'
        })
        request = ChangeRequest.create(request)
        raise ChangeRequestException('Request created successfully', request)

    def on_destroy(self, repo_cls, model_id, id_attr='id'):
        model_name = self._get_model_name_from_repo_cls(repo_cls)

        if CAN_DESTROY not in self.user_role:
            raise SecurityException(f'Destroying {model_name} records is not allowed')

        if '*' in self.user_role[CAN_DESTROY]:
            return

        if model_name not in self.user_role[CAN_DESTROY]:
            raise SecurityException(f'Destroying {model_name} records is not allowed')

    def can_create(self, resource_uri):
        """
        Checks if user can create X

        :param resource_uri: X
        :return: True if can create X
        """
        if self.user_role and CAN_CREATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
            return True

        return self.user_role \
               and CAN_CREATE in self.user_role \
               and resource_uri in self.user_role[CAN_CREATE]

    def can_read(self, resource_uri, field):
        """
        Calculates if a role can access a certain field

        :param role: Role string
        :param resource_uri: URI str
        :param field: field name
        :return: bool if can read
        """

        return not (self.user_role
                    and CANT_READ in self.user_role
                    and resource_uri in self.user_role[CANT_READ]
                    and field in self.user_role[CANT_READ][resource_uri])

    def can_update(self, resource_uri):
        """
        Checks if user can update X

        :param resource_uri: X
        :return: True if can update X
        """
        if self.user_role and CAN_UPDATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
            return True

        return self.user_role \
               and CAN_UPDATE in self.user_role \
               and resource_uri in self.user_role[CAN_UPDATE]

    def can_(self, custom_operation: str):
        """
        Checks if an arbitrary operation can be done with this role

        :param custom_operation: operation string
        :return: bool if can do X
        """
        if self.user_role_name == 'Admin':
            return True

        return self.user_role and CAN in self.user_role \
               and custom_operation in self.user_role[CAN]

Ancestors

Methods

def can_(self, custom_operation: str)

Checks if an arbitrary operation can be done with this role

:param custom_operation: operation string :return: bool if can do X

Expand source code
def can_(self, custom_operation: str):
    """
    Checks if an arbitrary operation can be done with this role

    :param custom_operation: operation string
    :return: bool if can do X
    """
    if self.user_role_name == 'Admin':
        return True

    return self.user_role and CAN in self.user_role \
           and custom_operation in self.user_role[CAN]
def can_create(self, resource_uri)

Checks if user can create X

:param resource_uri: X :return: True if can create X

Expand source code
def can_create(self, resource_uri):
    """
    Checks if user can create X

    :param resource_uri: X
    :return: True if can create X
    """
    if self.user_role and CAN_CREATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
        return True

    return self.user_role \
           and CAN_CREATE in self.user_role \
           and resource_uri in self.user_role[CAN_CREATE]
def can_read(self, resource_uri, field)

Calculates if a role can access a certain field

:param role: Role string :param resource_uri: URI str :param field: field name :return: bool if can read

Expand source code
def can_read(self, resource_uri, field):
    """
    Calculates if a role can access a certain field

    :param role: Role string
    :param resource_uri: URI str
    :param field: field name
    :return: bool if can read
    """

    return not (self.user_role
                and CANT_READ in self.user_role
                and resource_uri in self.user_role[CANT_READ]
                and field in self.user_role[CANT_READ][resource_uri])
def can_update(self, resource_uri)

Checks if user can update X

:param resource_uri: X :return: True if can update X

Expand source code
def can_update(self, resource_uri):
    """
    Checks if user can update X

    :param resource_uri: X
    :return: True if can update X
    """
    if self.user_role and CAN_UPDATE in self.user_role and '*' in self.user_role[CAN_UPDATE]:
        return True

    return self.user_role \
           and CAN_UPDATE in self.user_role \
           and resource_uri in self.user_role[CAN_UPDATE]
def on_update(self, repo_cls, updated_model, id_attr='id')

Uses dictdiffer to find the differences in the dictionary versions of the old and new model.

If the following dictionaries were given:

a_dict = { 'a': 'foo', 'b': 'bar', 'd': 'barfoo' }

b_dict = { 'a': 'foo', 'b': 'BAR', 'c': 'foobar' }

The dictdiffer tool would return the following:

difference = list(dictdiffer.diff(a_dict, b_dict)) print(difference) [ ('change', 'b', ('bar', 'BAR')), ('add', '', [('c', 'foobar')]), ('remove', '', [('d', 'barfoo')]) ]

Expand source code
def on_update(self, repo_cls, updated_model, id_attr='id'):  # pylint: disable=too-many-branches
    """
    Uses dictdiffer to find the differences in the dictionary versions of the
    old and new model.

    If the following dictionaries were given:

    a_dict = {
        'a': 'foo',
        'b': 'bar',
        'd': 'barfoo'
    }

    b_dict = {
        'a': 'foo',
        'b': 'BAR',
        'c': 'foobar'
    }

    The dictdiffer tool would return the following:

    difference = list(dictdiffer.diff(a_dict, b_dict))
    print(difference)
        [
            ('change', 'b', ('bar', 'BAR')),
            ('add', '', [('c', 'foobar')]),
            ('remove', '', [('d', 'barfoo')])
        ]
    """
    model_name = self._get_model_name_from_repo_cls(repo_cls)

    if CAN_UPDATE not in self.user_role:
        raise SecurityException(f'Updating {model_name} records is not allowed')

    if '*' in self.user_role[CAN_UPDATE]:
        return

    if model_name not in self.user_role[CAN_UPDATE]:
        raise SecurityException(f'Updating {model_name} records is not allowed')

    # self.user_role[CAN_UPDATE][model_name] exists
    # now to look at the nitty-gritty of the changes...

    old_model = repo_cls.read(getattr(updated_model, repo_cls.id_attr))

    changes = list(dictdiffer.diff(old_model.to_dict(), updated_model.to_dict()))
    # print(changes)

    for action, field, values in changes:
        if action == 'add':
            print('Warning: attempt to add a field made. Was this desired?')
            for couple in values:
                for key, ignored in couple:  # pylint: disable=unused-variable
                    if key not in self.user_role[CAN_UPDATE][model_name]:
                        raise SecurityException(f'Updating the {key} field in '
                                                f'{model_name} records is not allowed')
        elif action == 'remove':
            for couple in values:
                for key, ignored in couple:  # pylint: disable=unused-variable
                    if key not in self.user_role[CAN_UPDATE][model_name]:
                        raise SecurityException(f'Updating the {key} field in '
                                                f'{model_name} records is not allowed')
        elif action == 'change':
            if field not in self.user_role[CAN_UPDATE][model_name]:
                raise SecurityException(f'Updating the {field} field in '
                                        f'{model_name} records is not allowed')

    # everyone who is not an Admin must request
    request = ChangeRequest({
        'author_user_id': self.user.id,
        'table_name': repo_cls.resource_uri,
        'row_id': getattr(updated_model, id_attr),
        'changes': ChangeRequest.serialize_dates(changes),
        'reason': 'No reason given'
    })
    request = ChangeRequest.create(request)
    raise ChangeRequestException('Request created successfully', request)

Inherited members