Source code for auth.manager

# ../auth/manager.py

"""Provides a singleton class to access player permissions."""

# =============================================================================
# >> IMPORTS
# =============================================================================
# Python Imports
#   Re
import re
#   Importlib
from importlib.machinery import SourceFileLoader
# Source.Python Imports
#   Auth
from auth.base import Backend
#   Core
from core.settings import _core_settings
#   Listeners
from listeners import OnLevelEnd
#   Paths
from paths import BACKENDS_PATH
#   Players
from players.helpers import playerinfo_from_index
#   Steam
from steam import SteamID


# =============================================================================
# >> ALL DECLARATION
# =============================================================================
__all__ = ('_AuthManager',
           'auth_manager',
           'ParentPermissionDict',
           'ParentPermissions',
           'PermissionBase',
           'PlayerPermissionDict',
           'PlayerPermissions',
    )


# =============================================================================
# >> CONSTANTS
# =============================================================================
# Name of the guest parent.
GUEST_PARENT_NAME = 'guest'


# =============================================================================
# >> CLASSES
# =============================================================================
[docs]class PermissionBase(dict): """Base class for parent and player permissions."""
[docs] def __init__(self, name): """Initialize the object.""" super().__init__() self.parents = set() self.name = name if self.name != GUEST_PARENT_NAME: # Don't update the backend, because it's a hidden group self.add_parent(GUEST_PARENT_NAME, update_backend=False)
def __hash__(self): """Return a hash value based on the name.""" # This is required, because we are adding dicts to sets return hash(self.name)
[docs] def add(self, permission, server_id=None, update_backend=True): """Add a permission. :param str permission: The permission to add. :param int server_id: The server ID to which the permission should be added. If no server ID is given, it will be only added to this server. :param bool update_backend: If True, the backend will be updated. """ if (auth_manager.targets_this_server(server_id) and permission not in self.keys()): self[permission] = self._compile_permission(permission) if update_backend and auth_manager.active_backend is not None: auth_manager.active_backend.permission_added( self, permission, auth_manager.server_id if server_id is None else server_id)
[docs] def remove(self, permission, server_id=None, update_backend=True): """Remove a permission. :param str permission: The permission to remove. :param int server_id: The server ID from which the permission should be removed. If no server ID is given, it will be only removed from this server. :param bool update_backend: If True, the backend will be updated. """ if auth_manager.targets_this_server(server_id): try: del self[permission] except KeyError: pass if update_backend and auth_manager.active_backend is not None: auth_manager.active_backend.permission_removed( self, permission, auth_manager.server_id if server_id is None else server_id)
[docs] def add_parent(self, parent_name, update_backend=True): """Add a parent. :param str parent_name: Name of the parent. :param bool update_backend: If True, the backend will be updated. """ parent = auth_manager.parents[parent_name] if parent not in self.parents: # TODO: Detect cycles self.parents.add(parent) parent.children.add(self) if update_backend and auth_manager.active_backend is not None: auth_manager.active_backend.parent_added(self, parent_name)
[docs] def remove_parent(self, parent_name, update_backend=True): """Remove a parent. :param str parent_name: Name of the parent. :param bool update_backend: If True, the backend will be updated. """ parent = auth_manager.parents[parent_name] if parent in self.parents: self.parents.remove(parent) parent.children.remove(self) if update_backend and auth_manager.active_backend is not None: auth_manager.active_backend.parent_removed(self, parent_name)
@staticmethod def _compile_permission(permission): """Compile a permission.""" return re.compile('^{}$'.format( permission.replace('.', '\\.').replace('*', '(.*)'))) def __contains__(self, permission): """Return True if the permission is granted by this object. :rtype: bool """ return self._has_permission(permission, []) def _has_permission(self, permission, name_list): # Checks to see if parents are recursive if self.name in name_list: # Break if recursive return False else: name_list.append(self.name) for re_perm in self.values(): if re_perm.match(permission): return True for parent in self.parents: if parent._has_permission(permission, name_list): return True return False
[docs] def flatten(self): """Return all permissions flattened recursively. :rtype: generator """ yield from self for parent in self.parents: yield from parent
[docs] def clear(self): """Removes all permissions stored in this object and its parents.""" super().clear() self.parents.clear()
[docs]class PlayerPermissions(PermissionBase): """A container for player permissions."""
[docs] def __init__(self, name, steamid64): """Initialize the object. :param str name: A SteamID2, SteamID3 or SteamID64 value. :param int steamid64: The SteamID64 value that was also used to store the object in the :class:PlayerPermissionDict`` object. """ super().__init__(name) self.steamid64 = steamid64
[docs]class ParentPermissions(PermissionBase): """A container for parent permissions."""
[docs] def __init__(self, name): """Initialize the object. :param str name: Name of the parent. """ super().__init__(name) self.children = set()
class _PermissionDict(dict): """A permission storage.""" def clear(self): for value in self.values(): value.clear() super().clear()
[docs]class ParentPermissionDict(_PermissionDict): def __missing__(self, parent_name): """Create, store and return a :class:`ParentPermissions` object. :param str parent_name: The name of the parent to retrieve. :rtype: ParentPermissions """ instance = self[parent_name] = ParentPermissions(parent_name) return instance
[docs]class PlayerPermissionDict(_PermissionDict): def __missing__(self, steamid): """Create, store and return a :class:`PlayerPermissions` object. :param str/int steamid: A SteamID2, SteamID3 or SteamID64 value. :rtype: PlayerPermissions """ if not isinstance(steamid, int): steamid64 = SteamID.parse(steamid).to_uint64() if steamid64 in self: return self[steamid64] # We got a SteamID in a string format, so we can store it by using # its SteamID64 value, but keep the original name. instance = self[steamid64] = PlayerPermissions(steamid, steamid64) else: instance = self[steamid] = PlayerPermissions(steamid, steamid) return instance
class _AuthManager(dict): """Manages backends and configuration files.""" def __init__(self): """Initialize the object.""" self.parents = ParentPermissionDict() self.players = PlayerPermissionDict() self.active_backend = None self.server_id = -1 def find_and_add_available_backends(self): """Find and add all available backends. :raise ValueError: Raised if no backend or multiple backends are found within a single file. """ for backend in BACKENDS_PATH.glob('*.py'): name = 'auth.backend.' + backend.basename().splitext()[0] loader = SourceFileLoader(name, str(backend)) module = loader.load_module(name) for var in vars(module).values(): if isinstance(var, Backend): self[var.name.casefold()] = var break else: raise ValueError( 'Found no backend or multiple backends in "{}".'.format( backend)) def load(self): """Load the auth manager.""" self.server_id = int(_core_settings['AUTH_SETTINGS']['server_id']) self.set_active_backend(_core_settings['AUTH_SETTINGS']['backend']) def unload(self): """Unload the auth manager.""" self._unload_active_backend() def set_active_backend(self, backend_name): """Set the active backend. :param str backend_name: Name of the backend. :raise ValueError: Raised if the backend does not exist. """ try: backend = self[backend_name.casefold()] except KeyError: raise ValueError( 'Backend "{}" does not exist.'.format(backend_name)) self._unload_active_backend() backend.load() self.active_backend = backend def _unload_active_backend(self): """Unload the active backend if there is one.""" if self.active_backend is not None: self.active_backend.unload() self.parents.clear() self.players.clear() self.active_backend = None def is_backend_loaded(self, backend_name): """Return True if the given backend is currently loaded. :rtype: bool """ return (self.active_backend is not None and backend_name.casefold() == self.active_backend.name) def get_player_permissions(self, index): """.. seealso:: :meth:`get_player_permissions_from_steamid`""" return self.get_player_permissions_from_steamid( playerinfo_from_index(index).steamid) def get_player_permissions_from_steamid(self, steamid): """Return the permissions of a player. :param str/int steamid: The SteamID2, SteamID3 or SteamID64 of a player. :return: If the given SteamID is invalid (e.g. 'BOT'), None will be returned. :rtype: PlayerPermissions """ try: return self.players[steamid] except ValueError: return None def is_player_authorized(self, index, permission): """Return True if the player has been granted the given permission. :rtype: bool """ permissions = self.get_player_permissions(index) if permissions is None: return False return permission in permissions def get_parent_permissions(self, parent_name): """Return the parent permissions. :param str parent_name: Name of the parent. :rtype: ParentPermissions """ return self.parents[parent_name] def is_parent_authorized(self, parent_name, permission): """Return True if the parent has been granted the given permission. :rtype: bool """ return permission in self.get_parent_permissions(parent_name) def targets_this_server(self, server_id): """Return whether the server ID targets this server. :param int server_id: A server ID to test. :rtype: bool """ return server_id in (-1, self.server_id, None) #: The singleton object of :class:`_AuthManager`. auth_manager = _AuthManager() @OnLevelEnd def _on_level_end(): backend = auth_manager.active_backend if backend is not None: backend.sync()