# Source code for messages.hooks

# ../messages/hooks.py

"""Provides user message hooking functionality."""

# TODO:
# - Implement more user messages
# - Use these message implementations for the UserMessageCreator subclasses.

# =============================================================================
# >> IMPORTS
# =============================================================================
# Python
#   Collections
from collections import defaultdict

# Source.Python
#   Core
from core import AutoUnload
#   Engines
from engines.server import engine_server
#   Filters
from filters.recipients import BaseRecipientFilter
from filters.recipients import RecipientFilter
#   Bitbuffers
from bitbuffers import BitBufferWrite
from bitbuffers import BitBufferRead
#   Listeners
from listeners import ListenerManager
#   Memory
from memory import make_object
from memory import get_object_pointer
from memory import get_size
from memory import get_virtual_function
from memory.hooks import PreHook
from memory.hooks import PostHook
#   Messages
from messages import UserMessage
from messages import get_message_index
from messages import get_message_name
from messages.impl import get_user_message_impl

if UserMessage.is_protobuf():
    from _messages import ProtobufMessage


# =============================================================================
# >> ALL DECLARATION
# =============================================================================
# Names exported by ``from messages.hooks import *``: the decorator base class
# and its three concrete user message hook decorators.
__all__ = (
    'HookUserMessageBase',
    'HookBitBufferUserMessage',
    'HookProtobufUserMessage',
    'HookUserMessage',
)


# =============================================================================
# >> GLOBAL VARIABLES
# =============================================================================
# Tuple of (message index, return value of UserMessageBegin) for the most
# recently begun user message. It is stored by _post_user_message_begin and
# unpacked by _pre_message_end as (message_index, buffer_write_ptr). It stays
# None until the first UserMessageBegin call has been observed.
_user_message_data = None

# Shared recipient filter. The pre-hooks copy the original recipients into it,
# store it in args[1] of the hooked call and pass it to the registered
# callbacks.
_recipients = RecipientFilter()


# =============================================================================
# >> CLASSES
# =============================================================================
class HookUserMessageBase(AutoUnload):
    """Base decorator for user message hooks.

    Subclasses are expected to override :attr:`hooks` with a mapping from
    user message index to the listener manager holding the callbacks for
    that message.
    """

    def __init__(self, user_message):
        """Create a new user message hook.

        :param int/str user_message:
            The user message index or name to hook.
        :raise TypeError:
            Raised if ``user_message`` is not an int or str.
        :raise ValueError:
            Raised if the user message does not exist.
        """
        # Assign this before any validation can raise. _unload_instance()
        # reads the attribute unconditionally, so a partially initialized
        # instance must already have it.
        self.callback = None

        if isinstance(user_message, int):
            index = user_message
        elif isinstance(user_message, str):
            index = get_message_index(user_message)
        else:
            raise TypeError(
                'Invalid type for "user_message". int or str required.')

        self.message_index = index
        self.message_name = get_message_name(index)

        # Verify that it's a valid index
        if self.message_name is None:
            raise ValueError(f'Invalid user message: {user_message}')

    def __call__(self, callback):
        """Finalize the hook registration by registering the callback.

        :param object callback:
            A callable object that will be called when a user message is
            created.
        :return:
            The callback that has been passed.
        :raise ValueError:
            Raised if ``callback`` is not callable.
        """
        if not callable(callback):
            raise ValueError('Callback must be callable.')

        self.callback = callback
        self.hooks[self.message_index].register_listener(callback)
        return self.callback

    def _unload_instance(self):
        """Unregister the user message hook."""
        # Nothing was registered if the decorator was never applied to a
        # callback.
        if self.callback is None:
            return

        self.hooks[self.message_index].unregister_listener(self.callback)

    @property
    def hooks(self):
        """Return all hooks for a user message.

        :rtype: ListenerManager
        :raise NotImplementedError:
            Always raised by this base implementation.
        """
        raise NotImplementedError('Must be implemented by a subclass.')
class HookBitBufferUserMessage(HookUserMessageBase):
    """Decorator to register a raw user message hook for bitbuffer messages.

    Registered callbacks are called with the recipient filter, a read buffer
    and a write buffer of the message.
    """

    # Maps a user message index to the listeners registered for it. Missing
    # indexes get an empty ListenerManager on first access.
    hooks = defaultdict(ListenerManager)
class HookProtobufUserMessage(HookUserMessageBase):
    """Decorator to register a raw user message hook for protobuf messages.

    Registered callbacks are notified with the recipient filter and the
    protobuf message buffer.
    """

    # Maps a user message index to the listeners registered for it. Missing
    # indexes get an empty ListenerManager on first access.
    hooks = defaultdict(ListenerManager)
