#!/usr/bin/env python3
# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Worker client: connects to debusine server.

Overview
--------

* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``)
* The client will use this token to connect to the server (WebSocket to
  ``/api/ws/1.0/worker/connect``)

Flow
----

#. The worker is executed and chooses ``~/.config/debusine/worker``
   (if it exists) or ``/etc/debusine/worker``. It reads the file
   ``config.ini`` from that directory and, if it already exists, the file
   ``token``.
#. If there isn't a token the worker generates one (using
   :py:func:`secrets.token_hex`) and registers it with the Debusine server
   via HTTP POST to ``/api/1.0/worker/register``, sending the generated
   token and the worker's FQDN. The token is saved to the ``token`` file in
   the chosen config directory.
#. The server will create a new Token and Worker in the DB via the models.
   They cannot be used until they have been manually approved.
#. The client can then connect using WebSockets to
   ``/api/ws/1.0/worker/connect`` and wait for commands to execute.

Objects documentation
---------------------
"""
import asyncio
import functools
import logging
import secrets
import signal
import socket
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Lock
from typing import Any
import aiohttp
from aiohttp.web_exceptions import HTTPCreated, HTTPOk
try:
from pydantic.v1 import ValidationError
except ImportError:
from pydantic import ValidationError # type: ignore
import tenacity
from debusine.client.debusine import Debusine
from debusine.client.exceptions import TokenDisabledError
from debusine.client.models import WorkRequestResponse
from debusine.tasks import BaseExternalTask, BaseTask, TaskConfigError
from debusine.tasks.executors import analyze_worker_all_executors
from debusine.tasks.models import TaskTypes
from debusine.worker.config import ConfigHandler
from debusine.worker.debusine_async_http_client import DebusineAsyncHttpClient
from debusine.worker.system_information import WorkerType, system_metadata
[docs]class Worker:
"""Worker class: waits for commands from the debusine server."""
DEFAULT_LOG_LEVEL = logging.INFO
[docs] def __init__(
self,
*,
log_file: str | None = None,
log_level=None,
config=None,
):
"""
Initialize Worker.
:param log_file: log file to where the logs are saved. If None uses
settings from config.ini or default's Python (stderr).
:param log_level: minimum level of the logs being saved. If None uses
settings from config.ini or DEFAULT_LOG_LEVEL.
:param config: ConfigHandler to use (or creates a default one)
"""
self._original_log_file = log_file
self._original_log_level = log_level
self._config: ConfigHandler
if config is None:
self._config = ConfigHandler()
else:
self._config = config
self._setup_logging()
self._config.validate_config_or_fail()
self._aiohttp_client_session: aiohttp.ClientSession | None = None
self._async_http_client = DebusineAsyncHttpClient(logging, self._config)
# ThreadPool for BaseTask.execute (e.g. Sbuild.exec)
self._task_executor = ThreadPoolExecutor()
# BaseTask.execute (e.g. Sbuild.exec) future is stored here (avoids
# garbage collection deletion)
self._task_exec_future: Future[bool] | None = None
# Used by ThreadPoolExecutor to submit tasks to the main event loop
self._main_event_loop: asyncio.AbstractEventLoop | None = None
# Running task
self._task_running: BaseTask[Any] | None = None
# Lock to protect from concurrent work request execution
self._task_lock: Lock = Lock()
# Set to True if the retry_attempt_number should be reset
self._reset_retry_attempt_number: bool = False
@functools.cached_property
def _debusine(self):
return Debusine(
self._config.api_url, self._config.token, logger=logging.getLogger()
)
def _setup_logging(self):
"""
Set logging configuration.
Use the parameters passed to Worker.__init__() and self._config
configuration.
If the log file cannot be opened it aborts.
"""
log_file = self._original_log_file or self._config.log_file
log_level = (
self._original_log_level
or self._config.log_level
or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
)
try:
logging.basicConfig(
format='%(asctime)s %(message)s',
filename=log_file,
level=log_level,
force=True,
)
except OSError as exc:
self._fail(f'Cannot open {log_file}: {exc}')
def _create_task_signal_int_term_handler(self, signum):
return asyncio.create_task(self._signal_int_term_handler(signum))
[docs] async def main(self):
"""Run the worker."""
self._set_main_event_loop()
for signum in [signal.SIGINT, signal.SIGTERM]:
self._main_event_loop.add_signal_handler(
signum,
functools.partial(
self._create_task_signal_int_term_handler, signum
),
)
await self.connect()
logging.info("debusine-worker lost connection with debusine-server")
await self.close()
def _set_main_event_loop(self):
self._main_event_loop = asyncio.get_running_loop()
async def _signal_int_term_handler(self, signum):
if self._task_running is not None:
self._task_running.abort()
self.log_forced_exit(signum)
await self.close()
[docs] async def close(self):
"""Close the AioHTTP session."""
if self._aiohttp_client_session is not None:
await self._aiohttp_client_session.close()
await self._async_http_client.close()
@staticmethod
def _log_server_error(status_code, body):
logging.error(
'Could not register. Server HTTP status code: %s Body: %s\n',
status_code,
body,
)
async def _register(self):
"""
Create a token, registers it to debusine, saves it locally.
The worker will not receive any tasks until the debusine admin
has approved the token.
"""
token = secrets.token_hex(32)
register_path = '/1.0/worker/register/'
data = {'token': token, 'fqdn': socket.getfqdn()}
try:
response = await self._async_http_client.post(
register_path, json=data
)
except aiohttp.client_exceptions.ClientResponseError as err:
self._log_server_error(err.status, 'Not available')
return False
except aiohttp.client_exceptions.ClientConnectorError as err:
logging.error( # noqa: G200
'Could not register. Server unreachable: %s', err
)
return False
status, body = (
response.status,
(await response.content.read()).decode('utf-8'),
)
if status == HTTPCreated.status_code:
self._config.write_token(token)
# If we already had a cached client, invalidate it so that we
# start using the new token.
try:
del self._debusine
except AttributeError:
pass
return True
else:
self._log_server_error(status, body)
return False
@staticmethod
async def _asyncio_sleep(delay):
"""Sleep asynchronously (mocked in tests)."""
await asyncio.sleep(delay)
def _raise_try_again(self):
raise tenacity.TryAgain()
[docs] async def connect(self):
"""
Connect (registering if needed) to the debusine server.
Uses the URL for debusine server from the configuration file.
"""
self._aiohttp_client_session = aiohttp.ClientSession()
if not self._config.token:
result = await self._register()
if result is False:
self._fail('Exiting...')
debusine_ws_url = self._config.api_url.replace('http', 'ws', 1)
worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'
def tenacity_before_hook(retry_state):
self._reset_retry_counter(retry_state)
self._log_trying_to_reconnect(retry_state)
@tenacity.retry(
sleep=self._asyncio_sleep,
wait=tenacity.wait_random(min=1, max=6),
before=tenacity_before_hook,
retry=tenacity.retry_if_exception_type(
aiohttp.client_exceptions.ClientError
)
| tenacity.retry_if_exception_type(TokenDisabledError)
| tenacity.retry_if_exception_type(ConnectionError),
)
async def _do_wait_for_messages() -> None:
"""Connect to the server and waits for commands."""
headers = {'token': self._config.token}
try:
# Set at the top of Worker.connect.
assert self._aiohttp_client_session is not None
async with self._aiohttp_client_session.ws_connect(
worker_connect_ws_url, headers=headers, heartbeat=60
) as ws:
self._reset_retry_attempt_number = True
logging.info("Connected to %s", self._config.api_url)
msg: aiohttp.WSMessage
async for msg in ws: # pragma: no branch
await self._process_message(msg)
# The server disconnected the worker
self._raise_try_again()
except TokenDisabledError as exc:
# The server disconnected the worker because the token is
# not associated with a worker or is disabled. The server
# admin should enable the token.
logging.info( # noqa: G200
'The token (%s) is disabled. '
'Debusine admin (%s) should enable it. '
'Reason: %s. '
'Will try again',
self._config.token,
self._config.api_url,
exc,
)
raise
except (
aiohttp.client_exceptions.ClientConnectorError,
aiohttp.client_exceptions.WSServerHandshakeError,
) as exc:
logging.info( # noqa: G200
'Error connecting to %s (%s)',
self._config.api_url,
exc,
)
raise
except ConnectionResetError:
logging.debug('ConnectionResetError, will reconnect')
raise
await _do_wait_for_messages()
def _reset_retry_counter(self, retry_state):
if self._reset_retry_attempt_number:
retry_state.attempt_number = 0
self._reset_retry_attempt_number = False
@staticmethod
def _log_trying_to_reconnect(retry_state):
if retry_state.attempt_number > 1:
logging.debug(
'Trying to reconnect, attempt %d', retry_state.attempt_number
)
@staticmethod
def _fail(message):
logging.fatal(message)
raise SystemExit(3)
async def _request_and_execute_work_request(self):
# Grab the task lock to make sure that the previous work request
# completed fully. We can be notified before the end because the
# server notifies via websocket while we are still wrapping up
# the task execution.
retries = 0
while not self._task_lock.acquire(blocking=False):
await self._asyncio_sleep(1)
retries += 1
if retries >= 60:
logging.error(
"Worker is busy and can't execute a new work request"
)
return
work_request = await self._request_work_request()
if work_request:
await self._execute_work_request_and_submit_result(work_request)
else:
logging.debug('No work request available')
self._task_lock.release()
async def _process_message(self, msg):
"""
Process messages pushed by the debusine server to the client.
Return True for processed messages; False for non-processed messages
and it logs the reason.
"""
async def work_request_available(
msg_content: dict[Any, Any] # noqa: U100
):
await self._request_and_execute_work_request()
return await Debusine.process_async_message(
msg,
{
"request_dynamic_metadata": self._send_dynamic_metadata,
"work_request_available": work_request_available,
},
logger=logging,
)
async def _request_work_request(self) -> WorkRequestResponse | None:
"""Request a work request and returns it."""
work_request = await self._async_http_client.get(
'/1.0/work-request/get-next-for-worker/'
)
if work_request.status == HTTPOk.status_code:
try:
work_request_obj = WorkRequestResponse.parse_raw(
await work_request.text()
)
except ValidationError as exc:
logging.warning( # noqa: G200
'Invalid WorkRequest received from'
' /get-next-for-worker/: %s',
exc,
)
return None
return work_request_obj
else:
return None
async def _send_dynamic_metadata(
self, msg_content: dict[Any, Any] # noqa: U100
):
dynamic_metadata_path = '/1.0/worker/dynamic-metadata/'
metadata = {
**system_metadata(WorkerType.EXTERNAL),
**analyze_worker_all_executors(),
**BaseTask.analyze_worker_all_tasks(),
}
response = await self._async_http_client.put(
dynamic_metadata_path, json=metadata
)
logging.debug(
'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
response.status,
metadata,
)
async def _execute_work_request_and_submit_result(
self, work_request: WorkRequestResponse
):
try:
task_class = BaseTask.class_from_name(
TaskTypes.WORKER, work_request.task_name
)
except ValueError as exc:
logging.error( # noqa: G200
'Task: %s Error setup: %s', work_request.task_name, exc
)
self._debusine.work_request_completed_update(
work_request.id, "error"
)
self._reset_state()
return
try:
self._task_running = task_class(work_request.task_data)
except TaskConfigError as exc:
logging.error( # noqa: G200
'Task: %s Error configure: %s', work_request.task_name, exc
)
self._debusine.work_request_completed_update(
work_request.id, "error"
)
self._reset_state()
return
assert self._task_running.TASK_TYPE is TaskTypes.WORKER
assert isinstance(self._task_running, BaseExternalTask)
logger = logging.getLogger()
self._task_running.configure_server_access(
Debusine(self._config.api_url, self._config.token, logger=logger)
)
self._task_running.work_request_id = work_request.id
self._task_running.workspace_name = work_request.workspace
self._task_name = self._task_running.name
self._task_exec_future = self._task_executor.submit(
self._task_running.execute_logging_exceptions
)
self._task_exec_future.add_done_callback(
functools.partial(
self._send_task_result,
work_request.id,
)
)
def _reset_state(self) -> None:
"""Reset worker to a clean state, ready for the next work request."""
self._task_exec_future = None
self._task_running = None
if self._task_lock.locked():
# Release the task lock to allow the worker to process the next
# work request
self._task_lock.release()
@staticmethod
async def _exit_thread(): # pragma: no cover
"""Exit current thread, mocked in tests."""
raise SystemExit(1)
def _send_task_result(self, work_request_id: int, task_exec_future) -> None:
"""
Send the result of the task to the debusine server.
:param work_request_id: WorkRequest.id
:param task_exec_future: task_exec that executed the BaseTask. This
method checks the result via .result() or .exception() of this
object
"""
# This is called unconditionally after Task completion /
# failure, on a separate thread (normally the Task thread),
# i.e. *not* in the main thread with the asyncio loop
assert self._task_running is not None
try:
if self._task_running.aborted:
logging.info("Task: %s has been aborted", self._task_name)
# No need to notify debusine-server
return
elif task_exec_future.exception():
logging.error( # noqa: G200
'Task: %s Error execute: %s',
self._task_name,
task_exec_future.exception(),
)
result = 'error'
else:
result = 'success' if task_exec_future.result() else 'failure'
try:
self._debusine.work_request_completed_update(
work_request_id, result
)
except Exception:
# Don't leave the worker stuck in "Running" state on the server
logging.exception(
"Cannot reach server to report work request completed,"
" exiting."
)
# _main_event_loop initialized in main and never unset
assert self._main_event_loop is not None
# Exit main asyncio thread from this secondary thread
asyncio.run_coroutine_threadsafe(
self._exit_thread(), self._main_event_loop
)
finally:
self._reset_state()
[docs] @staticmethod
def log_forced_exit(signum):
"""Log a forced exit."""
return logging.info('Terminated with signal %s', signum.name)