Source code for commands.typed

# ../commands/typed.py

#: .. todo:: Add the ability to define prefixes for messages.
#: .. todo:: Add callback to the Node class. It could be called when a sub-command is required.

# =============================================================================
# >> IMPORTS
# =============================================================================
# Python Imports
#   Inspect
import inspect
from inspect import Parameter
import itertools
import textwrap

# Source.Python
#   Auth
from auth.manager import auth_manager
#   Core
from core import AutoUnload
from core import Tokenize
#   Commands
from commands import commands_logger
from commands import CommandReturn
from commands.server import server_command_manager
from commands.client import client_command_manager
from commands.say import say_command_manager
#   Filters
from filters.players import parse_filter
#   Messages
from messages import SayText2
from messages import TextMsg
from messages import HudDestination
#   Translations
from translations.strings import TranslationStrings


# =============================================================================
# >> ALL DECLARATION
# =============================================================================
__all__ = ('PRIVATE_COMMAND_PREFIX',
           'SUPPORTED_KINDS',
           'ArgumentError',
           'ArgumentNumberMismatch',
           'CommandInfo',
           'CommandNode',
           'CommandParser',
           'filter_str',
           'InvalidArgumentValue',
           'Node',
           'Store',
           'SubCommandError',
           'SubCommandExpectedError',
           'SubCommandNotFound',
           'TypedClientCommand',
           'TypedSayCommand',
           'TypedServerCommand',
           'ValidationError'
           )


# =============================================================================
# >> CONSTANTS
# =============================================================================
SUPPORTED_KINDS = (
    # There is no syntax for this in Python, but a PEP. If it ever gets added,
    # this implementation will already support it (unless
    # <Parameter object>.name is empty).
    Parameter.POSITIONAL_ONLY,
    Parameter.POSITIONAL_OR_KEYWORD,
    Parameter.VAR_POSITIONAL,
)

PRIVATE_COMMAND_PREFIX = '/'


# =============================================================================
# >> GLOBAL VARIABLES
# =============================================================================
logger = commands_logger.typed