class HookUserMessage(HookUserMessageBase):
    """Decorator to register a convenient user message hook.

    Registered callbacks are notified with the recipient filter and the
    message data as read by the user message's implementation.
    """

    # Maps a user message index to the listeners registered for it. Missing
    # indexes get an empty ListenerManager on first access.
    hooks = defaultdict(ListenerManager)

    def __init__(self, user_message):
        """Create a new user message hook.

        :param int/str user_message:
            The user message index or name to hook.
        :raise NotImplementedError:
            Raised if the user message has not been implemented yet in
            Source.Python.

        .. seealso:: :meth:`HookUserMessageBase.__init__`
        """
        super().__init__(user_message)

        # Look up the implementation right away: the lookup raises a
        # NotImplementedError for unsupported user messages, which rejects
        # the hook at creation time.
        index = self.message_index
        self.impl = get_user_message_impl(index)
# =============================================================================
# >> HOOKS
# =============================================================================
def _replace_recipients(args):
    """Substitute the shared recipient filter for the hooked call's filter.

    The recipients of the filter found in ``args[1]`` are copied into the
    module-level ``_recipients`` filter, which is then stored in ``args[1]``.

    :param args:
        The arguments of the hooked function call.
    """
    try:
        original_filter = make_object(BaseRecipientFilter, args[1])
    except RuntimeError:
        # Patch for issue #314
        original_filter = RecipientFilter.from_abstract_pointer(args[1])

    _recipients.update(*tuple(original_filter), clear=True)
    args[1] = _recipients


if UserMessage.is_protobuf():

    @PreHook(get_virtual_function(engine_server, 'SendUserMessage'))
    def _pre_send_user_message(args):
        """Notify protobuf and convenient hooks before a message is sent."""
        message_index = args[2]

        convenient_listeners = HookUserMessage.hooks[message_index]
        raw_listeners = HookProtobufUserMessage.hooks[message_index]

        # Without any registered listener there is nothing to do
        if not (convenient_listeners or raw_listeners):
            return

        # Replace original recipients filter
        _replace_recipients(args)

        try:
            buffer = make_object(ProtobufMessage, args[3])
        except RuntimeError:
            # Patch for issue #390 - UserMessage was created by another
            # plugin.
            buffer = ProtobufMessage.from_abstract_pointer(args[3])
            wrapped_from_abstract = True
        else:
            wrapped_from_abstract = False

        raw_listeners.notify(_recipients, buffer)

        # The convenient hooks are only served if at least one is registered
        if convenient_listeners:
            try:
                impl = get_user_message_impl(message_index)
            except NotImplementedError:
                impl = None

            if impl is not None:
                data = impl.read(buffer)
                convenient_listeners.notify(_recipients, data)

                # Rewrite the buffer only if a callback changed the data
                if data.has_been_changed():
                    buffer.clear()
                    impl.write(buffer, data)

        # A buffer wrapped from an abstract pointer has to be applied back
        # to the original, so changes that may have been made are kept.
        if wrapped_from_abstract:
            buffer.parse_to_abstract_pointer(args[3])

else:

    @PreHook(get_virtual_function(engine_server, 'UserMessageBegin'))
    def _pre_user_message_begin(args):
        """Replace the original recipients filter with the shared one."""
        _replace_recipients(args)

    @PostHook(get_virtual_function(engine_server, 'UserMessageBegin'))
    def _post_user_message_begin(args, return_value):
        """Remember the message index and return value for MessageEnd."""
        global _user_message_data
        _user_message_data = (args[2], return_value)

    @PreHook(get_virtual_function(engine_server, 'MessageEnd'))
    def _pre_message_end(args):
        """Notify bitbuffer and convenient hooks before a message ends."""
        # This happens when we initialize our hooks, while a user message is
        # currently being created
        if _user_message_data is None:
            return

        message_index, buffer_write_ptr = _user_message_data

        # Retrieve the ListenerManager instances
        convenient_listeners = HookUserMessage.hooks[message_index]
        raw_listeners = HookBitBufferUserMessage.hooks[message_index]

        # Without any registered listener there is nothing to do
        if not (convenient_listeners or raw_listeners):
            return

        buffer_write = make_object(BitBufferWrite, buffer_write_ptr)
        buffer_read = BitBufferRead(buffer_write, False)

        original_bit = buffer_write.current_bit

        # For bitbuffers we need to make sure every callback starts reading
        # and writing from the very first bit.
        for callback in raw_listeners:
            buffer_read.seek_to_bit(0)
            buffer_write.seek_to_bit(0)
            callback(_recipients, buffer_read, buffer_write)

        # If none of the above callbacks wrote to the buffer, we need to
        # restore the current_bit to the original value.
        if buffer_write.current_bit == 0:
            buffer_write.seek_to_bit(original_bit)

        # The convenient hooks are only served if at least one is registered
        if not convenient_listeners:
            return

        try:
            impl = get_user_message_impl(message_index)
        except NotImplementedError:
            return

        buffer_read.seek_to_bit(0)
        data = impl.read(buffer_read)
        convenient_listeners.notify(_recipients, data)

        # Rewrite the buffer only if a callback changed the data
        if data.has_been_changed():
            buffer_write.seek_to_bit(0)
            impl.write(buffer_write, data)