#!/usr/bin/env python3
# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Worker client: connects to debusine server.

Overview
--------

* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``).
* The client uses this token to connect to the server (WebSocket to
  ``/api/ws/1.0/worker/connect``).

Flow
----

#. The worker is executed and chooses ``~/.config/debusine/worker``
   (if it exists) or ``/etc/debusine/worker``. It reads ``config.ini``
   from that directory and, if it exists, the ``token`` file.
#. If there is no token, the worker generates one (using
   :py:func:`secrets.token_hex`) and registers it with the Debusine server
   via HTTP POST to ``/api/1.0/worker/register``, sending the generated
   token and the worker's FQDN. Once the server accepts the registration,
   the token is saved to the ``token`` file in the chosen config directory.
#. The server creates a new Token and Worker in the database (via its
   models). They are not used until the Debusine admin approves the token
   manually.
#. The client can then connect using WebSockets to
   ``/api/ws/1.0/worker/connect`` and wait for commands to execute.

Objects documentation
---------------------
"""
import asyncio
import functools
import logging
import secrets
import signal
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
import aiohttp
from aiohttp.web_exceptions import HTTPCreated, HTTPOk
from pydantic import ValidationError
import tenacity
from debusine.client.models import WorkRequestResponse
from debusine.tasks import Task, TaskConfigError
from . import system_information
from .config import ConfigHandler
from .debusine_async_http_client import DebusineAsyncHttpClient
from ..client.debusine import Debusine
from ..client.exceptions import TokenDisabledError
class Worker:
    """
    Worker class: waits for commands from the debusine server.

    The worker registers with the server if it has no token, keeps a
    WebSocket connection to the server open (reconnecting on errors) and,
    when the server announces that a work request is available, fetches it
    and runs the corresponding :py:class:`Task` in a thread pool, reporting
    the result back to the server.
    """

    DEFAULT_LOG_LEVEL = logging.INFO

    def __init__(
        self,
        *,
        log_file: Optional[str] = None,
        log_level=None,
        config=None,
    ):
        """
        Initialize Worker.

        :param log_file: log file to where the logs are saved. If None uses
          settings from config.ini or Python's default (stderr).
        :param log_level: minimum level of the logs being saved. If None uses
          settings from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (or creates a default one)
        """
        self._original_log_file = log_file
        self._original_log_level = log_level

        self._config: ConfigHandler
        if config is None:
            self._config = ConfigHandler()
        else:
            self._config = config

        self._setup_logging()

        self._config.validate_config_or_fail()

        # Created by connect()
        self._aiohttp_client_session: Optional[aiohttp.ClientSession] = None

        self._async_http_client = DebusineAsyncHttpClient(logging, self._config)

        logger = logging.getLogger()
        self._debusine = Debusine(
            self._config.api_url, self._config.token, logger=logger
        )

        # ThreadPool for Task.execute (e.g. Sbuild.exec)
        self._task_executor = ThreadPoolExecutor()

        # Future of the Task executing in self._task_executor; None when
        # idle. Also used to ignore "work_request_available" messages while
        # a Task is running.
        self._task_exec_future = None

        # Event loop running main(); set by _set_main_event_loop() and used
        # to install the SIGINT/SIGTERM handlers
        self._main_event_loop = None

        # Running task
        self._task_running: Optional[Task] = None

        # Name of the Task submitted for execution (used for logging from
        # the executor's done callbacks)
        self._task_name: Optional[str] = None

        # Strong references to the asyncio tasks created by the signal
        # handlers: the event loop only keeps weak references to tasks, so
        # without this they could be garbage-collected mid-execution.
        self._signal_handler_tasks = set()

    def _setup_logging(self):
        """
        Set logging configuration.

        Use the parameters passed to Worker.__init__() and self._config
        configuration. The explicit parameters take precedence over the
        configuration; the level falls back to DEFAULT_LOG_LEVEL.

        If the log file cannot be opened it aborts (SystemExit via _fail()).
        """
        log_file = self._original_log_file or self._config.log_file

        log_level = (
            self._original_log_level
            or self._config.log_level
            or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
        )

        try:
            logging.basicConfig(
                format='%(asctime)s %(message)s',
                filename=log_file,
                level=log_level,
                force=True,
            )
        except OSError as exc:
            self._fail(f'Cannot open {log_file}: {exc}')

    def _create_task_signal_int_term_handler(self, signum):
        """
        Schedule _signal_int_term_handler(signum) on the running event loop.

        Called synchronously by the event loop when SIGINT/SIGTERM arrives.
        A strong reference to the created task is kept until it finishes.

        :return: the created :py:class:`asyncio.Task`.
        """
        task = asyncio.create_task(self._signal_int_term_handler(signum))
        self._signal_handler_tasks.add(task)
        task.add_done_callback(self._signal_handler_tasks.discard)
        return task

    async def main(self):
        """
        Run the worker.

        Install the SIGINT/SIGTERM handlers, connect to the server and, once
        the connection ends, close the HTTP sessions.
        """
        self._set_main_event_loop()

        for signum in [signal.SIGINT, signal.SIGTERM]:
            self._main_event_loop.add_signal_handler(
                signum,
                functools.partial(
                    self._create_task_signal_int_term_handler, signum
                ),
            )

        await self.connect()

        logging.info("debusine-worker lost connection with debusine-server")
        await self.close()

    def _set_main_event_loop(self):
        """Store the currently running event loop in self._main_event_loop."""
        self._main_event_loop = asyncio.get_running_loop()

    async def _signal_int_term_handler(self, signum):
        """
        Handle SIGINT/SIGTERM: abort the running Task (if any) and close.

        :param signum: the received signal (a :py:class:`signal.Signals`).
        """
        # Snapshot the attribute: _task_exec_future_finished() may reset
        # self._task_running from the executor thread at any moment.
        task_running = self._task_running
        if task_running is not None:
            task_running.abort()

        self.log_forced_exit(signum)

        await self.close()

    async def close(self):
        """Close the AioHTTP session."""
        if self._aiohttp_client_session is not None:
            await self._aiohttp_client_session.close()

        await self._async_http_client.close()

    @staticmethod
    def _log_server_error(status_code, body):
        """Log a failed registration with the server's status and body."""
        logging.error(
            'Could not register. Server HTTP status code: %s Body: %s\n',
            status_code,
            body,
        )

    async def _register(self):
        """
        Create a token, registers it to debusine, saves it locally.

        The worker will not receive any tasks until the debusine admin
        has approved the token.

        :return: True if the server answered HTTP 201 (the token is then
          written via the ConfigHandler); False otherwise (the reason is
          logged).
        """
        token = secrets.token_hex(32)
        register_path = '/1.0/worker/register/'

        data = {'token': token, 'fqdn': socket.getfqdn()}

        try:
            response = await self._async_http_client.post(
                register_path, json=data
            )
        except aiohttp.client_exceptions.ClientResponseError as err:
            self._log_server_error(err.status, 'Not available')
            return False
        except aiohttp.client_exceptions.ClientConnectorError as err:
            logging.error(  # noqa: G200
                'Could not register. Server unreachable: %s', err
            )
            return False

        status, body = (
            response.status,
            (await response.content.read()).decode('utf-8'),
        )

        if status == HTTPCreated.status_code:
            self._config.write_token(token)
            return True
        else:
            self._log_server_error(status, body)
            return False

    @staticmethod
    async def _tenacity_sleep(delay):
        """Non-blocking sleep used by tenacity between connection attempts."""
        await asyncio.sleep(delay)

    async def connect(self):
        """
        Connect (registering if needed) to the debusine server.

        Uses the URL for debusine server from the configuration file (its
        first "http" is replaced by "ws"). If the worker has no token it
        registers first and exits (via _fail()) if the registration fails.

        Client errors, TokenDisabledError and ConnectionError are retried
        with a random 1-6 second wait between attempts (no stop condition is
        configured). Returns once the server closes the WebSocket.
        """
        self._aiohttp_client_session = aiohttp.ClientSession()

        if not self._config.token:
            result = await self._register()

            if result is False:
                self._fail('Exiting...')

        debusine_ws_url = self._config.api_url.replace('http', 'ws', 1)

        worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'

        @tenacity.retry(
            sleep=self._tenacity_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            before=self._log_trying_to_reconnect,
            retry=tenacity.retry_if_exception_type(
                aiohttp.client_exceptions.ClientError
            )
            | tenacity.retry_if_exception_type(TokenDisabledError)
            | tenacity.retry_if_exception_type(ConnectionError),
        )
        async def _do_wait_for_messages():
            """Connect to the server and waits for commands."""
            headers = {'token': self._config.token}
            try:
                async with self._aiohttp_client_session.ws_connect(
                    worker_connect_ws_url, headers=headers, heartbeat=60
                ) as ws:
                    logging.info("Connected to %s", self._config.api_url)

                    async for msg in ws:  # pragma: no branch
                        msg: aiohttp.WSMessage
                        await self._process_message(msg)

                    return True
            except TokenDisabledError as exc:
                # The server disconnect the worker because the token
                # is disabled. Debusine's server admin should enable
                # the token.
                logging.info(  # noqa: G200
                    'The token (%s) is disabled. '
                    'Debusine admin (%s) should enable it. '
                    'Reason: %s. '
                    'Will try again',
                    self._config.token,
                    self._config.api_url,
                    exc,
                )
                raise
            except (
                aiohttp.client_exceptions.ClientConnectorError,
                aiohttp.client_exceptions.WSServerHandshakeError,
            ) as exc:
                logging.info(  # noqa: G200
                    'Error connecting to %s (%s)',
                    self._config.api_url,
                    exc,
                )
                raise
            except ConnectionResetError:
                logging.debug('ConnectionResetError, will reconnect')
                raise

        await _do_wait_for_messages()

    @staticmethod
    def _log_trying_to_reconnect(retry_state):
        """Tenacity "before" hook: log every attempt after the first one."""
        if retry_state.attempt_number > 1:
            logging.debug(
                'Trying to reconnect, attempt %d', retry_state.attempt_number
            )

    @staticmethod
    def _fail(message):
        """Log ``message`` as fatal and exit with status 3."""
        logging.fatal(message)
        raise SystemExit(3)

    async def _request_and_execute_work_request(self):
        """Fetch the next work request (if any) and start executing it."""
        work_request = await self._request_work_request()

        if work_request:
            await self._execute_work_request_and_submit_result(work_request)
        else:
            logging.debug('No work request available')

    async def _process_message(self, msg):
        """
        Process messages pushed by the debusine server to the client.

        Dispatch is delegated to Debusine.process_async_message(); its return
        value is returned (per the original contract: True for processed
        messages; False for non-processed messages, logging the reason).
        """

        async def work_request_available(msg_content: dict):  # noqa: U100
            # Only one Task runs at a time
            if self._task_exec_future:
                logging.info(
                    "Work request available but ignored: "
                    "currently running a Task"
                )
            else:
                await self._request_and_execute_work_request()

        return await Debusine.process_async_message(
            msg,
            {
                "request_dynamic_metadata": self._send_dynamic_metadata,
                "work_request_available": work_request_available,
            },
            logger=logging,
        )

    async def _request_work_request(self) -> Optional[WorkRequestResponse]:
        """
        Request a work request and returns it.

        :return: the parsed work request, or None if the server did not
          answer HTTP 200 or the body failed validation (logged).
        """
        work_request = await self._async_http_client.get(
            '/1.0/work-request/get-next-for-worker/'
        )

        if work_request.status == HTTPOk.status_code:
            try:
                work_request_obj = WorkRequestResponse.parse_raw(
                    await work_request.text()
                )
            except ValidationError as exc:
                logging.warning(  # noqa: G200
                    'Invalid WorkRequest received from'
                    ' /get-next-for-worker/: %s',
                    exc,
                )
                return None

            return work_request_obj
        else:
            return None

    @staticmethod
    def _system_metadata():
        """Return system metadata such as CPU count or physical memory."""
        cpu_count = system_information.cpu_count()
        total_physical_memory = system_information.total_physical_memory()

        data = {
            'system:cpu_count': cpu_count,
            'system:total_physical_memory': total_physical_memory,
        }

        return data

    async def _send_dynamic_metadata(self, msg_content: dict):  # noqa: U100
        """
        Send system and per-Task worker metadata to the server (HTTP PUT).

        :param msg_content: content of the server's message (unused).
        """
        dynamic_metadata_path = '/1.0/worker/dynamic-metadata/'

        metadata = {}
        metadata.update(self._system_metadata())
        metadata.update(Task.analyze_worker_all_tasks())

        response = await self._async_http_client.put(
            dynamic_metadata_path, json=metadata
        )

        logging.debug(
            'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
            response.status,
            metadata,
        )

    async def _execute_work_request_and_submit_result(
        self, work_request: WorkRequestResponse
    ):
        """
        Configure the Task of ``work_request`` and run it in the executor.

        If the Task cannot be configured the work request is reported to the
        server as "error" and nothing is executed. Otherwise the result is
        reported by _send_task_result() when the execution finishes.
        """
        self._task_running = Task.class_from_name(work_request.task_name)()

        logger = logging.getLogger()
        self._task_running.configure_server_access(
            Debusine(self._config.api_url, self._config.token, logger=logger)
        )

        self._task_running.work_request = work_request.id

        try:
            self._task_running.configure(work_request.task_data)
        except TaskConfigError as exc:
            logging.error(  # noqa: G200
                'Task: %s Error configure: %s', self._task_running.name, exc
            )
            self._debusine.work_request_completed_update(
                work_request.id, "error"
            )
            # The Task never started: do not leave it registered as running
            # (a later SIGINT/SIGTERM would otherwise abort a stale Task).
            self._task_running = None
            return

        self._task_name = self._task_running.name

        self._task_exec_future = self._task_executor.submit(
            self._task_running.execute_logging_exceptions
        )

        # Callbacks run in the order they are added: the result is sent
        # before the running state is cleared.
        self._task_exec_future.add_done_callback(
            functools.partial(
                self._send_task_result,
                work_request.id,
            )
        )
        self._task_exec_future.add_done_callback(
            self._task_exec_future_finished
        )

    def _task_exec_future_finished(self, task_exec_future):  # noqa: U100
        """Done callback: mark the worker as idle (no Task running)."""
        self._task_exec_future = None
        self._task_running = None

    def _send_task_result(self, work_request_id: int, task_exec_future) -> None:
        """
        Send the result of the task to the debusine server.

        Aborted tasks are only logged. Otherwise the result sent is "error"
        if the execution raised, else "success"/"failure" depending on the
        truthiness of the execution's return value.

        :param work_request_id: WorkRequest.id
        :param task_exec_future: task_exec that executed the Task. This method
          check the result via .result() or .exception() of this object
        """
        if self._task_running.aborted:
            logging.info("Task: %s has been aborted", self._task_name)
            # No need to notify debusine-server
            return

        exception = task_exec_future.exception()
        if exception is not None:
            logging.error(  # noqa: G200
                'Task: %s Error execute: %s',
                self._task_name,
                exception,
            )
            result = 'error'
        else:
            result = 'success' if task_exec_future.result() else 'failure'

        self._debusine.work_request_completed_update(work_request_id, result)

    @staticmethod
    def log_forced_exit(signum):
        """Log a forced exit caused by the signal ``signum``."""
        return logging.info('Terminated with signal %s', signum.name)