# =============================================================================
# >> EXCEPTIONS
# =============================================================================
[docs]class ValidationError(Exception):
[docs] def __init__(self, message='', language=None, **tokens): self.message = message self.language = language self.tokens = tokens
[docs]class ArgumentError(ValidationError): pass
[docs]class ArgumentNumberMismatch(ArgumentError): pass
[docs]class InvalidArgumentValue(ArgumentError): pass
[docs]class SubCommandError(ValidationError): pass
[docs]class SubCommandNotFound(SubCommandError): pass
[docs]class SubCommandExpectedError(SubCommandError): pass
# ============================================================================= # >> CLASSES # =============================================================================
[docs]class Node(object):
[docs] def __init__(self, commands, description): self.description = description self.commands = commands
@property def signature(self): """Return the signature of the node. :rtype: str """ raise NotImplementedError('Must be implemented by a sub class.')
[docs]class Store(Node, dict):
[docs] def __init__(self, commands=None, description=None): super().__init__(commands, description)
@property def signature(self): """Return the signature of the node. :rtype: str """ return ' '.join(self.commands) + ' <sub-command>' @property def help_text(self): """Return the help text of the node. :rtype: str """ wrapper = textwrap.TextWrapper( 40, subsequent_indent=' ', break_long_words=True) result = '' for node in sorted(self.values(), key=lambda node: node.signature): sig_lines = wrapper.wrap(node.signature) desc_lines = wrapper.wrap(node.description or '') for sig, desc in itertools.zip_longest( sig_lines, desc_lines, fillvalue=''): result += '\n ' + sig.ljust(40) + ' ' + desc return result
[docs]class CommandNode(Node):
[docs] def __init__(self, commands, params, callback, description, permission, fail_callback, requires_registration): """Initialize the object. .. seealso:: :meth:`CommandParser.add_command` """ super().__init__(commands, description) self.params = params self.callback = callback # Explicitly check against True, because <permission> can also be a # non-empty string if permission == True: permission = '.'.join(commands) self.permission = permission self.fail_callback = fail_callback self.requires_registration = requires_registration self.command_to_register = commands[0]
@property def signature(self): """Return the full signature of a command. :rtype: str """ result = ' '.join(self.commands) for param in self.params: result += ' ' + self._param_to_str(param) return result @staticmethod def _param_to_str(param): """Represent a parameter as a string. :rtype: str """ result = '{open_char}{arg_name}{type_name}{default}{close_char}' if (param.kind == param.VAR_POSITIONAL or param.default is not param.empty): open_char = '[' close_char = ']' else: open_char = '<' close_char = '>' if param.default is not param.empty: default = '=' + str(param.default) else: default = '' if param.annotation is not param.empty: type_name = ':' + param.annotation.__name__ else: type_name = '' if param.kind == param.VAR_POSITIONAL: arg_name = '*' + param.name else: arg_name = param.name return result.format(open_char=open_char, arg_name=arg_name, type_name=type_name, default=default, close_char=close_char)
# TODO: This base class is not quite correct.
[docs]class CommandParser(Store):
[docs] def add_command(self, commands, params, callback, description=None, permission=None, fail_callback=None): """Add a command to the parser. :param str/list/tuple commands: Command to register. :param iterable params: Parameters of the command. :param callable callback: The callback for the command. :param str description: Description of the command. :param str permission: Required permission to use the command. :param callable fail_callback: Callback that gets called if authorization failed. :rtype: CommandNode """ commands = self._validate_commands(commands) for param in params: if param.kind not in SUPPORTED_KINDS: raise TypeError( 'Unsupported argument type "{}" for argument.'.format( param.kind, param.name)) command = CommandNode(tuple(commands), tuple(params), callback, description, permission, fail_callback, commands[0] not in self) parsed_commands = [] store = self while commands: command_name = commands.pop(0).lower() parsed_commands.append(command_name) if command_name in store: store = store[command_name] if (isinstance(store, CommandNode) or (not commands and isinstance(store, Store))): # We can't support multiple callbacks, because they might # have different signatures. raise ValueError('Command already exists.') else: if commands: # We need to split these two lines to prevent recursive # dicts new_store = store[command_name] = Store( tuple(parsed_commands)) store = new_store else: store[command_name] = command return command
[docs] def remove_command(self, commands): """Remove a command. :param str/list/tuple: Command to remove. .. seealso:: :meth:`_remove_command` .. seealso:: :meth:`_validate_commands` """ return self._remove_command(self._validate_commands(commands))
def _remove_command(self, commands): """Remove a command. :param list commands: Command to remove. :raise ValueError: Raised if the node does not exist. :return: Return whether the base command needs to be unregistered. :rtype: bool """ store = self for command_name in commands[:-1]: try: store = store[command_name] except KeyError: raise ValueError('Command does not exist.') del store[commands[-1]] if len(commands) > 1: if store: return False return self._remove_command(commands[:-1]) return True
[docs] def get_node(self, commands): """Return a node. :param str/list/tuple: Node to seach. :raise ValueError: Raised if the node does not exist. :rtype: Node .. seealso:: :meth:`_validate_command` """ store = self for command_name in self._validate_commands(commands): try: store = store[command_name] except (TypeError, KeyError): # The TypeError happens if "store" is already a CommandNode raise ValueError('Node does not exist.') return store
[docs] def set_node_description(self, commands, description): """Set the description of a node. :param str/list/tuple: Node to seach. :raise ValueError: Raised if the node does not exist. """ self.get_node(commands).description = description
[docs] def get_command(self, commands): """Return a command. :param str/list/tuple: Command to search. :rtype: CommandNode .. seealso:: :meth:`get_node` """ command = self.get_node(commands) assert isinstance(command, CommandNode) return command
def _validate_commands(self, commands): """Validate a string, list or tuple of commands. :param str/list/tuple: The command name or command path to validate. :raise TypeError: Raised if ``commands`` is not a str, list or tuple. :raise ValueError: Raised if ``commands`` is an empty list or tuple. .. seealso:: :meth:`_validate_command` """ if isinstance(commands, str): self._validate_command(commands) commands = [commands] elif not isinstance(commands, (list, tuple)): raise TypeError('<commands> must be str, list or tuple.') elif not commands: raise ValueError('<commands> must not be empty.') else: for command_name in commands: self._validate_command(command_name) return list(commands) def _validate_command(self, command_name): """Validate a command name. :param str command_name: The command name to check. :raise ValueError: Raised if the command name is invalid. """ if not command_name: raise ValueError('Command name cannot be empty.') if ' ' in command_name: raise ValueError('Command cannot contain spaces.')
[docs] def clean_command(self, command, args): """Clean a command and its passed arguments. :param CommandNode command: :param iterable args: The arguments to clean. :rtype: list :raise InvalidArgumentValue: Raised if the value is invalid for an argument. :raise ArgumentNumberMismatch: Raised if too many/less arguments have been passed. """ result = [] params = list(command.params) param = None for arg in args: if param is None or param.kind is not param.VAR_POSITIONAL: try: param = params.pop(0) except IndexError: raise ArgumentNumberMismatch( 'Too many arguments:\n {}'.format(command.signature)) if param.annotation is not param.empty: try: arg = param.annotation(arg) except ValidationError: raise except: raise InvalidArgumentValue( '"{}" is an invalid value for "{}:{}".'.format( arg, param.name, param.annotation.__name__)) result.append(arg) for param in params: if param.kind is param.VAR_POSITIONAL: break if param.default is param.empty: raise ArgumentNumberMismatch( 'Not enough arguments:\n {}'.format(command.signature)) result.append(param.default) return result
[docs] def parse_command(self, command): """Parse a :class:`Command` object. Splits the command into the actual command and its arguments. :param Command command: A command to parse. :raise SubCommandNotFound: Raised if a sub command was not found. :raise SubCommandExpected: Raised if a sub command was expected, but more arguments have been passed. :rtype: tuple """ args = Tokenize(command.command_string) store = self while args and isinstance(store, Store): sub_command = args.pop(0).lower() try: store = store[sub_command] except KeyError: raise SubCommandNotFound( 'Sub command "{}" not found.'.format(sub_command)) if isinstance(store, Store): raise SubCommandExpectedError( 'A sub-command is required:{}'.format(store.help_text)) return (store, args)
[docs]class CommandInfo(object): """Stores command information for typed commands."""
[docs] def __init__(self, command, typed_command_cls, index=None, team_only=None): """Initializes the instance. :param Command command: The actual Command instance. :param _TypedCommand typed_command_cls: Command this instance belongs to. :param int index: The index of the player that issued the command. None, if it's a server command. :param bool team_only: Indicates whether the command was issued in team chat. None, if it's a server or client command. """ self.command = command self.typed_command_cls = typed_command_cls self.index = index self.team_only = team_only
[docs] def reply(self, msg, language=None, **tokens): """Reply to the command issuer. :param str/TranslationStrings msg: Message to send. :param str language: Language to be used. :param tokens: Translation tokens for message. """ self.typed_command_cls.send_message(self, msg, language, **tokens)
[docs] def is_private_command(self): """Return ``True`` if it's a private command. :rtype: bool """ return self.command[0].startswith(PRIVATE_COMMAND_PREFIX)
@property def auto_command_return(self): """Determine the probably most desired ``CommandReturn`` value. For server commands it will always return ``CommandReturn.CONTINUE``. Client commands will always use ``CommandReturn.BLOCK``. For say commands it's determined by checking the prefix. If the command starts with a slash (``/``) it's handled as a private command. Thus, ``CommandReturn.BLOCK`` is returned. Otherwise ``CommandReturn.CONTINUE`` is used. :rtype: CommandReturn """ return self.typed_command_cls.get_auto_command_return(self)
# We can't integrate this into SayCommand, ServerCommand and ClientCommand, # because multiple callbacks are not supported by this system (because of the # possibility of different function signatures). But multiple callbacks are # important e.g. for ClientCommand, because that can actually hook client # commands and multiple plugins might want to hook a client command. class _TypedCommand(AutoUnload): """Decorator class to create typed commands.""" def __init__(self, commands, permission=None, fail_callback=None): """Register a typed command callback. :param str/list/tuple commands: (Sub-) command to register. :param str/bool permission: A permission that is required to execute the command. If True, the permission string will be generated from the given command. :param callable fail_callback: Called when the executer does not have the required permission. """ self.commands = commands self.permission = permission self.fail_callback = fail_callback self.command = None def __call__(self, callback): """Finish registering a typed command callback. :param callable callback: A callback that get called when the command has been issued. :raise ValueError: Raised if the callback does not accept at least one argument (command info). :return: Return the passed callback. :rtype: callable """ params = tuple(inspect.signature(callback).parameters.values()) if not params: raise ValueError( 'Callback must at least accept 1 argument (command_info).') self.command = self.parser.add_command( self.commands, params[1:], callback, inspect.getdoc(callback), self.permission, self.fail_callback) if self.command.requires_registration: self.manager.register_commands( self.command.command_to_register, self.on_command) return callback def _unload_instance(self): try: if self.parser.remove_command(self.commands): self.manager.unregister_commands( self.command.command_to_register, self.on_command) except ValueError: pass @classmethod def on_command(cls, command, *args): """Called when a (base) command has been executed. Parse the command, clean its arguments and execute the callback. """ info = CommandInfo(command, cls, *args) try: command_node, args = cls.parser.parse_command(info.command) result = cls.on_clean_command(info, command_node, args) except ValidationError as e: info.reply(e.message, e.language, **e.tokens) else: if result is None: return info.auto_command_return return result return CommandReturn.CONTINUE @classmethod def on_clean_command(cls, command_info, command_node, args): """Called when the arguments of the parsed command should be cleaned. :rtype: CommandReturn """ cleaned_args = cls.parser.clean_command(command_node, args) return command_node.callback(command_info, *cleaned_args) @property def parser(self): """Return the parser to use. :rtype: CommandParser """ raise NotImplementedError('Needs to be implemented by a sub class.') @property def manager(self): """Return the manager that registers the commands. :rtype: _BaseCommandManager """ raise NotImplementedError('Needs to be implemented by a sub class.') @staticmethod def send_message(command_info, message, language=None, **tokens): """Send a message.""" raise NotImplementedError('Needs to be implemented by a sub class.') @classmethod def get_auto_command_return(cls, info): """Return the most desired ``CommandReturn`` value. :rtype: CommandReturn """ raise NotImplementedError('Needs to be implemented by a sub class.')
[docs]class TypedServerCommand(_TypedCommand): """Decorator class to create typed server commands.""" parser = CommandParser() manager = server_command_manager @staticmethod
[docs] def send_message(command_info, message, language=None, **tokens): # Translate the message if it's a :class:`TranslationStrings` object. if isinstance(message, TranslationStrings): message = message.get_string(language, **tokens) logger.log_message(message)
@classmethod
[docs] def get_auto_command_return(cls, info): return CommandReturn.CONTINUE
class _TypedPlayerCommand(_TypedCommand): """Decorator class to create typed player based commands.""" @classmethod def on_clean_command(cls, command_info, command_node, args): if (command_node.permission is None or auth_manager.is_player_authorized( command_info.index, command_node.permission)): return super().on_clean_command(command_info, command_node, args) return cls.handle_fail_callback(command_info, command_node, args) @classmethod def handle_fail_callback(cls, command_info, command_node, args): if command_node.fail_callback is not None: return command_node.fail_callback(command_info, args) # TODO: Send "Required permission: <permission>" or write it to the # logs? cls.send_message( command_info, 'You are not authorized to use this command.\n' + 'Required permission: {}'.format(command_node.permission)) return CommandReturn.CONTINUE
[docs]class TypedClientCommand(_TypedPlayerCommand): """Decorator class to create typed client commands.""" parser = CommandParser() manager = client_command_manager @staticmethod
[docs] def send_message(command_info, message, language=None, **tokens): TextMsg(message, HudDestination.CONSOLE).send(command_info.index, **tokens)
@classmethod
[docs] def get_auto_command_return(cls, info): return CommandReturn.BLOCK
[docs]class TypedSayCommand(_TypedPlayerCommand): """Decorator class to create typed say commands.""" parser = CommandParser() manager = say_command_manager @staticmethod
[docs] def send_message(command_info, message, language=None, **tokens): SayText2(message).send(command_info.index, **tokens)
@classmethod
[docs] def get_auto_command_return(cls, info): if info.is_private_command(): return CommandReturn.BLOCK return CommandReturn.CONTINUE
# ============================================================================= # >> COMMAND ANNOTATIONS # =============================================================================
[docs]def filter_str(expr): """.. seealso:: :func:`filters.players.parse_filter`""" # A simple wrapper for parse_filter for a better looking command signature return parse_filter(expr)