# ../core/__init__.py
"""Provides core functionality that doesn't fit into any other package."""
# =============================================================================
# >> IMPORTS
# =============================================================================
# Python Imports
# Codecs
import codecs
# Collections
from collections import defaultdict
# Contextlib
from contextlib import contextmanager
# Hashlib
import hashlib
# Inspect
from inspect import getmodule
from inspect import getmodulename
from inspect import currentframe
# OS
from os import sep
# Path
from path import Path
# Platform
from platform import system
# RE
from re import compile as re_compile
from re import finditer
# Sys
import sys
# Urllib
from urllib.request import urlopen
# Weakref
from weakref import WeakValueDictionary
# Site-Packages Imports
# ConfigObj
from configobj import ConfigObj
# Source.Python Imports
# Paths
from paths import GAME_PATH
from paths import CFG_PATH
from paths import PLUGIN_PATH
# =============================================================================
# >> FORWARD IMPORTS
# =============================================================================
# Source.Python Imports
# Core
from _core import BoostPythonClass
from _core import console_message
from _core import get_interface
from _core import get_core_modules
from _core import OutputReturn
from _core import SOURCE_ENGINE
from _core import SOURCE_ENGINE_BRANCH
# =============================================================================
# >> ALL DECLARATION
# =============================================================================
__all__ = ('AutoUnload',
'BoostPythonClass',
'ConfigFile',
'GameConfigObj',
'WeakAutoUnload',
'GAME_NAME',
'OutputReturn',
'PLATFORM',
'SOURCE_ENGINE',
'SOURCE_ENGINE_BRANCH',
'Tokenize',
'check_info_output',
'console_message',
'create_checksum',
'echo_console',
'get_core_modules',
'get_interface',
'get_public_ip',
'ignore_unicode_errors',
'server_output',
)
# =============================================================================
# >> GLOBAL VARIABLES
# =============================================================================
# Get the specific game for the server
GAME_NAME = GAME_PATH.namebase
# Get the platform the server is on
PLATFORM = system().lower()
# =============================================================================
# >> CLASSES
# =============================================================================
[docs]class AutoUnload:
"""Class used to auto unload specific instances.
Each inheriting class must implement an _unload_instance method.
"""
# Create a dictionary to store AutoUnload objects in
_module_instances = defaultdict(list)
def __new__(cls, *args, **kwargs):
"""Overwrite __new__ to store the calling module."""
# Get the class instance
self = super().__new__(cls)
# Get the calling frame
frame = currentframe().f_back
# Get the calling path
path = frame.f_code.co_filename
# Don't keep hostage instances that will never be unloaded
while not path.startswith(PLUGIN_PATH):
frame = frame.f_back
if frame is None:
return self
path = frame.f_code.co_filename
if path.startswith('<frozen'):
return self
# Resolve the calling module name
try:
name = frame.f_globals['__name__']
except KeyError:
try:
name = getmodule(frame).__name__
except AttributeError:
name = getmodulename(path)
# Call class-specific logic for adding the instance.
if name is not None:
self._add_instance(name)
# Return the instance
return self
def _add_instance(self, caller):
"""Add the instance to self._module_instances."""
self._module_instances[caller].append(self)
def _unload_instance(self):
"""Base _unload_instance implementation."""
[docs]class WeakAutoUnload(AutoUnload):
"""Subclass of AutoUnload used to store weak references to instances."""
# Create a dictionary to store WeakAutoUnload objects in
_module_instances = defaultdict(WeakValueDictionary)
def _add_instance(self, caller):
"""Add the instance to self._module_instances."""
self._module_instances[caller][id(self)] = self
[docs]class GameConfigObj(ConfigObj):
"""Class used to parse specific game data."""
[docs] def __init__(self, infile, *args, **kwargs):
"""Helper class that merges the given file with engine/game files."""
# Get the file directory/name...
path, name = Path(infile).splitpath()
# Call ConfigObj's __init__ method...
super().__init__(infile, *args, **kwargs)
# Move the path to the current engine sub-directory...
path = path / SOURCE_ENGINE
# Parse and merge the specific engine file...
self.merge(ConfigObj(path / name, *args, **kwargs))
# Finally, parse the specific game file...
self.merge(ConfigObj(path / GAME_NAME / name, *args, **kwargs))
[docs]class Tokenize(list):
"""Parses the arguments from the given string."""
_pattern = re_compile(r'"[^"]*"|[^ \t]+')
[docs] def __init__(self, string, comment_prefix=None):
"""Splits the arguments from the given string."""
# Initialize the list
super().__init__()
# Store the given string as is
self.string = string
# Loop through all tokens
for match in finditer(self._pattern, string):
# Get the current match as a string
arg = match.group()
# Strip end line comment
if comment_prefix is not None and arg.startswith(comment_prefix):
self.string = self.string[:match.start()]
break
# Add the current argument to the list
self.append(arg.strip('"'))
def __str__(self):
"""Returns the original string (without end-line comment)."""
return self.string
def __hash__(self):
"""Hashes the original string."""
return hash(self.string)
[docs]class ConfigFile(list):
"""Class used to parse a configuration file."""
[docs] def __init__(
self, path, encoding='utf-8', comment_prefix='//', as_strings=False):
"""Parses the given configuation file path.
:param Path path:
The path of the file to parse.
:param str encoding:
The encoding to use when opening the file.
:param str comment_prefix:
The prefix of end line comments.
:param bool as_strings:
Whether the parsed lines should be stored as strings rather than
argument lists.
"""
# If the given path doesn't exist, search for it in the cfg directory
if not path.isfile():
path = CFG_PATH.joinpath(path)
# If no file was found, return an empty list
if not path.isfile():
return
# Import this here to fix cyclic imports
from translations.strings import LangStrings
# Open the given file and parse its content
with open(path, 'r', encoding=encoding) as f:
# Loop through all lines
for line in f.read().splitlines():
# Parse the argument from the current line
args = Tokenize(
LangStrings._replace_escaped_sequences(line),
comment_prefix)
# Skip empty/commented lines
if not args:
continue
# Add the current line to the list
self.append(args if not as_strings else str(args))
# =============================================================================
# >> FUNCTIONS
# =============================================================================
[docs]def echo_console(text):
"""Echo a message to the server's console.
.. note::
Unlike ``console_message``, this function automatically adds a newline
at the end of the message.
:param str text:
Message to print to the console.
"""
console_message(text + '\n')
@contextmanager
[docs]def ignore_unicode_errors(errors='ignore'):
"""Overwrite the ``strict`` codecs error handler temporarily.
This is useful e.g. if the engine truncates a string, which results in a
string that contains a splitted multi-byte character at the end of the
string.
:param str errors:
Error handler that will be looked up via :func:`codecs.lookup_error`.
:raise LookupError:
Raised if the error handler was not found.
Example:
.. code:: python
import memory
# Allocate four bytes to create an erroneous string
ptr = memory.alloc(4)
# Write data to the memory that will usually result in a
# UnicodeDecodeError
ptr.set_uchar(ord('a'), 0)
ptr.set_uchar(ord('b'), 1)
ptr.set_uchar(226, 2) # Add the invalid byte
ptr.set_uchar(0, 3) # Indicate the end of the string
with ignore_unicode_errors():
# Read the data as a string. Now, it will only print 'ab', because
# the invalid byte has been removed/ignored.
print(ptr.get_string_array())
"""
old_handler = codecs.lookup_error('strict')
codecs.register_error('strict', codecs.lookup_error(errors))
try:
yield
finally:
codecs.register_error('strict', old_handler)
[docs]def get_public_ip():
"""Return the server's public IPv4.
:rtype: str
.. note::
This functions makes a call to ``http://api.ipify.org`` to retrieve the public IP.
"""
return urlopen('http://api.ipify.org/').read().decode()
@contextmanager
[docs]def server_output(action=OutputReturn.CONTINUE):
"""Gather all server output sent during the execution of the with-statement.
:param OutputReturn action:
Determines what happens with the output.
:rtype: list
:return:
A list that will be filled with a tuple for every line that is being
logged. The tuple contains the severity and the message.
Example:
.. code:: python
from cvars import cvar
from core import server_output
from core import OutputReturn
status = cvar.find_command('status')
with server_output(OutputReturn.BLOCK) as output:
status()
# Print everything that was logged by the 'status' command
print(output)
Output:
.. code:: python
[(_core.MessageSeverity.MESSAGE, 'hostname: Counter-Strike: Global Offensive\\n'),
(_core.MessageSeverity.MESSAGE, 'version : 1.35.8.4/13584 513/6771 secure [A:1:2435270659:8640] \\n'),
(_core.MessageSeverity.MESSAGE, 'udp/ip : 192.168.178.60:27015 (public ip: 46.83.158.27)\\n'),
(_core.MessageSeverity.MESSAGE, 'os : Windows\\n'),
(_core.MessageSeverity.MESSAGE, 'type : community dedicated\\n'),
(_core.MessageSeverity.MESSAGE, 'players : 0 humans, 0 bots (20/0 max) (hibernating)\\n\\n'),
(_core.MessageSeverity.MESSAGE, '# userid name uniqueid connected ping loss state rate'),
(_core.MessageSeverity.MESSAGE, ' adr'),
(_core.MessageSeverity.MESSAGE, '\\n'),
(_core.MessageSeverity.MESSAGE, '#end\\n')]
"""
# Import this here to fix a cyclic import
from listeners import OnServerOutput
msg_buffer = []
def intercepter(severity, msg):
msg_buffer.append((severity, msg))
return action
OnServerOutput.manager.register_listener(intercepter)
try:
yield msg_buffer
finally:
OnServerOutput.manager.unregister_listener(intercepter)
[docs]def create_checksum(data, ignore_wchars=True):
"""Create an MD5 checksum for the given string.
:param str data:
The string for which a checksum should be created.
:param bool ignore_wchars:
If ``True`` whitespace characters are ignored.
:rtype: str
"""
if ignore_wchars:
data = ''.join(data.split())
return hashlib.new('md5', bytes(data, encoding='utf-8')).hexdigest()
[docs]def check_info_output(output):
"""Return whether the output of ``sp info`` has been modified.
:param str output:
The output of ``sp info``.
:raise ValueError:
Raised if the checksum was not found in the output.
:return:
``True`` if the output has been modified.
:rtype: bool
"""
checksum = None
lines = output.strip().split('\n')
# Search the checksum entry
while lines:
line = lines.pop(0)
if line.startswith('Checksum'):
checksum = line.split(':', 1)[1].strip()
break
if checksum is None:
raise ValueError('Checksum not found.')
# Ignore last line if it's the separator
if lines[-1].startswith('-'):
lines.pop()
return create_checksum(''.join(lines)) != checksum