#!/usr/bin/env python3
# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Worker client: connects to the debusine server.

Overview
--------

* Registration (needed only once per worker): if the worker doesn't have a
  token, it generates one and registers it with the server
  (HTTP POST to ``/api/1.0/worker/register``).
* The client then uses this token to connect to the server (WebSocket to
  ``/ws/1.0/worker/connect``).

Flow
----

#. The worker is executed and chooses ``~/.config/debusine/worker``
   (if it exists) or ``/etc/debusine/worker``. It reads the file
   ``config.ini`` from that directory and, if it already exists, the file
   ``token``.
#. If there is no token, the worker generates one (using
   :py:func:`secrets.token_hex`) and registers it with the Debusine server
   via HTTP POST to ``/api/1.0/worker/register``, sending the generated
   token and the worker's FQDN (a sketch of the request body appears after
   this list). The token is saved to the ``token`` file in the chosen
   config directory.
#. The server creates a new Token and Worker in the DB via the models.
   They are not used until manually validated.
#. The client can then connect using WebSockets to ``/ws/1.0/worker/connect``
   and wait for commands to execute.
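
A sketch of the registration request body (values are illustrative; the
field names match ``Worker._register`` below)::

    {
        "token": "<64 hex characters from secrets.token_hex(32)>",
        "fqdn": "worker.example.org"
    }
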
Objects documentation
---------------------
"""
import asyncio
import functools
import json
import logging
import secrets
import signal
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
import aiohttp
from aiohttp import WSMessage, WSMsgType
from aiohttp.web_exceptions import HTTPCreated, HTTPOk
from pydantic import ValidationError
import tenacity
from debusine.client.models import WorkRequest
from debusine.tasks import Task, TaskConfigError
from . import system_information
from .config import ConfigHandler
from .debusine_async_http_client import DebusineAsyncHttpClient
from ..client.debusine import Debusine
from ..client.exceptions import TokenDisabledError
class Worker:
"""Worker class: waits for commands from the debusine server."""
DEFAULT_LOG_LEVEL = logging.INFO
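    # Typical usage (a sketch only; the real debusine-worker entry point may
    # differ):
    #
    #     worker = Worker(log_level=logging.DEBUG)
    #     asyncio.run(worker.main())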
    def __init__(
        self,
        *,
        log_file: Optional[str] = None,
        log_level=None,
        config=None,
    ):
"""
        Initialize Worker.

        :param log_file: log file where the logs are saved. If None, uses
          the setting from config.ini or Python's default (stderr).
        :param log_level: minimum level of the logs being saved. If None,
          uses the setting from config.ini or DEFAULT_LOG_LEVEL.
        :param config: ConfigHandler to use (if None, a default one is
          created)
"""
self._original_log_file = log_file
self._original_log_level = log_level
self._config: ConfigHandler
if config is None:
self._config = ConfigHandler()
else:
self._config = config
self._setup_logging()
self._config.validate_config_or_fail()
self._aiohttp_client_session: Optional[aiohttp.ClientSession] = None
self._async_http_client = DebusineAsyncHttpClient(logging, self._config)
# ThreadPool for Task.execute (e.g. Sbuild.exec)
self._task_executor = ThreadPoolExecutor()
# Task.execute (e.g. Sbuild.exec) future is stored here (avoids
# garbage collection deletion)
self._task_exec_future = None
# Used by ThreadPoolExecutor to submit tasks to the main event loop
self._main_event_loop = None
# Running task
self._task_running: Optional[Task] = None
def _setup_logging(self):
"""
        Set up the logging configuration.

        Use the parameters passed to Worker.__init__() and the self._config
        configuration. If the log file cannot be opened, the worker aborts.
"""
log_file = self._original_log_file or self._config.log_file
log_level = (
self._original_log_level
or self._config.log_level
or logging.getLevelName(self.DEFAULT_LOG_LEVEL)
)
try:
logging.basicConfig(
format='%(asctime)s %(message)s',
filename=log_file,
level=log_level,
force=True,
)
except OSError as exc:
self._fail(f'Cannot open {log_file}: {exc}')
def _create_task_signal_int_term_handler(self, signum):
return asyncio.create_task(self._signal_int_term_handler(signum))
    async def main(self):
"""Run the worker."""
self._set_main_event_loop()
for signum in [signal.SIGINT, signal.SIGTERM]:
self._main_event_loop.add_signal_handler(
signum,
functools.partial(
self._create_task_signal_int_term_handler, signum
),
)
await self.connect()
logging.info("debusine-worker lost connection with debusine-server")
await self.close()
def _set_main_event_loop(self):
self._main_event_loop = asyncio.get_running_loop()
async def _signal_int_term_handler(self, signum):
if self._task_running is not None:
self._task_running.abort()
self.log_forced_exit(signum)
await self.close()
    async def close(self):
"""Close the AioHTTP session."""
if self._aiohttp_client_session is not None:
await self._aiohttp_client_session.close()
await self._async_http_client.close()
@staticmethod
def _log_server_error(status_code, body):
logging.error(
'Could not register. Server HTTP status code: %s Body: %s\n',
status_code,
body,
)
async def _register(self):
"""
        Create a token, register it with debusine, and save it locally.

        The worker will not receive any tasks until the debusine admin
        has approved the token.
"""
token = secrets.token_hex(32)
register_path = '/api/1.0/worker/register/'
data = {'token': token, 'fqdn': socket.getfqdn()}
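        # A 201 Created response means the server stored the token (it still
        # needs manual approval by a debusine admin); any other status is
        # logged below and registration fails.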
try:
response = await self._async_http_client.post(
register_path, json=data
)
except aiohttp.client_exceptions.ClientResponseError as err:
self._log_server_error(err.status, 'Not available')
return False
status, body = (
response.status,
(await response.content.read()).decode('utf-8'),
)
if status == HTTPCreated.status_code:
self._config.write_token(token)
return True
else:
self._log_server_error(status, body)
return False
@staticmethod
async def _tenacity_sleep(delay):
await asyncio.sleep(delay)
    async def connect(self):
"""
        Connect (registering if needed) to the debusine server.

        Use the debusine server URL from the configuration file.
"""
self._aiohttp_client_session = aiohttp.ClientSession()
if not self._config.token:
result = await self._register()
if result is False:
self._fail('Exiting...')
debusine_ws_url = self._config.debusine_url.replace('http', 'ws', 1)
worker_connect_ws_url = f'{debusine_ws_url}/ws/1.0/worker/connect/'
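        # e.g. "https://debusine.example.net" becomes
        # "wss://debusine.example.net/ws/1.0/worker/connect/"; the hostname
        # here is illustrative, the scheme swap is done by replace() above.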
@tenacity.retry(
sleep=self._tenacity_sleep,
wait=tenacity.wait_random(min=1, max=6),
before=self._log_trying_to_reconnect,
)
        async def _do_wait_for_messages():
            """Connect to the server and wait for commands."""
headers = {'token': self._config.token}
try:
async with self._aiohttp_client_session.ws_connect(
worker_connect_ws_url, headers=headers, heartbeat=60
) as ws:
logging.info("Connected to %s", self._config.debusine_url)
async for msg in ws: # pragma: no branch
msg: WSMessage
await self._process_message(msg)
return True
except TokenDisabledError as exc:
                # The server disconnected the worker because the token
                # is disabled. The Debusine server admin should enable
                # the token.
logging.info( # noqa: G200
'The token (%s) is disabled. '
'Debusine admin (%s) should enable it. '
'Reason: %s. '
'Will try again',
self._config.token,
self._config.debusine_url,
exc,
)
raise
except (
aiohttp.client_exceptions.ClientConnectorError,
aiohttp.client_exceptions.WSServerHandshakeError,
) as exc:
logging.info( # noqa: G200
'Error connecting to %s (%s)',
self._config.debusine_url,
exc,
)
raise
except ConnectionResetError:
logging.debug('ConnectionResetError, will reconnect')
raise
await _do_wait_for_messages()
@staticmethod
def _log_trying_to_reconnect(retry_state):
if retry_state.attempt_number > 1:
logging.debug(
'Trying to reconnect, attempt %d', retry_state.attempt_number
)
@staticmethod
def _fail(message):
logging.fatal(message)
raise SystemExit(3)
async def _request_and_execute_work_request(self):
work_request = await self._request_work_request()
if work_request:
await self._execute_work_request_and_submit_result(work_request)
else:
logging.debug('No work request available')
async def _process_message(self, msg):
"""
        Process messages pushed by the debusine server to the client.

        Return True for processed messages; return False (and log the
        reason) for messages that could not be processed.
"""
if not isinstance(msg, aiohttp.http_websocket.WSMessage):
logging.debug(
'Worker._process_message: unexpected type: %s '
'is not an instance of %s',
type(msg),
aiohttp.http_websocket.WSMessage,
)
return False
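        # Message shapes handled below (sketches based on this method's
        # branches; the server may include other fields):
        #   {"text": "request_dynamic_metadata"}
        #   {"text": "work_request_available"}
        #   {"reason_code": "TOKEN_DISABLED", "reason": "..."}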
if msg.type == WSMsgType.TEXT:
try:
msg_content = json.loads(msg.data)
logging.debug("Received from the server: '%s'", msg_content)
except json.JSONDecodeError as exc:
logging.info( # noqa: G200
'Worker._process_message: JSONDecodeError on a message '
'received from the server: Received: "%s" Error: "%s"',
msg.data,
exc,
)
return False
if (text := msg_content.get('text')) is not None:
if text == 'request_dynamic_metadata':
await self._send_dynamic_metadata()
elif text == 'work_request_available':
if self._task_exec_future:
logging.info(
"Work request available but ignored: "
"currently running a Task"
)
else:
await self._request_and_execute_work_request()
else:
logging.debug(
'Worker._process_message: invalid msg_content received '
'from the server: %s',
msg_content['text'],
)
elif (reason_code := msg_content.get('reason_code')) is not None:
if reason_code == 'TOKEN_DISABLED':
reason = msg_content.get("reason", "Unknown")
raise TokenDisabledError(reason)
else:
logging.error(
'Unknown reason code (ignoring it): "%s"', reason_code
)
else:
reason = msg_content.get('reason', 'unknown')
logging.info("Could not connect. Reason: '%s'", reason)
return False
    async def _request_work_request(self):
        """Request a work request and return it."""
work_request = await self._async_http_client.get(
'/api/1.0/work-request/get-next-for-worker/'
)
if work_request.status == HTTPOk.status_code:
try:
work_request_obj = WorkRequest.parse_raw(
await work_request.text()
)
except ValidationError as exc:
logging.warning( # noqa: G200
'Invalid WorkRequest received from'
' /get-next-for-worker/: %s',
exc,
)
return None
return work_request_obj
else:
return None
@staticmethod
def _system_metadata():
"""Return system metadata such as CPU count or physical memory."""
cpu_count = system_information.cpu_count()
total_physical_memory = system_information.total_physical_memory()
data = {
'system:cpu_count': cpu_count,
'system:total_physical_memory': total_physical_memory,
}
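        # Example of the returned mapping (a sketch; values depend on the
        # host):
        #   {'system:cpu_count': 4,
        #    'system:total_physical_memory': 16_000_000_000}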
return data
async def _send_dynamic_metadata(self):
dynamic_metadata_path = '/api/1.0/worker/dynamic-metadata/'
metadata = {}
metadata.update(self._system_metadata())
metadata.update(Task.analyze_worker_all_tasks())
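        # The payload merges the system:* keys from _system_metadata() with
        # whatever Task.analyze_worker_all_tasks() reports for the installed
        # task types (task-specific keys are not listed here).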
response = await self._async_http_client.put(
dynamic_metadata_path, json=metadata
)
logging.debug(
'Sent dynamic_metadata (response: %d) Dynamic metadata: %s',
response.status,
metadata,
)
async def _execute_work_request_and_submit_result(
self, work_request: WorkRequest
):
update_work_completed_path = (
f'/api/1.0/work-request/{work_request.id}/completed/'
)
self._task_running = Task.class_from_name(work_request.task_name)()
self._task_running.configure_server_access(
Debusine(self._config.debusine_url + "/api", self._config.token)
)
try:
self._task_running.configure(work_request.task_data)
except TaskConfigError as exc:
logging.error( # noqa: G200
'Task: %s Error configure: %s', self._task_running.name, exc
)
body = {'result': 'error'}
await self._async_http_client.put(
update_work_completed_path, json=body
)
return
self._task_name = self._task_running.name
self._task_exec_future = self._task_executor.submit(
self._task_running.execute_logging_exceptions
)
self._task_exec_future.add_done_callback(
functools.partial(
self._send_task_result,
update_work_completed_path,
)
)
self._task_exec_future.add_done_callback(
self._task_exec_future_finished
)
def _task_exec_future_finished(self, task_exec_future): # noqa: U100
self._task_exec_future = None
self._task_running = None
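    # The completed-work-request body sent below is {"result": <value>},
    # where <value> is "success", "failure" or "error"; if the task was
    # aborted, nothing is sent.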
def _send_task_result(
self, update_work_path: str, task_exec_future
) -> None:
"""
        Send the result of the task to the debusine server.

        :param update_work_path: path to update the WorkRequest status
        :param task_exec_future: future that executed the Task. This method
          checks the result via .result() or .exception() on this object.
"""
if self._task_running.aborted:
logging.info("Task: %s has been aborted", self._task_name)
# No need to notify debusine-server
return
elif task_exec_future.exception():
logging.error( # noqa: G200
'Task: %s Error execute: %s',
self._task_name,
task_exec_future.exception(),
)
result = 'error'
else:
result = 'success' if task_exec_future.result() else 'failure'
task_result_body = {'result': result}
if self._main_event_loop.is_running() is False:
logging.info(
"Could not send Task.execute() result for %s. Event loop is "
"not running",
update_work_path,
)
return
# Update WorkRequest status
asyncio.run_coroutine_threadsafe(
self._async_http_client.put(
update_work_path, json=task_result_body
),
self._main_event_loop,
)
    @staticmethod
def log_forced_exit(signum):
"""Log a forced exit."""
return logging.info('Terminated with signal %s', signum.name)