# Source code for debusine.worker._worker

#!/usr/bin/env python3

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Worker client: connects to debusine server.

Overview
--------
* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``)
* The client will use this token to connect to the server (WebSocket to
  ``/ws/1.0/worker/connect``)

Flow
----

  #. The worker is executed and chooses ``~/.config/debusine/worker``
     (if it exists) or ``/etc/debusine/worker``. It reads the file
     ``config.ini`` from that directory and, if it already exists, the file
     ``token``.
  #. If there isn't a token, the worker generates one (using
     :py:func:`secrets.token_hex`) and registers it with the Debusine server
     via HTTP POST to ``/api/1.0/worker/register``, sending the generated
     token and the worker's FQDN. The token is saved to the ``token`` file in
     the chosen config directory.
  #. The server will create a new Token and Worker in the DB via the models.
     They will not be used until they have been manually validated.
  #. The client can then connect using WebSockets to ``/ws/1.0/worker/connect``
     to wait for commands to execute.

Objects documentation
---------------------

"""
import asyncio
import functools
import json
import logging
import secrets
import signal
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import aiohttp
from aiohttp import WSMessage, WSMsgType
from aiohttp.web_exceptions import HTTPCreated, HTTPOk

from pydantic import ValidationError

import tenacity

from debusine.client.models import WorkRequest
from debusine.tasks import Task, TaskConfigError

from . import system_information
from .config import ConfigHandler
from .debusine_async_http_client import DebusineAsyncHttpClient
from ..client.debusine import Debusine
from ..client.exceptions import TokenDisabledError


class Worker:
    """Worker class: waits for commands from the debusine server."""

    DEFAULT_LOG_LEVEL = logging.INFO

    def __init__(
        self,
        *,
        log_file: Optional[str] = None,
        log_level=None,
        config=None,
    ):
        """
        Initialize Worker.

        :param log_file: log file to where the logs are saved. If None uses
          settings from config.ini or default's Python (stderr).
        :param log_level: minimum level of the logs being saved. If None uses
          settings from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (or creates a default one)
        """
        self._original_log_file = log_file
        self._original_log_level = log_level

        self._config: ConfigHandler
        if config is None:
            self._config = ConfigHandler()
        else:
            self._config = config

        # Logging is configured before validating the configuration so
        # that validation failures are reported through it.
        self._setup_logging()
        self._config.validate_config_or_fail()

        self._aiohttp_client_session: Optional[aiohttp.ClientSession] = None
        self._async_http_client = DebusineAsyncHttpClient(
            logging, self._config
        )

        # ThreadPool for Task.execute (e.g. Sbuild.exec)
        self._task_executor = ThreadPoolExecutor()

        # Task.execute (e.g. Sbuild.exec) future is stored here (avoids
        # garbage collection deletion)
        self._task_exec_future = None

        # Used by ThreadPoolExecutor to submit tasks to the main event loop
        self._main_event_loop = None

        # Running task
        self._task_running: Optional[Task] = None

        # Name of the task being executed (used when logging its result)
        self._task_name: Optional[str] = None

    def _setup_logging(self):
        """
        Set logging configuration.

        Use the parameters passed to Worker.__init__() and self._config
        configuration.

        If the log file cannot be opened it aborts.
        """
        log_file = self._original_log_file or self._config.log_file
        log_level = (
            self._original_log_level
            or self._config.log_level
            or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
        )

        try:
            logging.basicConfig(
                format='%(asctime)s %(message)s',
                filename=log_file,
                level=log_level,
                force=True,
            )
        except OSError as exc:
            self._fail(f'Cannot open {log_file}: {exc}')

    def _create_task_signal_int_term_handler(self, signum):
        """Schedule the asynchronous signal handler as an asyncio task."""
        return asyncio.create_task(self._signal_int_term_handler(signum))

    async def main(self):
        """Run the worker."""
        self._set_main_event_loop()

        for signum in [signal.SIGINT, signal.SIGTERM]:
            self._main_event_loop.add_signal_handler(
                signum,
                functools.partial(
                    self._create_task_signal_int_term_handler, signum
                ),
            )

        await self.connect()
        logging.info("debusine-worker lost connection with debusine-server")
        await self.close()

    def _set_main_event_loop(self):
        """Remember the running event loop (used from the executor thread)."""
        self._main_event_loop = asyncio.get_running_loop()

    async def _signal_int_term_handler(self, signum):
        """Abort the running task (if any), log the exit and close."""
        # Bind to a local: the done-callback of the executor future may
        # set self._task_running to None between the check and the call.
        task = self._task_running
        if task is not None:
            task.abort()

        self.log_forced_exit(signum)
        await self.close()

    async def close(self):
        """Close the AioHTTP session."""
        if self._aiohttp_client_session is not None:
            await self._aiohttp_client_session.close()

        await self._async_http_client.close()

    @staticmethod
    def _log_server_error(status_code, body):
        """Log a failed registration attempt."""
        logging.error(
            'Could not register. Server HTTP status code: %s Body: %s\n',
            status_code,
            body,
        )

    async def _register(self):
        """
        Create a token, registers it to debusine, saves it locally.

        The worker will not receive any tasks until the debusine admin
        has approved the token.

        :return: True if the server answered HTTP 201 (the token is then
          written via the configuration), False otherwise.
        """
        token = secrets.token_hex(32)

        register_path = '/api/1.0/worker/register/'

        data = {'token': token, 'fqdn': socket.getfqdn()}

        try:
            response = await self._async_http_client.post(
                register_path, json=data
            )
        except aiohttp.client_exceptions.ClientResponseError as err:
            self._log_server_error(err.status, 'Not available')
            return False

        status, body = (
            response.status,
            (await response.content.read()).decode('utf-8'),
        )

        if status == HTTPCreated.status_code:
            self._config.write_token(token)
            return True
        else:
            self._log_server_error(status, body)
            return False

    @staticmethod
    async def _tenacity_sleep(delay):
        """Sleep asynchronously (sleep function handed to tenacity)."""
        await asyncio.sleep(delay)

    async def connect(self):
        """
        Connect (registering if needed) to the debusine server.

        Uses the URL for debusine server from the configuration file.
        """
        if not self._config.token:
            result = await self._register()
            if result is False:
                self._fail('Exiting...')

        # Created only after registration succeeded: _register() uses
        # self._async_http_client, and creating the session earlier would
        # leave it unclosed when _fail() raises SystemExit.
        self._aiohttp_client_session = aiohttp.ClientSession()

        # http -> ws and https -> wss (only the first occurrence is replaced)
        debusine_ws_url = self._config.debusine_url.replace('http', 'ws', 1)
        worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'

        # Any exception raised by the function makes tenacity wait a random
        # 1-6 seconds and call it again; no stop condition is configured.
        @tenacity.retry(
            sleep=self._tenacity_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            before=self._log_trying_to_reconnect,
        )
        async def _do_wait_for_messages():
            """Connect to the server and waits for commands."""
            headers = {'token': self._config.token}

            try:
                async with self._aiohttp_client_session.ws_connect(
                    worker_connect_ws_url, headers=headers, heartbeat=60
                ) as ws:
                    logging.info("Connected to %s", self._config.debusine_url)

                    async for msg in ws:  # pragma: no branch
                        msg: WSMessage
                        await self._process_message(msg)

                    # The message iteration ended without an exception:
                    # returning stops the retries.
                    return True
            except TokenDisabledError as exc:
                # The server disconnect the worker because the token
                # is disabled. Debusine's server admin should enable
                # the token.
                logging.info(  # noqa: G200
                    'The token (%s) is disabled. '
                    'Debusine admin (%s) should enable it. '
                    'Reason: %s. '
                    'Will try again',
                    self._config.token,
                    self._config.debusine_url,
                    exc,
                )
                raise
            except (
                aiohttp.client_exceptions.ClientConnectorError,
                aiohttp.client_exceptions.WSServerHandshakeError,
            ) as exc:
                logging.info(  # noqa: G200
                    'Error connecting to %s (%s)',
                    self._config.debusine_url,
                    exc,
                )
                raise
            except ConnectionResetError:
                logging.debug('ConnectionResetError, will reconnect')
                raise

        await _do_wait_for_messages()

    @staticmethod
    def _log_trying_to_reconnect(retry_state):
        """Log reconnection attempts (nothing is logged for the first)."""
        if retry_state.attempt_number > 1:
            logging.debug(
                'Trying to reconnect, attempt %d', retry_state.attempt_number
            )

    @staticmethod
    def _fail(message):
        """Log ``message`` as fatal and exit with status 3."""
        logging.fatal(message)
        raise SystemExit(3)

    async def _request_and_execute_work_request(self):
        """Fetch the next work request and, if there is one, start it."""
        work_request = await self._request_work_request()

        if work_request:
            await self._execute_work_request_and_submit_result(work_request)
        else:
            logging.debug('No work request available')

    async def _process_message(self, msg):
        """
        Process messages pushed by the debusine server to the client.

        Return True for processed messages; False for non-processed messages
        and it logs the reason.

        :raises TokenDisabledError: if the server reports ``TOKEN_DISABLED``.
        """
        if not isinstance(msg, aiohttp.http_websocket.WSMessage):
            logging.debug(
                'Worker._process_message: unexpected type: %s '
                'is not an instance of %s',
                type(msg),
                aiohttp.http_websocket.WSMessage,
            )
            return False

        if msg.type != WSMsgType.TEXT:
            # Only text frames carry commands
            return False

        try:
            msg_content = json.loads(msg.data)
        except json.JSONDecodeError as exc:
            logging.info(  # noqa: G200
                'Worker._process_message: JSONDecodeError on a message '
                'received from the server: Received: "%s" Error: "%s"',
                msg.data,
                exc,
            )
            return False

        logging.debug("Received from the server: '%s'", msg_content)

        if not isinstance(msg_content, dict):
            # Valid JSON but not an object (e.g. a list or a string): the
            # .get() lookups below would raise AttributeError.
            logging.info(
                'Worker._process_message: message received from the '
                'server is not a JSON object: "%s"',
                msg.data,
            )
            return False

        if (text := msg_content.get('text')) is not None:
            if text == 'request_dynamic_metadata':
                await self._send_dynamic_metadata()
                return True

            if text == 'work_request_available':
                if self._task_exec_future:
                    logging.info(
                        "Work request available but ignored: "
                        "currently running a Task"
                    )
                else:
                    await self._request_and_execute_work_request()
                return True

            logging.debug(
                'Worker._process_message: invalid msg_content received '
                'from the server: %s',
                text,
            )
            return False

        if (reason_code := msg_content.get('reason_code')) is not None:
            if reason_code == 'TOKEN_DISABLED':
                reason = msg_content.get("reason", "Unknown")
                raise TokenDisabledError(reason)

            logging.error(
                'Unknown reason code (ignoring it): "%s"', reason_code
            )
            return False

        reason = msg_content.get('reason', 'unknown')
        logging.info("Could not connect. Reason: '%s'", reason)
        return False

    async def _request_work_request(self):
        """
        Request a work request and returns it.

        :return: the parsed WorkRequest, or None if the server did not
          answer HTTP 200 or the body failed validation.
        """
        work_request = await self._async_http_client.get(
            '/api/1.0/work-request/get-next-for-worker/'
        )

        if work_request.status != HTTPOk.status_code:
            return None

        try:
            return WorkRequest.parse_raw(await work_request.text())
        except ValidationError as exc:
            logging.warning(  # noqa: G200
                'Invalid WorkRequest received from'
                ' /get-next-for-worker/: %s',
                exc,
            )
            return None

    @staticmethod
    def _system_metadata():
        """Return system metadata such as CPU count or physical memory."""
        cpu_count = system_information.cpu_count()
        total_physical_memory = system_information.total_physical_memory()

        data = {
            'system:cpu_count': cpu_count,
            'system:total_physical_memory': total_physical_memory,
        }

        return data

    async def _send_dynamic_metadata(self):
        """Send system and task metadata to the server (HTTP PUT)."""
        dynamic_metadata_path = '/api/1.0/worker/dynamic-metadata/'

        metadata = {}
        metadata.update(self._system_metadata())
        metadata.update(Task.analyze_worker_all_tasks())

        response = await self._async_http_client.put(
            dynamic_metadata_path, json=metadata
        )

        logging.debug(
            'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
            response.status,
            metadata,
        )

    async def _execute_work_request_and_submit_result(
        self, work_request: "WorkRequest"
    ):
        """
        Configure the task of ``work_request`` and run it in the executor.

        If the task cannot be configured, the result ``error`` is sent to
        the server straight away. Otherwise the task is submitted to the
        thread pool and its result is sent by :py:meth:`_send_task_result`
        when the future is done.
        """
        update_work_completed_path = (
            f'/api/1.0/work-request/{work_request.id}/completed/'
        )

        self._task_running = Task.class_from_name(work_request.task_name)()

        self._task_running.configure_server_access(
            Debusine(self._config.debusine_url + "/api", self._config.token)
        )

        try:
            self._task_running.configure(work_request.task_data)
        except TaskConfigError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error configure: %s', self._task_running.name, exc
            )
            # The task never started: forget it so that a later signal
            # does not try to abort it.
            self._task_running = None
            body = {'result': 'error'}
            await self._async_http_client.put(
                update_work_completed_path, json=body
            )
            return

        self._task_name = self._task_running.name

        self._task_exec_future = self._task_executor.submit(
            self._task_running.execute_logging_exceptions
        )

        # Callbacks run in the order they were added: the result must be
        # sent before the running-task state is cleared.
        self._task_exec_future.add_done_callback(
            functools.partial(
                self._send_task_result,
                update_work_completed_path,
            )
        )
        self._task_exec_future.add_done_callback(
            self._task_exec_future_finished
        )

    def _task_exec_future_finished(self, task_exec_future):  # noqa: U100
        """Clear the running-task state once the executor future is done."""
        self._task_exec_future = None
        self._task_running = None

    def _send_task_result(
        self, update_work_path: str, task_exec_future
    ) -> None:
        """
        Send the result of the task to the debusine server.

        :param update_work_path: path to update the WorkRequest status
        :param task_exec_future: task_exec that executed the Task. This
          method check the result via .result() or .exception() of this
          object
        """
        if self._task_running.aborted:
            logging.info("Task: %s has been aborted", self._task_name)
            # No need to notify debusine-server
            return

        exception = task_exec_future.exception()
        if exception:
            logging.error(  # noqa: G200
                'Task: %s Error execute: %s',
                self._task_name,
                exception,
            )
            result = 'error'
        else:
            result = 'success' if task_exec_future.result() else 'failure'

        task_result_body = {'result': result}

        if not self._main_event_loop.is_running():
            logging.info(
                "Could not send Task.execute() result for %s. Event loop is "
                "not running",
                update_work_path,
            )
            return

        # Update WorkRequest status. The coroutine is handed over to the
        # main event loop with run_coroutine_threadsafe.
        asyncio.run_coroutine_threadsafe(
            self._async_http_client.put(
                update_work_path, json=task_result_body
            ),
            self._main_event_loop,
        )

    @staticmethod
    def log_forced_exit(signum):
        """
        Log a forced exit.

        :param signum: signal that caused the exit: a ``signal.Signals``
          member or a plain signal number.
        """
        try:
            signal_name = signal.Signals(signum).name
        except ValueError:
            # Not a signal number known to this platform: log it as given
            signal_name = str(signum)

        return logging.info('Terminated with signal %s', signal_name)