#!/usr/bin/env python3

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Worker client: connects to debusine server.

Overview
--------
* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``)
* The worker then uses this token to connect to the server (WebSocket to
  ``/api/ws/1.0/worker/connect``)

Flow
----

  #. The worker is executed and chooses ``~/.config/debusine/worker``
     (if it exists) or ``/etc/debusine/worker``. It reads ``config.ini``
     from that directory and, if it already exists, the ``token`` file.
  #. If there is no token, the worker generates one (using
     :py:func:`secrets.token_hex`) and registers it with the Debusine server
     via HTTP POST to ``/api/1.0/worker/register``, sending the generated
     token and the worker's FQDN. The token is saved to the ``token`` file
     in the chosen config directory.
  #. The server creates a new Token and Worker in the DB via the models.
     They cannot be used until a Debusine admin manually validates the
     token.
  #. The worker can then connect using WebSockets to
     ``/api/ws/1.0/worker/connect`` and wait for commands to execute
     (a sketch of the whole flow follows this list).
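
A minimal, illustrative sketch of this flow using ``aiohttp`` directly. The
server URL below is only an example; the real worker reads it from
``ConfigHandler``, talks to the server through ``DebusineAsyncHttpClient``,
and can only connect once the admin has enabled the token::

    import asyncio
    import secrets
    import socket

    import aiohttp

    API_URL = "https://debusine.example.org/api"  # example URL

    async def register_and_connect():
        token = secrets.token_hex(32)
        async with aiohttp.ClientSession() as session:
            # Steps 2 and 3: register the freshly generated token.
            async with session.post(
                f"{API_URL}/1.0/worker/register/",
                json={"token": token, "fqdn": socket.getfqdn()},
            ) as response:
                # 201 Created: the token is stored server-side and now
                # awaits manual approval by the debusine admin.
                assert response.status == 201

            # Step 4: connect and wait for commands pushed by the server
            # (only succeeds after the admin has enabled the token).
            ws_url = API_URL.replace("http", "ws", 1)
            async with session.ws_connect(
                f"{ws_url}/ws/1.0/worker/connect/", headers={"token": token}
            ) as ws:
                async for msg in ws:
                    ...  # messages are dispatched to command handlers

    asyncio.run(register_and_connect())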

Objects documentation
---------------------

"""
import asyncio
import functools
import logging
import secrets
import signal
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import aiohttp
from aiohttp.web_exceptions import HTTPCreated, HTTPOk

from pydantic import ValidationError

import tenacity

from debusine.client.models import WorkRequestResponse
from debusine.tasks import Task, TaskConfigError

from . import system_information
from .config import ConfigHandler
from .debusine_async_http_client import DebusineAsyncHttpClient
from ..client.debusine import Debusine
from ..client.exceptions import TokenDisabledError


class Worker:
    """Worker class: waits for commands from the debusine server."""

    DEFAULT_LOG_LEVEL = logging.INFO
    def __init__(
        self,
        *,
        log_file: Optional[str] = None,
        log_level=None,
        config=None,
    ):
        """
        Initialize Worker.

        :param log_file: log file where the logs are saved. If None, uses
          the setting from config.ini or Python's default (stderr).
        :param log_level: minimum level of the logs being saved. If None,
          uses the setting from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (or creates a default one)
        """
        self._original_log_file = log_file
        self._original_log_level = log_level

        self._config: ConfigHandler
        if config is None:
            self._config = ConfigHandler()
        else:
            self._config = config

        self._setup_logging()
        self._config.validate_config_or_fail()

        self._aiohttp_client_session: Optional[aiohttp.ClientSession] = None

        self._async_http_client = DebusineAsyncHttpClient(
            logging, self._config
        )

        logger = logging.getLogger()
        self._debusine = Debusine(
            self._config.api_url, self._config.token, logger=logger
        )

        # ThreadPool for Task.execute (e.g. Sbuild.exec)
        self._task_executor = ThreadPoolExecutor()

        # Task.execute (e.g. Sbuild.exec) future is stored here (avoids
        # garbage collection deletion)
        self._task_exec_future = None

        # Used by ThreadPoolExecutor to submit tasks to the main event loop
        self._main_event_loop = None

        # Running task
        self._task_running: Optional[Task] = None
    def _setup_logging(self):
        """
        Set logging configuration.

        Use the parameters passed to Worker.__init__() and self._config
        configuration.

        If the log file cannot be opened it aborts.
        """
        log_file = self._original_log_file or self._config.log_file

        log_level = (
            self._original_log_level
            or self._config.log_level
            or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
        )

        try:
            logging.basicConfig(
                format='%(asctime)s %(message)s',
                filename=log_file,
                level=log_level,
                force=True,
            )
        except OSError as exc:
            self._fail(f'Cannot open {log_file}: {exc}')

    def _create_task_signal_int_term_handler(self, signum):
        return asyncio.create_task(self._signal_int_term_handler(signum))
    async def main(self):
        """Run the worker."""
        self._set_main_event_loop()

        for signum in [signal.SIGINT, signal.SIGTERM]:
            self._main_event_loop.add_signal_handler(
                signum,
                functools.partial(
                    self._create_task_signal_int_term_handler, signum
                ),
            )

        await self.connect()
        logging.info("debusine-worker lost connection with debusine-server")
        await self.close()
    def _set_main_event_loop(self):
        self._main_event_loop = asyncio.get_running_loop()

    async def _signal_int_term_handler(self, signum):
        if self._task_running is not None:
            self._task_running.abort()

        self.log_forced_exit(signum)
        await self.close()
    async def close(self):
        """Close the AioHTTP session."""
        if self._aiohttp_client_session is not None:
            await self._aiohttp_client_session.close()

        await self._async_http_client.close()
    @staticmethod
    def _log_server_error(status_code, body):
        logging.error(
            'Could not register. Server HTTP status code: %s Body: %s\n',
            status_code,
            body,
        )

    async def _register(self):
        """
        Create a token, register it with debusine, save it locally.

        The worker will not receive any tasks until the debusine admin
        has approved the token.
        """
        token = secrets.token_hex(32)

        register_path = '/1.0/worker/register/'
        data = {'token': token, 'fqdn': socket.getfqdn()}

        try:
            response = await self._async_http_client.post(
                register_path, json=data
            )
        except aiohttp.client_exceptions.ClientResponseError as err:
            self._log_server_error(err.status, 'Not available')
            return False
        except aiohttp.client_exceptions.ClientConnectorError as err:
            logging.error(  # noqa: G200
                'Could not register. Server unreachable: %s', err
            )
            return False

        status, body = (
            response.status,
            (await response.content.read()).decode('utf-8'),
        )

        if status == HTTPCreated.status_code:
            self._config.write_token(token)
            return True
        else:
            self._log_server_error(status, body)
            return False

    @staticmethod
    async def _tenacity_sleep(delay):
        await asyncio.sleep(delay)
    async def connect(self):
        """
        Connect (registering if needed) to the debusine server.

        Use the URL of the debusine server from the configuration file.
        """
        self._aiohttp_client_session = aiohttp.ClientSession()

        if not self._config.token:
            result = await self._register()
            if result is False:
                self._fail('Exiting...')

        debusine_ws_url = self._config.api_url.replace('http', 'ws', 1)
        worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'

        @tenacity.retry(
            sleep=self._tenacity_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            before=self._log_trying_to_reconnect,
            retry=tenacity.retry_if_exception_type(
                aiohttp.client_exceptions.ClientError
            )
            | tenacity.retry_if_exception_type(TokenDisabledError)
            | tenacity.retry_if_exception_type(ConnectionError),
        )
        async def _do_wait_for_messages():
            """Connect to the server and wait for commands."""
            headers = {'token': self._config.token}

            try:
                async with self._aiohttp_client_session.ws_connect(
                    worker_connect_ws_url, headers=headers, heartbeat=60
                ) as ws:
                    logging.info("Connected to %s", self._config.api_url)
                    async for msg in ws:  # pragma: no branch
                        msg: aiohttp.WSMessage
                        await self._process_message(msg)
                    return True
            except TokenDisabledError as exc:
                # The server disconnected the worker because the token is
                # disabled. Debusine's server admin should enable the token.
                logging.info(  # noqa: G200
                    'The token (%s) is disabled. '
                    'Debusine admin (%s) should enable it. '
                    'Reason: %s. '
                    'Will try again',
                    self._config.token,
                    self._config.api_url,
                    exc,
                )
                raise
            except (
                aiohttp.client_exceptions.ClientConnectorError,
                aiohttp.client_exceptions.WSServerHandshakeError,
            ) as exc:
                logging.info(  # noqa: G200
                    'Error connecting to %s (%s)',
                    self._config.api_url,
                    exc,
                )
                raise
            except ConnectionResetError:
                logging.debug('ConnectionResetError, will reconnect')
                raise

        await _do_wait_for_messages()
    @staticmethod
    def _log_trying_to_reconnect(retry_state):
        if retry_state.attempt_number > 1:
            logging.debug(
                'Trying to reconnect, attempt %d', retry_state.attempt_number
            )

    @staticmethod
    def _fail(message):
        logging.fatal(message)
        raise SystemExit(3)

    async def _request_and_execute_work_request(self):
        work_request = await self._request_work_request()

        if work_request:
            await self._execute_work_request_and_submit_result(work_request)
        else:
            logging.debug('No work request available')

    async def _process_message(self, msg):
        """
        Process messages pushed by the debusine server to the client.

        Return True for processed messages; False for non-processed
        messages (the reason is logged).
        """

        async def work_request_available(msg_content: dict):  # noqa: U100
            if self._task_exec_future:
                logging.info(
                    "Work request available but ignored: "
                    "currently running a Task"
                )
            else:
                await self._request_and_execute_work_request()

        return await Debusine.process_async_message(
            msg,
            {
                "request_dynamic_metadata": self._send_dynamic_metadata,
                "work_request_available": work_request_available,
            },
            logger=logging,
        )

    async def _request_work_request(self) -> Optional[WorkRequestResponse]:
        """Request a work request and return it."""
        work_request = await self._async_http_client.get(
            '/1.0/work-request/get-next-for-worker/'
        )

        if work_request.status == HTTPOk.status_code:
            try:
                work_request_obj = WorkRequestResponse.parse_raw(
                    await work_request.text()
                )
            except ValidationError as exc:
                logging.warning(  # noqa: G200
                    'Invalid WorkRequest received from'
                    ' /get-next-for-worker/: %s',
                    exc,
                )
                return None

            return work_request_obj
        else:
            return None

    @staticmethod
    def _system_metadata():
        """Return system metadata such as CPU count or physical memory."""
        cpu_count = system_information.cpu_count()
        total_physical_memory = system_information.total_physical_memory()

        data = {
            'system:cpu_count': cpu_count,
            'system:total_physical_memory': total_physical_memory,
        }

        return data

    async def _send_dynamic_metadata(self, msg_content: dict):  # noqa: U100
        dynamic_metadata_path = '/1.0/worker/dynamic-metadata/'

        metadata = {}
        metadata.update(self._system_metadata())
        metadata.update(Task.analyze_worker_all_tasks())

        response = await self._async_http_client.put(
            dynamic_metadata_path, json=metadata
        )

        logging.debug(
            'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
            response.status,
            metadata,
        )

    async def _execute_work_request_and_submit_result(
        self, work_request: WorkRequestResponse
    ):
        self._task_running = Task.class_from_name(work_request.task_name)()
        logger = logging.getLogger()
        self._task_running.configure_server_access(
            Debusine(self._config.api_url, self._config.token, logger=logger)
        )
        self._task_running.work_request = work_request.id

        try:
            self._task_running.configure(work_request.task_data)
        except TaskConfigError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error configure: %s', self._task_running.name, exc
            )
            self._debusine.work_request_completed_update(
                work_request.id, "error"
            )
            return

        self._task_name = self._task_running.name

        self._task_exec_future = self._task_executor.submit(
            self._task_running.execute_logging_exceptions
        )
        self._task_exec_future.add_done_callback(
            functools.partial(
                self._send_task_result,
                work_request.id,
            )
        )
        self._task_exec_future.add_done_callback(
            self._task_exec_future_finished
        )

    def _task_exec_future_finished(self, task_exec_future):  # noqa: U100
        self._task_exec_future = None
        self._task_running = None

    def _send_task_result(
        self, work_request_id: int, task_exec_future
    ) -> None:
        """
        Send the result of the task to the debusine server.

        :param work_request_id: WorkRequest.id
        :param task_exec_future: task_exec that executed the Task. This
          method checks the result via .result() or .exception() of this
          object.
        """
        if self._task_running.aborted:
            logging.info("Task: %s has been aborted", self._task_name)
            # No need to notify debusine-server
            return
        elif task_exec_future.exception():
            logging.error(  # noqa: G200
                'Task: %s Error execute: %s',
                self._task_name,
                task_exec_future.exception(),
            )
            result = 'error'
        else:
            result = 'success' if task_exec_future.result() else 'failure'

        self._debusine.work_request_completed_update(work_request_id, result)
    @staticmethod
    def log_forced_exit(signum):
        """Log a forced exit."""
        return logging.info('Terminated with signal %s', signum.name)