Source code for core.update
# ../core/update.py
"""Provides functions to update Source.Python and its data files."""
# =============================================================================
# >> IMPORTS
# =============================================================================
# Python Imports
import json
import time
from zipfile import ZipFile
from urllib.request import urlopen
# Source.Python Imports
# Core
from core import PLATFORM
from core import SOURCE_ENGINE_BRANCH
from core.logger import core_logger
# Paths
from paths import ADDONS_PATH
from paths import GAME_PATH
from paths import UPDATE_PATH
from paths import UPDATE_DATA_PATH
from paths import UPDATE_SP_DATA_PATH
from paths import DATA_PATH
from paths import SP_DATA_PATH
# KeyValues
from keyvalues import KeyValues
# =============================================================================
# >> ALL DECLARATION
# =============================================================================
__all__ = (
'ARTIFACTS_URL',
'BASE_DOWNLOAD_URL',
'CHECKSUM_URL',
'DATA_URL',
'DATA_ZIP_FILE',
'download_file',
'get_build_artifacts',
'get_download_url',
'get_latest_data_checksum',
'is_new_data_available',
'update_in_progress',
'update_data',
)
# =============================================================================
# >> GLOBAL VARIABLES
# =============================================================================
# Don't use __getattr__ here. 'update' is a method of the _LogInstance class.
update_logger = core_logger['update']
BINARY_EXT = 'so' if PLATFORM == 'linux' else 'dll'
SP_VDF1 = 'addons/source-python'
SP_VDF2 = 'addons/source-python2'
DATA_ZIP_FILE = DATA_PATH / 'source-python-data.zip'
UPDATE_ZIP_FILE = UPDATE_PATH / 'source-python.zip'
VDF_FILE = ADDONS_PATH / 'source-python.vdf'
LOADER_FILE = ADDONS_PATH / f'source-python.{BINARY_EXT}'
LOADER_UPDATE_FILE = UPDATE_PATH / 'addons' / f'source-python.{BINARY_EXT}'
VDF_UPDATE_FILE = UPDATE_PATH / 'addons' / 'source-python.vdf'
BASE_DATA_URL = 'http://data.sourcepython.com'
CHECKSUM_URL = f'{BASE_DATA_URL}/checksum.txt'
DATA_URL = f'{BASE_DATA_URL}/source-python-data.zip'
BASE_DOWNLOAD_URL = 'http://downloads.sourcepython.com'
ARTIFACTS_URL = f'{BASE_DOWNLOAD_URL}/artifacts.txt'
DEFAULT_TIMEOUT = 3
#: Indicates, whether an update is in progress (stage 1 has been applied).
update_in_progress = False
# =============================================================================
# >> FUNCTIONS
# =============================================================================
[docs]def download_file(url_path, file_path, timeout=3):
"""Download a file from an URL to a specific file.
:param str url_path:
The URL that should be opened.
:param str file_path:
The file where the content of the URL should be stored.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
update_logger.log_debug(f'Downloading file ({url_path}) to {file_path} ...')
now = time.time()
with urlopen(url_path, timeout=timeout) as url:
data = url.read()
with file_path.open('wb') as f:
f.write(data)
update_logger.log_info(
'File has been downloaded. Time elapsed: {:0.2f} seconds'.format(
time.time()-now))
# =============================================================================
# >> FULL SP UPDATE
# =============================================================================
def do_full_update(timeout=DEFAULT_TIMEOUT):
"""Starts a full update of Source.Python.
A restart of the server and possibly manual work is required. Please see
the Source.Python log for the instructions.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
global update_in_progress
if update_in_progress:
update_logger.log_message(
'An update is already in progress. Please follow the instructions '
'in the log.')
return
# Make sure there is a clean update directory
_clean_update_dir()
try:
_download_latest_version(timeout)
_apply_update_stage1()
update_in_progress = True
except:
# Make sure to leave a clean update directory, so the loader doesn't
# get confused.
_clean_update_dir()
raise
[docs]def get_build_artifacts(timeout=DEFAULT_TIMEOUT):
"""Return the artifacts of the latest Source.Python build.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
update_logger.log_debug('Getting artifacts...')
with urlopen(ARTIFACTS_URL, timeout=timeout) as url:
return url.read().decode('utf-8').split('\n')
[docs]def get_download_url(game=SOURCE_ENGINE_BRANCH, timeout=DEFAULT_TIMEOUT):
"""Get the latest Source.Python download URL for a specific game.
:param str game:
The game game to look for (e.g. ``css``).
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
:rtype: str
:raise ValueError:
Raised if the game wasn't found.
"""
for relative_path in get_build_artifacts(timeout):
if f'-{game}-' in relative_path:
return f'{BASE_DOWNLOAD_URL}/{relative_path}'
raise ValueError(f'Unable to find a download URL for game "{game}".')
def _clean_update_dir():
"""Clear or create the update directory."""
if UPDATE_PATH.exists():
for f in UPDATE_PATH.listdir():
if f.isfile():
f.remove()
else:
f.rmtree()
else:
UPDATE_PATH.mkdir()
def _download_latest_version(timeout=DEFAULT_TIMEOUT):
"""Download the latest Source.Python version.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
download_file(get_download_url(), UPDATE_ZIP_FILE, timeout)
def _apply_update_stage1():
"""Apply stage 1 of the version update."""
update_logger.log_message('Applying Source.Python update stage 1...')
# Extract all files to the update directory
with ZipFile(UPDATE_ZIP_FILE) as zip:
zip.extractall(UPDATE_PATH)
UPDATE_ZIP_FILE.remove()
VDF_UPDATE_FILE.remove()
if PLATFORM == 'windows':
_apply_update_stage1_windows()
else:
_apply_update_stage1_linux()
# Apply latest data update
if not DATA_ZIP_FILE.isfile():
return
update_logger.log_debug('Applying latest data update...')
if UPDATE_SP_DATA_PATH.isdir():
update_logger.log_debug(f'Removing {UPDATE_SP_DATA_PATH} ...')
UPDATE_SP_DATA_PATH.rmtree()
_unpack_data(UPDATE_DATA_PATH)
def _apply_update_stage1_windows():
"""Apply the Windows specific part of stage 1.
On Windows files that are currently in use (``source-python.dll``) can't be
replaced. Thus, this function checks if ``source-python.vdf`` exists. If it
does, the new ``source-python.dll`` is copied to the addons directory with
a new name (``source-python2.dll``). After that the VDF entry is modified
to point to the new loader.
If ``source-python.vdf`` does not exist, manual action is required.
"""
if not VDF_FILE.isfile():
update_logger.log_message(
f'Stage 1 has been applied. Please shutdown your server and move '
f'(do not copy) {LOADER_UPDATE_FILE} to {LOADER_FILE}. After that '
f'start your server to apply stage 2.')
else:
update_logger.log_debug('Determining current VDF entry...')
kv = KeyValues.from_file(VDF_FILE)
# Get the current and new entry for the VDF file
current_entry = kv.get_string('file')
if current_entry == SP_VDF2:
new_entry = SP_VDF1
elif current_entry == SP_VDF1:
new_entry = SP_VDF2
else:
raise ValueError(f'Unexpected entry in VDF: {current_entry}')
update_logger.log_debug(f'Current VDF entry: {current_entry}')
update_logger.log_debug(f'New VDF entry: {new_entry}')
update_logger.log_debug('Moving new loader binary to game directory...')
LOADER_UPDATE_FILE.move(GAME_PATH / f'{new_entry}.{BINARY_EXT}')
kv.set_string('file', new_entry)
kv.save_to_file(VDF_FILE)
update_logger.log_message(
'Stage 1 has been applied. Restart your server to apply stage 2.')
def _apply_update_stage1_linux():
"""Apply the Linux specific part of stage 1."""
update_logger.log_debug('Moving new loader binary to game directory...')
LOADER_UPDATE_FILE.move(LOADER_FILE)
update_logger.log_message(
'Stage 1 has been applied. Restart your server to apply stage 2.')
# =============================================================================
# >> SP DATA UPDATE
# =============================================================================
[docs]def update_data(timeout=DEFAULT_TIMEOUT):
"""Download and unpack the latest data from the build server.
Old data gets deleted before unpacking.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
_download_latest_data(timeout)
if SP_DATA_PATH.isdir():
update_logger.log_debug('Removing {} ...'.format(SP_DATA_PATH))
SP_DATA_PATH.rmtree()
_unpack_data(DATA_PATH)
[docs]def is_new_data_available(timeout=DEFAULT_TIMEOUT):
"""Return ``True`` if new data is available.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
:rtype: bool
"""
if not DATA_ZIP_FILE.isfile():
return True
return DATA_ZIP_FILE.read_hexhash('md5') != get_latest_data_checksum(timeout)
[docs]def get_latest_data_checksum(timeout=DEFAULT_TIMEOUT):
"""Return the MD5 checksum of the latest data from the build server.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
:rtype: str
"""
with urlopen(CHECKSUM_URL, timeout=timeout) as url:
return url.read().decode()
def _download_latest_data(timeout=DEFAULT_TIMEOUT):
"""Download the latest data from the build server.
:param float timeout:
Number of seconds that need to pass until a timeout occurs.
"""
download_file(DATA_URL, DATA_ZIP_FILE, timeout)
def _unpack_data(path):
"""Unpack ``source-python-data.zip`` into the given path.
:param Path path:
The path the data file should be unpacked into.
"""
update_logger.log_debug(f'Extracting data in {path} ...')
with ZipFile(DATA_ZIP_FILE) as zip:
zip.extractall(path)