Source code for debusine.worker._worker

#!/usr/bin/env python3

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Worker client: connects to debusine server.

Overview
--------
* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``).
* The worker then uses this token to connect to the server (WebSocket to
  ``/api/ws/1.0/worker/connect``).
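
A minimal sketch of that WebSocket connection, assuming the ``aiohttp``
client API (the ``token`` header and the heartbeat value mirror what
``Worker.connect`` below does; ``listen`` is a hypothetical helper)::

    import aiohttp

    async def listen(ws_url: str, token: str) -> None:
        """Connect to the server and print pushed messages."""
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                ws_url, headers={'token': token}, heartbeat=60
            ) as ws:
                async for msg in ws:
                    print(msg.data)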

Flow
----

  #. The worker is executed and chooses ``~/.config/debusine/worker``
     (if it exists) or ``/etc/debusine/worker``. It reads ``config.ini``
     from that directory and, if it already exists, the ``token`` file.
  #. If there isn't a token, the worker generates one (using
     :py:func:`secrets.token_hex`) and registers it with the Debusine server
     via HTTP POST to ``/api/1.0/worker/register``, sending the generated
     token and the worker's FQDN (see the sketch after this list). The token
     is saved to the ``token`` file in the chosen config directory.
  #. The server creates a new Token and Worker in the DB via the models.
     Neither is used until a debusine admin manually approves the token.
  #. The client can then connect using WebSockets to
     ``/api/ws/1.0/worker/connect`` and wait for commands to execute.
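
A minimal sketch of the registration request from step 2, assuming the
``aiohttp`` client and a hypothetical ``base_url`` pointing at the server
root (the payload and the expected 201 Created status mirror
``Worker._register`` below)::

    import secrets
    import socket

    import aiohttp

    async def register(base_url: str) -> str:
        """Generate a token and register it with the server."""
        token = secrets.token_hex(32)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f'{base_url}/api/1.0/worker/register/',
                json={'token': token, 'fqdn': socket.getfqdn()},
            ) as response:
                assert response.status == 201  # HTTPCreated
        return token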

Objects documentation
---------------------

"""
import asyncio
import functools
import logging
import secrets
import signal
import socket
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Lock
from typing import Any

import aiohttp
from aiohttp.web_exceptions import HTTPCreated, HTTPOk

try:
    from pydantic.v1 import ValidationError
except ImportError:
    from pydantic import ValidationError  # type: ignore

import tenacity

from debusine.client.debusine import Debusine
from debusine.client.exceptions import TokenDisabledError
from debusine.client.models import WorkRequestResponse
from debusine.tasks import BaseExternalTask, BaseTask, TaskConfigError
from debusine.tasks.executors import analyze_worker_all_executors
from debusine.tasks.models import TaskTypes
from debusine.worker.config import ConfigHandler
from debusine.worker.debusine_async_http_client import DebusineAsyncHttpClient
from debusine.worker.system_information import WorkerType, system_metadata


class Worker:
    """Worker class: waits for commands from the debusine server."""

    DEFAULT_LOG_LEVEL = logging.INFO
    def __init__(
        self,
        *,
        log_file: str | None = None,
        log_level=None,
        config=None,
    ):
        """
        Initialize Worker.

        :param log_file: log file where the logs are saved. If None, uses
          the setting from config.ini or Python's default (stderr).
        :param log_level: minimum level of the logs being saved. If None,
          uses the setting from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (or creates a default one)
        """
        self._original_log_file = log_file
        self._original_log_level = log_level

        self._config: ConfigHandler
        if config is None:
            self._config = ConfigHandler()
        else:
            self._config = config

        self._setup_logging()

        self._config.validate_config_or_fail()

        self._aiohttp_client_session: aiohttp.ClientSession | None = None

        self._async_http_client = DebusineAsyncHttpClient(logging, self._config)

        # ThreadPool for BaseTask.execute (e.g. Sbuild.exec)
        self._task_executor = ThreadPoolExecutor()

        # BaseTask.execute (e.g. Sbuild.exec) future is stored here (avoids
        # garbage collection deletion)
        self._task_exec_future: Future[bool] | None = None

        # Used by ThreadPoolExecutor to submit tasks to the main event loop
        self._main_event_loop: asyncio.AbstractEventLoop | None = None

        # Running task
        self._task_running: BaseTask[Any] | None = None

        # Lock to protect from concurrent work request execution
        self._task_lock: Lock = Lock()

        # Set to True if the retry_attempt_number should be reset
        self._reset_retry_attempt_number: bool = False
    @functools.cached_property
    def _debusine(self):
        return Debusine(
            self._config.api_url, self._config.token, logger=logging.getLogger()
        )

    def _setup_logging(self):
        """
        Set logging configuration.

        Use the parameters passed to Worker.__init__() and the self._config
        configuration. If the log file cannot be opened it aborts.
        """
        log_file = self._original_log_file or self._config.log_file
        log_level = (
            self._original_log_level
            or self._config.log_level
            or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
        )

        try:
            logging.basicConfig(
                format='%(asctime)s %(message)s',
                filename=log_file,
                level=log_level,
                force=True,
            )
        except OSError as exc:
            self._fail(f'Cannot open {log_file}: {exc}')

    def _create_task_signal_int_term_handler(self, signum):
        return asyncio.create_task(self._signal_int_term_handler(signum))
    async def main(self):
        """Run the worker."""
        self._set_main_event_loop()

        for signum in [signal.SIGINT, signal.SIGTERM]:
            self._main_event_loop.add_signal_handler(
                signum,
                functools.partial(
                    self._create_task_signal_int_term_handler, signum
                ),
            )

        await self.connect()
        logging.info("debusine-worker lost connection with debusine-server")
        await self.close()
    def _set_main_event_loop(self):
        self._main_event_loop = asyncio.get_running_loop()

    async def _signal_int_term_handler(self, signum):
        if self._task_running is not None:
            self._task_running.abort()

        self.log_forced_exit(signum)
        await self.close()
    async def close(self):
        """Close the AioHTTP session."""
        if self._aiohttp_client_session is not None:
            await self._aiohttp_client_session.close()

        await self._async_http_client.close()
    @staticmethod
    def _log_server_error(status_code, body):
        logging.error(
            'Could not register. Server HTTP status code: %s Body: %s\n',
            status_code,
            body,
        )

    async def _register(self):
        """
        Create a token, register it with debusine, and save it locally.

        The worker will not receive any tasks until the debusine admin has
        approved the token.
        """
        token = secrets.token_hex(32)

        register_path = '/1.0/worker/register/'
        data = {'token': token, 'fqdn': socket.getfqdn()}

        try:
            response = await self._async_http_client.post(
                register_path, json=data
            )
        except aiohttp.client_exceptions.ClientResponseError as err:
            self._log_server_error(err.status, 'Not available')
            return False
        except aiohttp.client_exceptions.ClientConnectorError as err:
            logging.error(  # noqa: G200
                'Could not register. Server unreachable: %s', err
            )
            return False

        status, body = (
            response.status,
            (await response.content.read()).decode('utf-8'),
        )

        if status == HTTPCreated.status_code:
            self._config.write_token(token)
            # If we already had a cached client, invalidate it so that we
            # start using the new token.
            try:
                del self._debusine
            except AttributeError:
                pass
            return True
        else:
            self._log_server_error(status, body)
            return False

    @staticmethod
    async def _asyncio_sleep(delay):
        """Sleep asynchronously (mocked in tests)."""
        await asyncio.sleep(delay)

    def _raise_try_again(self):
        raise tenacity.TryAgain()
    async def connect(self):
        """
        Connect (registering if needed) to the debusine server.

        Uses the URL for the debusine server from the configuration file.
        """
        self._aiohttp_client_session = aiohttp.ClientSession()

        if not self._config.token:
            result = await self._register()
            if result is False:
                self._fail('Exiting...')

        debusine_ws_url = self._config.api_url.replace('http', 'ws', 1)
        worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'

        def tenacity_before_hook(retry_state):
            self._reset_retry_counter(retry_state)
            self._log_trying_to_reconnect(retry_state)

        @tenacity.retry(
            sleep=self._asyncio_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            before=tenacity_before_hook,
            retry=tenacity.retry_if_exception_type(
                aiohttp.client_exceptions.ClientError
            )
            | tenacity.retry_if_exception_type(TokenDisabledError)
            | tenacity.retry_if_exception_type(ConnectionError),
        )
        async def _do_wait_for_messages() -> None:
            """Connect to the server and wait for commands."""
            headers = {'token': self._config.token}
            try:
                # Set at the top of Worker.connect.
                assert self._aiohttp_client_session is not None
                async with self._aiohttp_client_session.ws_connect(
                    worker_connect_ws_url, headers=headers, heartbeat=60
                ) as ws:
                    self._reset_retry_attempt_number = True
                    logging.info("Connected to %s", self._config.api_url)
                    msg: aiohttp.WSMessage
                    async for msg in ws:  # pragma: no branch
                        await self._process_message(msg)

                # The server disconnected the worker
                self._raise_try_again()
            except TokenDisabledError as exc:
                # The server disconnected the worker because the token is
                # not associated with a worker or is disabled. The server
                # admin should enable the token.
                logging.info(  # noqa: G200
                    'The token (%s) is disabled. '
                    'Debusine admin (%s) should enable it. '
                    'Reason: %s. '
                    'Will try again',
                    self._config.token,
                    self._config.api_url,
                    exc,
                )
                raise
            except (
                aiohttp.client_exceptions.ClientConnectorError,
                aiohttp.client_exceptions.WSServerHandshakeError,
            ) as exc:
                logging.info(  # noqa: G200
                    'Error connecting to %s (%s)',
                    self._config.api_url,
                    exc,
                )
                raise
            except ConnectionResetError:
                logging.debug('ConnectionResetError, will reconnect')
                raise

        await _do_wait_for_messages()
    def _reset_retry_counter(self, retry_state):
        if self._reset_retry_attempt_number:
            retry_state.attempt_number = 0
            self._reset_retry_attempt_number = False

    @staticmethod
    def _log_trying_to_reconnect(retry_state):
        if retry_state.attempt_number > 1:
            logging.debug(
                'Trying to reconnect, attempt %d', retry_state.attempt_number
            )

    @staticmethod
    def _fail(message):
        logging.fatal(message)
        raise SystemExit(3)

    async def _request_and_execute_work_request(self):
        # Grab the task lock to make sure that the previous work request
        # completed fully. We can be notified before the end because the
        # server notifies via websocket while we are still wrapping up
        # the task execution.
        retries = 0
        while not self._task_lock.acquire(blocking=False):
            await self._asyncio_sleep(1)
            retries += 1
            if retries >= 60:
                logging.error(
                    "Worker is busy and can't execute a new work request"
                )
                return

        work_request = await self._request_work_request()
        if work_request:
            await self._execute_work_request_and_submit_result(work_request)
        else:
            logging.debug('No work request available')
            self._task_lock.release()

    async def _process_message(self, msg):
        """
        Process messages pushed by the debusine server to the client.

        Return True for processed messages; False for non-processed messages
        (and log the reason).
        """

        async def work_request_available(
            msg_content: dict[Any, Any]  # noqa: U100
        ):
            await self._request_and_execute_work_request()

        return await Debusine.process_async_message(
            msg,
            {
                "request_dynamic_metadata": self._send_dynamic_metadata,
                "work_request_available": work_request_available,
            },
            logger=logging,
        )

    async def _request_work_request(self) -> WorkRequestResponse | None:
        """Request a work request and return it."""
        work_request = await self._async_http_client.get(
            '/1.0/work-request/get-next-for-worker/'
        )

        if work_request.status == HTTPOk.status_code:
            try:
                work_request_obj = WorkRequestResponse.parse_raw(
                    await work_request.text()
                )
            except ValidationError as exc:
                logging.warning(  # noqa: G200
                    'Invalid WorkRequest received from'
                    ' /get-next-for-worker/: %s',
                    exc,
                )
                return None

            return work_request_obj
        else:
            return None

    async def _send_dynamic_metadata(
        self, msg_content: dict[Any, Any]  # noqa: U100
    ):
        dynamic_metadata_path = '/1.0/worker/dynamic-metadata/'

        metadata = {
            **system_metadata(WorkerType.EXTERNAL),
            **analyze_worker_all_executors(),
            **BaseTask.analyze_worker_all_tasks(),
        }

        response = await self._async_http_client.put(
            dynamic_metadata_path, json=metadata
        )

        logging.debug(
            'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
            response.status,
            metadata,
        )

    async def _execute_work_request_and_submit_result(
        self, work_request: WorkRequestResponse
    ):
        try:
            task_class = BaseTask.class_from_name(
                TaskTypes.WORKER, work_request.task_name
            )
        except ValueError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error setup: %s', work_request.task_name, exc
            )
            self._debusine.work_request_completed_update(
                work_request.id, "error"
            )
            self._reset_state()
            return

        try:
            self._task_running = task_class(work_request.task_data)
        except TaskConfigError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error configure: %s', work_request.task_name, exc
            )
            self._debusine.work_request_completed_update(
                work_request.id, "error"
            )
            self._reset_state()
            return

        assert self._task_running.TASK_TYPE is TaskTypes.WORKER
        assert isinstance(self._task_running, BaseExternalTask)

        logger = logging.getLogger()
        self._task_running.configure_server_access(
            Debusine(self._config.api_url, self._config.token, logger=logger)
        )
        self._task_running.work_request_id = work_request.id
        self._task_running.workspace_name = work_request.workspace
        self._task_name = self._task_running.name

        self._task_exec_future = self._task_executor.submit(
            self._task_running.execute_logging_exceptions
        )
        self._task_exec_future.add_done_callback(
            functools.partial(
                self._send_task_result,
                work_request.id,
            )
        )

    def _reset_state(self) -> None:
        """Reset worker to a clean state, ready for the next work request."""
        self._task_exec_future = None
        self._task_running = None
        if self._task_lock.locked():
            # Release the task lock to allow the worker to process the next
            # work request
            self._task_lock.release()

    @staticmethod
    async def _exit_thread():  # pragma: no cover
        """Exit current thread, mocked in tests."""
        raise SystemExit(1)

    def _send_task_result(self, work_request_id: int, task_exec_future) -> None:
        """
        Send the result of the task to the debusine server.

        :param work_request_id: WorkRequest.id
        :param task_exec_future: the future that executed the BaseTask. This
          method checks its outcome via .result() or .exception().
        """
        # This is called unconditionally after Task completion /
        # failure, on a separate thread (normally the Task thread),
        # i.e. *not* in the main thread with the asyncio loop
        assert self._task_running is not None
        try:
            if self._task_running.aborted:
                logging.info("Task: %s has been aborted", self._task_name)
                # No need to notify debusine-server
                return
            elif task_exec_future.exception():
                logging.error(  # noqa: G200
                    'Task: %s Error execute: %s',
                    self._task_name,
                    task_exec_future.exception(),
                )
                result = 'error'
            else:
                result = 'success' if task_exec_future.result() else 'failure'

            try:
                self._debusine.work_request_completed_update(
                    work_request_id, result
                )
            except Exception:
                # Don't leave the worker stuck in "Running" state on the
                # server
                logging.exception(
                    "Cannot reach server to report work request completed,"
                    " exiting."
                )
                # _main_event_loop initialized in main and never unset
                assert self._main_event_loop is not None
                # Exit main asyncio thread from this secondary thread
                asyncio.run_coroutine_threadsafe(
                    self._exit_thread(), self._main_event_loop
                )
        finally:
            self._reset_state()
    @staticmethod
    def log_forced_exit(signum):
        """Log a forced exit."""
        return logging.info('Terminated with signal %s', signum.name)
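
A minimal usage sketch, assuming a valid ``config.ini`` (and optionally a
``token`` file) in one of the configuration directories described in the
module docstring; the ``run`` helper is illustrative only:

    import asyncio

    from debusine.worker._worker import Worker

    async def run() -> None:
        # Reads the config, registers if needed, then waits for commands
        # until the connection with the server is lost.
        worker = Worker()
        await worker.main()

    asyncio.run(run())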