Source code for listeners.tick

# ../listeners/tick.py

"""Provides tick listener based functionality."""

# =============================================================================
# >> IMPORTS
# =============================================================================
# Python
import bisect
import math
import time

from enum import IntEnum
from threading import Thread
from warnings import warn

# Source.Python
from core import AutoUnload
from core import WeakAutoUnload
from hooks.exceptions import except_hooks
from listeners import (
    listeners_logger, on_tick_listener_manager, OnLevelEnd,
)


# =============================================================================
# >> ALL DECLARATION
# =============================================================================
__all__ = (
    'Delay',
    'GameThread',
    'Repeat',
    'RepeatStatus',
)


# =============================================================================
# >> GLOBAL VARIABLES
# =============================================================================
# Get the sp.listeners.tick logger
listeners_tick_logger = listeners_logger.tick


# =============================================================================
# >> THREAD WORKAROUND
# =============================================================================
[docs]class GameThread(WeakAutoUnload, Thread): """A subclass of :class:`threading.Thread` that throws a warning if the plugin that created the thread has been unloaded while the thread is still running. """ def _add_instance(self, caller): super()._add_instance(caller) self._caller = caller def _unload_instance(self): if self.is_alive(): warn( f'Thread "{self.name}" ({self.ident}) from "{self._caller}" ' f'is running even though its plugin has been unloaded!')
# ============================================================================= # >> DELAY CLASSES # ============================================================================= class _DelayManager(list): """A class that is responsible for executing delays.""" def __init__(self): super().__init__() on_tick_listener_manager.register_listener(self._tick) def _tick(self): """Internal tick listener.""" current_time = time.time() while self and self[0].exec_time <= current_time: try: self.pop(0).execute() except: except_hooks.print_exception() def add(self, delay): """Add a delay to the list. :param Delay delay: The delay to add. """ bisect.insort_right(self, delay) _delay_manager = _DelayManager()
[docs]class Delay(WeakAutoUnload): """Execute a callback after a given delay."""
[docs] def __init__( self, delay, callback, args=(), kwargs=None, cancel_on_level_end=False ): """Initialize the delay. :param float delay: The delay in seconds. :param callback: A callable object that should be called after the delay expired. :param tuple args: Arguments that should be passed to the callback. :param dict kwargs: Keyword arguments that should be passed to the callback. :param bool cancel_on_level_end: Whether or not to cancel the delay at the end of the map. :raise ValueError: Raised if the given callback is not callable. """ if not callable(callback): raise ValueError('Given callback is not callable.') #: Delay in seconds. self.delay = delay self._start_time = time.time() #: Time when the delay will be executed. self.exec_time = self._start_time + delay #: Callback to call when the delay expired. self.callback = callback #: Arguments to pass to the callback. self.args = args #: Keyword arguments to pass to the callback. self.kwargs = kwargs if kwargs is not None else dict() #: Whether or not to cancel the delay at the end of the map. self.cancel_on_level_end = cancel_on_level_end _delay_manager.add(self)
def __lt__(self, other): """Return True if this :attr:`exec_time` is less than the other's. :rtype: bool """ return self.exec_time < other.exec_time def __call__(self): """Cancel the delay and immediately call the callback. :return: The result of :meth:`execute`. """ self.cancel() return self.execute()
[docs] def execute(self): """Call the callback. :return: The result of :attr:`callback`. """ return self.callback(*self.args, **self.kwargs)
[docs] def cancel(self): """Cancel the delay. :raise ValueError: Raised if the delay is not running. """ _delay_manager.remove(self)
@property def running(self): """Return True if the delay running. :rtype: bool """ return self in _delay_manager @property def time_remaining(self): """Return the remaining time (in seconds) until the Delay ends. :rtype: float """ if not self.running: # TODO: what should we return here, or should we raise an error? return None return self.exec_time - time.time() @property def time_elapsed(self): """Return the amount of time (in seconds) since the Delay started. :rtype: float """ if not self.running: # TODO: what should we return here, or should we raise an error? return None return time.time() - self._start_time def _unload_instance(self): try: self.cancel() except ValueError: pass
# ============================================================================= # >> REPEAT CLASSES # =============================================================================
[docs]class RepeatStatus(IntEnum): """Class used to store RepeatStatus values.""" STOPPED = 1 RUNNING = 2 PAUSED = 3
[docs]class Repeat(AutoUnload): """Class used to create and call repeats."""
[docs] def __init__( self, callback, args=(), kwargs=None, cancel_on_level_end=False ): """Store all instance attributes. :param callback: A callable object that should be called at the end of each loop. :param tuple args: Arguments that should be passed to the callback. :param dict kwargs: Keyword arguments that should be passed to the callback. :param bool cancel_on_level_end: Whether or not to cancel the repeat at the end of the map. :raise ValueError: Raised if the given callback is not callable. """ if not callable(callback): raise ValueError('Given callback is not callable.') # Store the base attributes self.callback = callback self.args = args self.kwargs = kwargs if kwargs is not None else dict() self.cancel_on_level_end = cancel_on_level_end # Log the __init__ message listeners_tick_logger.log_debug( 'Repeat.__init__: <{self.callback}> <{self.args}>' ' <{self.kwargs}>'.format( self=self ) ) # Set up private attributes self._interval = 0 self._original_loops = math.inf self._loops_elapsed = 0 self._adjusted_loops = 0 self._status = RepeatStatus.STOPPED self._delay = None self._loop_time_for_pause = None self._original_start_time = None
@property def interval(self): """Return the interval in which the callback will be called. :rtype: int """ return self._interval @property def adjusted_loops(self): """Return the number of loops that have been adjusted. :rtype: int """ return self._adjusted_loops @property def loops_remaining(self): """Return the remaining number of loops in the repeat. :rtype: int """ return self.total_loops - self.loops_elapsed @property def loops_elapsed(self): """Return the current number of loops made in the repeat. :rtype: int """ return self._loops_elapsed @property def total_loops(self): """Return the total number of loops to be made. :rtype: int """ return self._original_loops + self._adjusted_loops @property def original_loops(self): """Return the number of loops the repeat has been started with. :rtype: int """ return self._original_loops @property def total_time_remaining(self): """Return the remaining time till the end of the repeat. :rtype: float """ if self.delay_time_remaining is None: return None return ( self.loops_remaining * self.interval + self.delay_time_remaining ) @property def total_time_elapsed(self): """Return the elapsed time since the repeat started. :rtype: float """ return time.time() - self._original_start_time @property def total_time(self): """Return the total time it will take to complete the repeat. :rtype: float """ return self.total_loops * self.interval @property def delay_time_remaining(self): """Return the time remaining in the current loop. :rtype: float """ return self._delay.time_remaining @property def delay_time_elapsed(self): """Return the time elapsed in the current loop. :rtype: float """ return self._delay.time_elapsed @property def status(self): """Return the status of the repeat. :rtype: RepeatStatus """ return self._status
[docs] def start(self, interval, limit=math.inf, execute_on_start=False): """Start the repeat loop. :param float interval: The time (in seconds) for each loop. :param int limit: The maximum number of times to loop. If :data:`math.inf` is passed, there is no limit, and the Repeat will loop indefinitely. :param bool execute_on_start: Whether to execute the callback when the Repeat is started. Note that this does not affect the 'limit' as the number of loops will remain the same. """ # Log the start message listeners_tick_logger.log_debug( 'Repeat.start: <{interval}> <{limit}>'.format( interval=interval, limit=limit ) ) # Is the repeat already running? if self._status is RepeatStatus.RUNNING: # Log the status listeners_tick_logger.log_debug( 'Repeat.start - RepeatStatus.RUNNING' ) # Do not start the repeat return # Log starting the repeat listeners_tick_logger.log_debug( 'Repeat.start - !RepeatStatus.RUNNING - Starting Repeat' ) self._status = RepeatStatus.RUNNING self._interval = interval self._original_loops = limit self._loops_elapsed = 0 self._adjusted_loops = 0 self._original_start_time = time.time() # Start the delay self._delay = Delay( self.interval, self._execute, cancel_on_level_end=self.cancel_on_level_end ) # Call the callback if set to execute on start if execute_on_start: self.callback(*self.args, **self.kwargs)
[docs] def stop(self): """Stop the repeat loop.""" # Log the stop message listeners_tick_logger.log_debug('Repeat.stop') # Is the repeat running? if self._status is not RepeatStatus.RUNNING: # Log the status listeners_tick_logger.log_debug( 'Repeat.stop - !RepeatStatus.RUNNING' ) # No need to stop it return # Log stopping the repeat listeners_tick_logger.log_debug( 'Repeat.stop - RepeatStatus.RUNNING - Stopping Repeat' ) # Set the status to stopped self._status = RepeatStatus.STOPPED # Cancel the delay self._delay.cancel()
[docs] def restart(self): """Restart the repeat.""" # Log restarting the repeat listeners_tick_logger.log_debug('Repeat.restart') # Stop the repeat self.stop() # Start the repeat self.start(self.interval, self.total_loops)
[docs] def pause(self): """Pause the repeat. Pausing allows the repeat to be resumed. """ # Log the pause message listeners_tick_logger.log_debug('Repeat.pause') # Is the repeat running? if self._status is not RepeatStatus.RUNNING: # Log the status listeners_tick_logger.log_debug( 'Repeat.pause - !RepeatStatus.RUNNING' ) # No need to pause return # Log pausing the repeat listeners_tick_logger.log_debug( 'Repeat.pause - RepeatStatus.RUNNING - Pausing Repeat' ) # Set the status to paused self._status = RepeatStatus.PAUSED # Set the remaining time in the current loop self._loop_time_for_pause = self._delay.time_remaining # Cancel the delay self._delay.cancel()
[docs] def resume(self): """Resume the repeat. Can only resume if in paused status. """ # Log the resume message listeners_tick_logger.log_debug('Repeat.resume') # Is the repeat paused? if self._status is not RepeatStatus.PAUSED: # Log the status listeners_tick_logger.log_debug( 'Repeat.resume - !RepeatStatus.PAUSED' ) # Do not resume return # Log resuming the repeat listeners_tick_logger.log_debug( 'Repeat.resume - RepeatStatus.PAUSED - Resuming Repeat' ) # Set the status to running self._status = RepeatStatus.RUNNING # Start the delay self._delay = Delay( self._loop_time_for_pause, self._execute, cancel_on_level_end=self.cancel_on_level_end )
[docs] def extend(self, adjustment): """Add to the number of loops to be made. :param int adjustment: The number of loops to be added to the limit. :raise ValueError: Raised if given adjustment is not a positive integer. """ listeners_tick_logger.log_debug('Repeat.extend') # Is there no limit for this repeat? if self.total_loops == math.inf: listeners_tick_logger.log_debug( 'Unable to extend, Repeat instance has no limit.' ) return # Was a positive integer given? if not isinstance(adjustment, int) or adjustment < 1: raise ValueError('Adjusted value must be a positive integer') # Add to the adjusted number self._adjusted_loops += adjustment
[docs] def reduce(self, adjustment): """Reduce the number of loops to be made. :param int adjustment: The number of loops to be removed from the limit. :raises ValueError: Raised if given adjustment is not a positive integer. """ listeners_tick_logger.log_debug('Repeat.reduce') # Is there no limit for this repeat? if self.total_loops == math.inf: listeners_tick_logger.log_debug( 'Unable to reduce, Repeat instance has no limit.' ) return # Was a positive integer given? if not isinstance(adjustment, int) or adjustment < 1: raise ValueError('Adjusted value must be a positive integer') # Subtract from the adjusted number self._adjusted_loops -= adjustment # Should the repeat be stopped? if ( self.loops_remaining <= 0 and self.status is RepeatStatus.RUNNING ): listeners_tick_logger.log_debug( 'Repeat.reduce - Reduce caused repeat to stop' ) self.stop()
def _execute(self): """Execute the repeat's callback with its arguments and keywords.""" listeners_tick_logger.log_debug('Repeat._execute') self._loops_elapsed += 1 # Are any more loops to be made? if self.loops_remaining > 0: listeners_tick_logger.log_debug( 'Repeat._execute - Remaining - {remaining}'.format( remaining=self.loops_remaining ) ) # Call the delay again self._delay = Delay( self.interval, self._execute, cancel_on_level_end=self.cancel_on_level_end ) else: listeners_tick_logger.log_debug( 'Repeat._execute - Stopping the loop' ) # Set the status to stopped self._status = RepeatStatus.STOPPED # Call the repeat's callback for this loop self.callback(*self.args, **self.kwargs) def _unload_instance(self): """Stop the repeat with being unloaded.""" self.stop()
# ============================================================================= # >> HELPER FUNCTIONS # ============================================================================= @OnLevelEnd def _cancel_delays_on_level_end(): for delay in list(_delay_manager): if not delay.cancel_on_level_end: continue callback = delay.callback if ( callback.__name__ == '_execute' and hasattr(callback, '__self__') and isinstance(callback.__self__, Repeat) ): callback.__self__.stop() else: delay.cancel()