Source code for debusine.worker._worker

#!/usr/bin/env python3

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Worker client: connects to debusine server.

Overview
--------
* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``)
* The client will use this token to connect to the server (WebSocket to
  ``/ws/1.0/worker/connect``)

Flow
----

  #. The worker is executed and chooses ``~/.config/debusine/worker``
     (if it exists) or ``/etc/debusine/worker``. It reads the file
     ``config.ini`` from that directory and, if it already exists, the file
     ``token``.
  #. If there isn't a token, the worker generates one (using
     :py:func:`secrets.token_hex`) and registers it with the Debusine server
     via HTTP POST to ``/api/1.0/worker/register``, sending the generated
     token and the worker's FQDN. The token is saved to the ``token`` file in
     the chosen config directory.
  #. The server will create a new Token and Worker in the DB via the models.
     They will not be used until they have been manually validated.
  #. The client can then connect using WebSockets to ``/ws/1.0/worker/connect``
     to wait for commands to execute.

Objects documentation
---------------------

"""
import asyncio
import functools
import json
import logging
import secrets
import signal
import socket
from concurrent.futures import ThreadPoolExecutor

import aiohttp
from aiohttp import WSMessage, WSMsgType
from aiohttp.web_exceptions import HTTPCreated, HTTPOk

from pydantic import ValidationError

import tenacity

from debusine.client.models import WorkRequest
from debusine.tasks import Task, TaskConfigError

from . import system_information
from .config import ConfigHandler
from ..client.exceptions import TokenDisabledError


class Worker:
    """Worker class: waits for commands from the debusine server."""

    DEFAULT_LOG_LEVEL = logging.INFO

    def __init__(
        self,
        *,
        log_file: str = None,
        log_level=None,
        config=None,
    ):
        """
        Initialize Worker.

        :param log_file: log file to where the logs are saved. If None uses
          settings from config.ini or default's Python (stderr).
        :param log_level: minimum level of the logs being saved. If None uses
          settings from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (or creates a default one)
        """
        self._original_log_file = log_file
        self._original_log_level = log_level

        self._config: "ConfigHandler" = (
            ConfigHandler() if config is None else config
        )

        self._setup_logging()

        self._config.validate_config_or_fail()

        # Created lazily by _ensure_has_session()
        self._aiohttp_client_session = None

        # ThreadPool for Task.execute (e.g. Sbuild.exec)
        self._task_executor = ThreadPoolExecutor()

        # Task.execute (e.g. Sbuild.exec) future is stored here (avoids
        # garbage collection deletion)
        self._task_exec_future = None

        # Used by ThreadPoolExecutor to submit tasks to the main event loop
        self._main_event_loop = None

        # Running task
        self._task_running = None

        # Name of the task being executed (used in log messages); defined
        # here so the attribute exists before the first work request
        self._task_name = None

    def _setup_logging(self):
        """
        Set logging configuration.

        Use the parameters passed to Worker.__init__() and self._config
        configuration.

        If the log file cannot be opened it aborts.
        """
        log_file = self._original_log_file or self._config.log_file

        log_level = (
            self._original_log_level
            or self._config.log_level
            or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
        )

        try:
            logging.basicConfig(
                format='%(asctime)s %(message)s',
                filename=log_file,
                level=log_level,
                force=True,
            )
        except OSError as exc:
            self._fail(f'Cannot open {log_file}: {exc}')

    def _ensure_has_session(self):
        """Create the aiohttp ClientSession if it does not exist yet."""
        if self._aiohttp_client_session is None:
            self._aiohttp_client_session = aiohttp.ClientSession()

    def _create_task_signal_int_term_handler(self, signum):
        """Schedule _signal_int_term_handler(signum) and return the task."""
        return asyncio.create_task(self._signal_int_term_handler(signum))

    async def main(self):
        """Run the worker."""
        self._set_main_event_loop()

        for signum in [signal.SIGINT, signal.SIGTERM]:
            self._main_event_loop.add_signal_handler(
                signum,
                functools.partial(
                    self._create_task_signal_int_term_handler, signum
                ),
            )

        try:
            await self.connect()
            logging.info("debusine-worker lost connection with debusine-server")
        finally:
            # Close the session even if connect() raised (e.g. SystemExit
            # from _fail()) so the HTTP session is not left open.
            await self.close()

    def _set_main_event_loop(self):
        """Store the running event loop in self._main_event_loop."""
        self._main_event_loop = asyncio.get_running_loop()

    async def _signal_int_term_handler(self, signum):
        """Abort the running task (if any), log the exit and close."""
        if self._task_running is not None:
            self._task_running.abort()

        self.log_forced_exit(signum)
        await self.close()

    async def close(self):
        """Close the AioHTTP session."""
        if self._aiohttp_client_session is not None:
            await self._aiohttp_client_session.close()

    @staticmethod
    def _log_server_error(status_code, body):
        """Log that the registration failed with status_code and body."""
        logging.error(
            'Could not register. Server HTTP status code: %s Body: %s\n',
            status_code,
            body,
        )

    def _make_http_request(self, method, path, *args, **kwargs):
        """
        Make an HTTP request to the debusine server.

        Include the token if available in self._config.token.

        :param method: one of 'PUT', 'POST' or 'GET'.
        :param path: absolute path (must start with '/'), appended to
          self._config.debusine_url.
        :param args: passed through to the session method.
        :param kwargs: passed through to the session method. 'headers', if
          given, is copied before the token is added.
        :raises ValueError: unsupported method, 'data' in a GET request or
          path not absolute.
        :return: whatever the session's put/post/get call returns.
        """
        self._ensure_has_session()

        method_to_func = {
            'PUT': self._aiohttp_client_session.put,
            'POST': self._aiohttp_client_session.post,
            'GET': self._aiohttp_client_session.get,
        }

        if method not in method_to_func:
            allowed_methods = ", ".join(method_to_func.keys())
            raise ValueError(f'Method must be one of: {allowed_methods}')

        if method == 'GET' and 'data' in kwargs:
            raise ValueError('data not allowed in GET requests')

        if not path.startswith('/'):
            raise ValueError('Path must be absolute')

        # Work on a copy: the caller's dict must not gain a 'token' key as
        # a side effect of making a request.
        headers = dict(kwargs.pop('headers', None) or {})

        if self._config.token is not None:
            headers['token'] = self._config.token

        url = f'{self._config.debusine_url}{path}'

        logging.debug(
            "HTTP %s to the server: URL: %s %s %s", method, url, args, kwargs
        )

        return method_to_func[method](url, *args, **kwargs, headers=headers)

    def _get(self, path, *args, **kwargs):
        """Make a GET request (see _make_http_request)."""
        return self._make_http_request('GET', path, *args, **kwargs)

    def _post(self, path, *args, **kwargs):
        """Make a POST request (see _make_http_request)."""
        return self._make_http_request('POST', path, *args, **kwargs)

    def _put(self, path, *args, **kwargs):
        """Make a PUT request (see _make_http_request)."""
        return self._make_http_request('PUT', path, *args, **kwargs)

    async def _register(self):
        """
        Create a token, registers it to debusine, saves it locally.

        The worker will not receive any tasks until the debusine admin
        has approved the token.

        :return: True if the server answered HTTP 201 (the token is then
          written via self._config.write_token()), False otherwise.
        """
        token = secrets.token_hex(32)

        register_path = '/api/1.0/worker/register/'

        data = {'token': token, 'fqdn': socket.getfqdn()}

        try:
            response = await self._post(register_path, json=data)
        except aiohttp.client_exceptions.ClientResponseError as err:
            self._log_server_error(err.status, 'Not available')
            return False

        status, body = (
            response.status,
            (await response.content.read()).decode('utf-8'),
        )

        if status == HTTPCreated.status_code:
            self._config.write_token(token)
            return True
        else:
            self._log_server_error(status, body)
            return False

    @staticmethod
    async def _tenacity_sleep(delay):
        """Sleep for delay seconds (sleep function given to tenacity)."""
        await asyncio.sleep(delay)

    async def connect(self):
        """
        Connect (registering if needed) to the debusine server.

        Uses the URL for debusine server from the configuration file.
        """
        self._ensure_has_session()

        if not self._config.token:
            result = await self._register()
            if result is False:
                self._fail('Exiting...')

        # http://... -> ws://... and https://... -> wss://...
        debusine_ws_url = self._config.debusine_url.replace('http', 'ws', 1)
        worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'

        @tenacity.retry(
            sleep=self._tenacity_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            before=self._log_trying_to_reconnect,
        )
        async def _do_wait_for_messages():
            """Connect to the server and waits for commands."""
            headers = {'token': self._config.token}

            try:
                async with self._aiohttp_client_session.ws_connect(
                    worker_connect_ws_url, headers=headers, heartbeat=60
                ) as ws:
                    logging.info("Connected to %s", self._config.debusine_url)

                    async for msg in ws:  # pragma: no branch
                        msg: WSMessage
                        await self._process_message(msg)

                    return True
            except TokenDisabledError as exc:
                # The server disconnect the worker because the token
                # is disabled. Debusine's server admin should enable
                # the token.
                logging.info(  # noqa: G200
                    'The token (%s) is disabled. '
                    'Debusine admin (%s) should enable it. '
                    'Reason: %s. '
                    'Will try again',
                    self._config.token,
                    self._config.debusine_url,
                    exc,
                )
                # Re-raised so that tenacity retries
                raise
            except (
                aiohttp.client_exceptions.ClientConnectorError,
                aiohttp.client_exceptions.WSServerHandshakeError,
            ) as exc:
                logging.info(  # noqa: G200
                    'Error connecting to %s (%s)',
                    self._config.debusine_url,
                    exc,
                )
                raise
            except ConnectionResetError:
                logging.debug('ConnectionResetError, will reconnect')
                raise

        await _do_wait_for_messages()

    @staticmethod
    def _log_trying_to_reconnect(retry_state):
        """Log the attempt number for every attempt after the first one."""
        if retry_state.attempt_number > 1:
            logging.debug(
                'Trying to reconnect, attempt %d', retry_state.attempt_number
            )

    @staticmethod
    def _fail(message):
        """Log message as fatal and exit with status 3."""
        logging.fatal(message)
        raise SystemExit(3)

    async def _request_and_execute_work_request(self):
        """Request a work request and, if one is returned, execute it."""
        work_request = await self._request_work_request()

        if work_request:
            await self._execute_work_request_and_submit_result(work_request)
        else:
            logging.debug('No work request available')

    async def _process_message(self, msg):
        """
        Process messages pushed by the debusine server to the client.

        Return True for processed messages; False for non-processed messages
        and it logs the reason.

        :raises TokenDisabledError: the server sent the reason code
          'TOKEN_DISABLED'.
        """
        if not isinstance(msg, aiohttp.http_websocket.WSMessage):
            logging.debug(
                'Worker._process_message: unexpected type: %s '
                'is not an instance of %s',
                type(msg),
                aiohttp.http_websocket.WSMessage,
            )
            return False

        if msg.type != WSMsgType.TEXT:
            logging.debug(
                'Worker._process_message: ignored message of type: %s',
                msg.type,
            )
            return False

        try:
            msg_content = json.loads(msg.data)
        except json.JSONDecodeError as exc:
            logging.info(  # noqa: G200
                'Worker._process_message: JSONDecodeError on a message '
                'received from the server: Received: "%s" Error: "%s"',
                msg.data,
                exc,
            )
            return False

        logging.debug("Received from the server: '%s'", msg_content)

        if not isinstance(msg_content, dict):
            # Valid JSON but not an object (e.g. a list or a number):
            # .get() below would raise AttributeError and the retry logic
            # in connect() would treat it as a connection failure.
            logging.info(
                'Worker._process_message: message received from the '
                'server is not a JSON object: Received: "%s"',
                msg.data,
            )
            return False

        if (text := msg_content.get('text')) is not None:
            if text == 'request_dynamic_metadata':
                await self._send_dynamic_metadata()
                return True

            if text == 'work_request_available':
                if self._task_exec_future:
                    logging.info(
                        "Work request available but ignored: "
                        "currently running a Task"
                    )
                    return False

                await self._request_and_execute_work_request()
                return True

            logging.debug(
                'Worker._process_message: invalid msg_content received '
                'from the server: %s',
                text,
            )
            return False

        if (reason_code := msg_content.get('reason_code')) is not None:
            if reason_code == 'TOKEN_DISABLED':
                reason = msg_content.get("reason", "Unknown")
                raise TokenDisabledError(reason)

            logging.error(
                'Unknown reason code (ignoring it): "%s"', reason_code
            )
            return False

        reason = msg_content.get('reason', 'unknown')
        logging.info("Could not connect. Reason: '%s'", reason)
        return False

    async def _request_work_request(self):
        """
        Request a work request and returns it.

        :return: a WorkRequest, or None if the server did not answer HTTP 200
          or the body could not be parsed as a WorkRequest.
        """
        response = await self._get(
            '/api/1.0/work-request/get-next-for-worker/'
        )

        if response.status != HTTPOk.status_code:
            return None

        try:
            return WorkRequest.parse_raw(await response.text())
        except ValidationError as exc:
            logging.warning(  # noqa: G200
                'Invalid WorkRequest received from'
                ' /get-next-for-worker/: %s',
                exc,
            )
            return None

    @staticmethod
    def _system_metadata():
        """Return system metadata such as CPU count or physical memory."""
        cpu_count = system_information.cpu_count()
        total_physical_memory = system_information.total_physical_memory()

        data = {
            'system:cpu_count': cpu_count,
            'system:total_physical_memory': total_physical_memory,
        }

        return data

    async def _send_dynamic_metadata(self):
        """PUT the system and tasks metadata to the debusine server."""
        dynamic_metadata_path = '/api/1.0/worker/dynamic-metadata/'

        metadata = {}
        metadata.update(self._system_metadata())
        metadata.update(Task.analyze_worker_all_tasks())

        response = await self._put(dynamic_metadata_path, json=metadata)

        logging.debug(
            'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
            response.status,
            metadata,
        )

    async def _execute_work_request_and_submit_result(
        self, work_request: "WorkRequest"
    ):
        """
        Configure the task of work_request and execute it in the executor.

        If the task cannot be configured the result 'error' is sent to the
        server. Otherwise Task.execute() is submitted to
        self._task_executor and the result is sent by _send_task_result()
        when the future is done.
        """
        update_work_completed_path = (
            f'/api/1.0/work-request/{work_request.id}/completed/'
        )

        self._task_running = Task.class_from_name(work_request.task_name)()

        try:
            self._task_running.configure(work_request.task_data)
        except TaskConfigError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error configure: %s', self._task_running.name, exc
            )
            # Nothing is going to run: do not leave a task registered as
            # running (the signal handler would try to abort it).
            self._task_running = None
            body = {'result': 'error'}
            await self._put(update_work_completed_path, json=body)
            return

        self._task_name = self._task_running.name
        self._task_exec_future = self._task_executor.submit(
            self._task_running.execute
        )

        # Order matters: the result is sent first, then the state is reset
        # (_send_task_result reads self._task_running).
        self._task_exec_future.add_done_callback(
            functools.partial(
                self._send_task_result, update_work_completed_path
            )
        )
        self._task_exec_future.add_done_callback(
            self._task_exec_future_finished
        )

    def _task_exec_future_finished(self, task_exec_future):  # noqa: U100
        """Reset the running task state (done callback of the future)."""
        self._task_exec_future = None
        self._task_running = None

    def _send_task_result(self, url, task_exec_future):
        """
        Send the result of the task to the server (done callback).

        Nothing is sent if the task was aborted or if the main event loop
        is not running. The PUT request is scheduled on the main event loop
        with asyncio.run_coroutine_threadsafe().

        :param url: path of the "completed" endpoint for the work request.
        :param task_exec_future: finished future of Task.execute().
        """
        if self._task_running.aborted:
            logging.info("Task: %s has been aborted", self._task_name)
            # No need to notify debusine-server
            return
        elif task_exec_future.exception():
            logging.error(  # noqa: G200
                'Task: %s Error execute: %s',
                self._task_name,
                task_exec_future.exception(),
            )
            result = 'error'
        else:
            result = 'success' if task_exec_future.result() else 'failure'

        body = {'result': result}

        if self._main_event_loop.is_running() is False:
            logging.info(
                "Could not send Task.execute() result for %s. Event loop is "
                "not running",
                url,
            )
            return

        asyncio.run_coroutine_threadsafe(
            self._put(url, json=body),
            self._main_event_loop,
        )

    @staticmethod
    def log_forced_exit(signum):
        """Log a forced exit."""
        return logging.info('Terminated with signal %s', signum.name)