Source code for debusine.client.debusine

#!/usr/bin/env python3

# Copyright 2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Debusine: Interacts with debusine server.

Debusine fetches information, submits work requests and performs other operations.
"""
import asyncio
import datetime
import functools
import io
import json
import logging
import shutil
import subprocess
import tempfile
import urllib
from collections.abc import Iterable
from pathlib import Path
from typing import Any, Literal, Protocol, TYPE_CHECKING, overload
from urllib.parse import urlencode

import aiohttp

import requests

from requests_toolbelt.downloadutils.stream import stream_response_to_file

import tenacity

from debusine.artifacts import LocalArtifact
from debusine.client.debusine_http_client import DebusineHttpClient
from debusine.client.exceptions import TokenDisabledError
from debusine.client.file_uploader import FileUploader
from debusine.client.models import (
    ArtifactResponse,
    CollectionItemType,
    LookupMultipleRequest,
    LookupMultipleResponse,
    LookupSingleRequest,
    LookupSingleResponse,
    LookupSingleResponseArtifact,
    LookupSingleResponseCollection,
    OnWorkRequestCompleted,
    RelationCreateRequest,
    RelationCreateRequestType,
    RelationResponse,
    RemoteArtifact,
    WorkRequestRequest,
    WorkRequestResponse,
    WorkflowTemplateRequest,
    WorkflowTemplateResponse,
)

if TYPE_CHECKING:
    from debusine.tasks.models import (
        CollectionItemLookupMultiple,
        CollectionItemLookupSingle,
    )


class _MessageProcessor(Protocol):
    """
    Callback signature for handling one decoded message from the server.

    Implementations receive the decoded JSON message as the keyword-only
    ``msg_content`` argument and return nothing.
    """

    def __call__(self, *, msg_content: dict[str, Any]) -> None:
        """Handle the decoded message ``msg_content``."""


class Debusine:
    """Class to interact with debusine server."""

    API_VERSION = '1.0'

    def __init__(
        self,
        base_api_url: str,
        api_token: str | None = None,
        *,
        logger: logging.Logger,
    ):
        """
        Initialize client.

        :param base_api_url: URL for API endpoint (e.g. http://localhost/api)
        :param api_token: optional token to be used for the calls.
        :param logger: logger used to report progress and errors.
        """
        self.base_api_url: str = base_api_url
        self.token: str | None = api_token
        # Versioned API root; tolerant of a trailing "/" in base_api_url.
        self.api_url = base_api_url.rstrip("/") + "/" + self.API_VERSION
        self._logger = logger
        self._debusine_http_client: DebusineHttpClient = DebusineHttpClient(
            self.api_url, api_token
        )

    def work_request_iter(self) -> Iterable[WorkRequestResponse]:
        """
        List all WorkRequests.

        :raises many: see _api_request method documentation.
        """
        return self._debusine_http_client.iter_paginated_get(
            "/work-request/", WorkRequestResponse
        )

    def work_request_get(self, work_request_id: int) -> WorkRequestResponse:
        """
        Get WorkRequest for work_request_id.

        :param work_request_id: id to fetch the status of.
        :raises many: see _api_request method documentation.
        """
        return self._debusine_http_client.get(
            f"/work-request/{work_request_id}/", WorkRequestResponse
        )

    def work_request_update(
        self, work_request_id: int, *, priority_adjustment: int
    ) -> None:
        """
        Update properties of work_request_id.

        :param work_request_id: id of the work request to update.
        :param priority_adjustment: new priority adjustment to PATCH.
        """
        return self._debusine_http_client.patch(
            path=f"/work-request/{work_request_id}/",
            data={"priority_adjustment": priority_adjustment},
            expected_class=WorkRequestResponse,
        )

    @staticmethod
    def work_request_completed_path(work_request_id: int) -> str:
        """Return path to update the completed result for work_request_id."""
        return f"/work-request/{work_request_id}/completed/"

    def work_request_completed_update(self, work_request_id: int, result: str):
        """Update work_request_id as completed with result."""
        return self._debusine_http_client.put(
            path=self.work_request_completed_path(work_request_id),
            data={"result": result},
            expected_class=None,
        )

    def work_request_create(
        self, work_request: WorkRequestRequest
    ) -> WorkRequestResponse:
        """
        Create a work request (via POST /work-request/).

        :return: WorkRequest returned by the server.
        :raises: see _api_request method documentation.
        """
        data = work_request.dict()

        # If workspace is None: the client does not want to specify
        # a workspace. Do not send it to the server: the server will
        # assign the default one
        if data["workspace"] is None:
            del data["workspace"]

        return self._debusine_http_client.post(
            "/work-request/",
            WorkRequestResponse,
            data=data,
        )

    def workflow_template_create(
        self, workflow_template: WorkflowTemplateRequest
    ) -> WorkflowTemplateResponse:
        """
        Create a workflow template (via POST /workflow-template/).

        :return: WorkflowTemplate returned by the server.
        :raises: see _api_request method documentation.
        """
        data = workflow_template.dict()

        # If workspace is None: the client does not want to specify a
        # workspace. Do not send it to the server: the server will assign
        # the default one.
        if data["workspace"] is None:
            del data["workspace"]

        return self._debusine_http_client.post(
            "/workflow-template/",
            WorkflowTemplateResponse,
            data=data,
            expected_statuses=[requests.codes.created],
        )

    def artifact_get(self, artifact_id: int) -> ArtifactResponse:
        """
        Get artifact information.

        Use download_artifact() to download the artifact.
        """
        return self._debusine_http_client.get(
            f"/artifact/{artifact_id}/",
            ArtifactResponse,
        )

    def artifact_create(
        self,
        artifact: LocalArtifact[Any],
        *,
        workspace: str | None,
        work_request: int | None = None,
        expire_at: datetime.datetime | None = None,
    ) -> ArtifactResponse:
        """Create artifact in the debusine server."""
        return self._debusine_http_client.post(
            "/artifact/",
            ArtifactResponse,
            data=artifact.serialize_for_create_artifact(
                workspace=workspace,
                work_request=work_request,
                expire_at=expire_at,
            ),
            expected_statuses=[requests.codes.created],
        )

    def _url_for_file_in_artifact(
        self, artifact_id: int, path_in_artifact: str
    ) -> str:
        """
        Return URL to upload a file in the artifact.

        Built from self.api_url (which already strips a trailing "/" from
        base_api_url) so that a base URL ending in "/" does not produce a
        double slash.
        """
        quoted_path = urllib.parse.quote(path_in_artifact)
        return f"{self.api_url}/artifact/{artifact_id}/files/{quoted_path}/"

    def upload_files(
        self,
        artifact_id: int,
        upload_files: dict[str, Path],
        base_directory: Path | None = None,
    ) -> None:
        """
        Upload into artifact the files.

        :param artifact_id: artifact_id to upload files to.
        :param upload_files: mapping of path in the artifact to the local
          file to upload.
        :param base_directory: base directory for relative path's files
          to upload.
        :raises ValueError: if a local path is relative and base_directory
          is None.
        """
        file_uploader = FileUploader(self.token)

        for artifact_path, local_path in upload_files.items():
            if not local_path.is_absolute():
                if base_directory is None:
                    raise ValueError(
                        f"{local_path} is relative: base_directory "
                        "parameter cannot be None"
                    )
                local_path = base_directory.joinpath(local_path)

            url = self._url_for_file_in_artifact(artifact_id, artifact_path)
            file_uploader.upload(local_path, url)

    def upload_artifact(
        self,
        local_artifact: LocalArtifact[Any],
        *,
        workspace: str | None,
        work_request: int | None = None,
        expire_at: datetime.datetime | None = None,
    ) -> RemoteArtifact:
        """Upload (create and upload files) the local_artifact to the server."""
        artifact_response = self.artifact_create(
            local_artifact,
            workspace=workspace,
            work_request=work_request,
            expire_at=expire_at,
        )

        # Only the files the server reports as missing need to be uploaded.
        files_to_upload: dict[str, Path] = {
            path_in_artifact: local_artifact.files[path_in_artifact]
            for path_in_artifact in artifact_response.files_to_upload
        }

        self.upload_files(
            artifact_response.id,
            files_to_upload,
        )

        return RemoteArtifact(
            id=artifact_response.id, workspace=artifact_response.workspace
        )

    def relation_create(
        self,
        artifact_id: int,
        target_id: int,
        relation_type: RelationCreateRequestType,
    ) -> RelationResponse:
        """
        Create a new relation between artifacts.

        :param artifact_id: relation from
        :param target_id: relation to
        :param relation_type: type of relation such as extends, relates-to,
          built-using
        :return: RelationResponse returned by the server (the server may
          answer 200 OK or 201 Created).
        """
        relation_request = RelationCreateRequest(
            artifact=artifact_id, target=target_id, type=relation_type
        )
        return self._debusine_http_client.post(
            "/artifact-relation/",
            RelationResponse,
            data=relation_request.dict(include={"artifact", "target", "type"}),
            expected_statuses=[requests.codes.ok, requests.codes.created],
        )

    @staticmethod
    def _get_fileobj_streaming(url: str, *, headers: dict[str, str | None]):
        # Implemented to mock it in unit tests (since responses does not support
        # mocking when stream=True)
        # TODO: Simplify to just requests.get once
        # https://github.com/python/typeshed/pull/11370 has landed.
        with requests.Session() as session:
            return session.get(url, headers=headers, stream=True)

    def download_artifact(
        self,
        artifact_id: int,
        destination: Path,
        *,
        tarball: bool = False,
    ) -> ArtifactResponse:
        """
        Download artifact_id into destination directory.

        :param artifact_id: artifact id to download
        :param destination: destination directory to download/uncompress
        :param tarball: True to only download the tarball (artifact-id.tar.gz),
          False to uncompress it
        :raises RuntimeError: if uncompressing the tarball fails.
        """
        artifact = self._debusine_http_client.get(
            f"/artifact/{artifact_id}",
            ArtifactResponse,
        )

        url = artifact.download_tar_gz_url

        # Accept-Encoding: None: debusine is downloading a .tar.gz: no need
        # to gzip/deflate before sending it. Also: the implementation is
        # using request's response.raw which does not transparently
        # inflate/gunzip.
        headers = {"Token": self.token, "Accept-Encoding": None}

        # NOTE(review): the HTTP status of this response is not checked.
        response = self._get_fileobj_streaming(url, headers=headers)
        try:
            raw_response = response.raw

            if tarball:
                destination_file = destination / f"artifact-{artifact_id}.tar.gz"
                with open(destination_file, "wb") as fileobj:
                    shutil.copyfileobj(raw_response, fileobj)
                self._logger.info("Artifact downloaded: %s", destination_file)
            else:
                self._logger.info(
                    "Downloading artifact and uncompressing into %s", destination
                )
                self._uncompress(raw_response, destination, url)
        finally:
            # Release the streamed connection even if writing/untarring fails.
            response.close()

        return artifact

    def _uncompress(
        self,
        raw_response: io.BytesIO,
        destination: Path,
        url: str,
    ) -> None:
        """
        Extract the gzip-compressed tar stream raw_response into destination.

        The stream is piped into ``tar -xzv`` running in destination; the
        list of extracted files (tar's stdout) is logged at INFO level.

        :param raw_response: readable binary stream with the .tar.gz data.
        :param destination: directory where the tarball is extracted.
        :param url: URL the stream came from (used in the error message).
        :raises RuntimeError: if tar exits with a non-zero return code.
        """
        with (
            tempfile.TemporaryFile() as stderr_file,
            tempfile.TemporaryFile() as list_file,
        ):
            proc = subprocess.Popen(
                ["tar", "-xzv"],
                stdin=subprocess.PIPE,
                stderr=stderr_file,
                stdout=list_file,
                cwd=destination,
            )

            assert proc.stdin is not None
            try:
                # https://github.com/python/mypy/issues/14943
                shutil.copyfileobj(raw_response, proc.stdin)  # type: ignore
            except BrokenPipeError:
                # tar exited before consuming all the input (e.g. the data
                # is not a valid .tar.gz): fall through so that its return
                # code and stderr are reported below.
                pass
            try:
                proc.stdin.close()
            except BrokenPipeError:
                # Flushing buffered data to an exited tar fails the same way.
                pass

            returncode = proc.wait()

            list_file.seek(0)
            self._logger.info(
                list_file.read().decode(errors="ignore").rstrip("\n")
            )

            if returncode != 0:
                stderr_file.seek(0)
                stderr_contents = stderr_file.read().decode(errors="ignore")
                raise RuntimeError(
                    f"Error untarring {url}. "
                    f"Returncode: {returncode} stderr:\n{stderr_contents}"
                )

    def download_artifact_file(
        self,
        artifact_id: int,
        path_in_artifact: str,
        destination: Path,
    ) -> ArtifactResponse:
        """
        Download a single file of artifact_id into the destination file.

        :param artifact_id: artifact id to download
        :param path_in_artifact: download the file from the artifact with
          this name
        :param destination: destination file to download to
        :raises ValueError: if path_in_artifact is not part of the artifact.
        """
        artifact = self._debusine_http_client.get(
            f"/artifact/{artifact_id}",
            ArtifactResponse,
        )
        if path_in_artifact not in artifact.files:
            raise ValueError(
                f"No file '{path_in_artifact}' in artifact {artifact_id}"
            )
        url = artifact.files[path_in_artifact].url
        headers = {"Token": self.token}

        response = self._get_fileobj_streaming(url, headers=headers)
        try:
            stream_response_to_file(response, destination)
        finally:
            # Release the streamed connection even if writing fails.
            response.close()
        self._logger.info("Artifact file downloaded: %s", destination)

        return artifact

    @overload
    def lookup_single(
        self,
        lookup: "CollectionItemLookupSingle",
        work_request: int,
        expect_type: Literal[CollectionItemType.ARTIFACT],
        default_category: str | None = None,
    ) -> LookupSingleResponseArtifact: ...

    @overload
    def lookup_single(
        self,
        lookup: "CollectionItemLookupSingle",
        work_request: int,
        expect_type: Literal[CollectionItemType.COLLECTION],
        default_category: str | None = None,
    ) -> LookupSingleResponseCollection: ...

    @overload
    def lookup_single(
        self,
        lookup: "CollectionItemLookupSingle",
        work_request: int,
        expect_type: CollectionItemType,
        default_category: str | None = None,
    ) -> LookupSingleResponse: ...

    def lookup_single(
        self,
        lookup: "CollectionItemLookupSingle",
        work_request: int,
        expect_type: CollectionItemType,
        default_category: str | None = None,
    ) -> LookupSingleResponse:
        """Look up a single collection item."""
        request = LookupSingleRequest(
            lookup=lookup,
            work_request=work_request,
            expect_type=expect_type,
            default_category=default_category,
        )
        return self._debusine_http_client.post(
            "/lookup/single/", LookupSingleResponse, data=request.dict()
        )

    @overload
    def lookup_multiple(
        self,
        lookup: "CollectionItemLookupMultiple",
        work_request: int,
        expect_type: Literal[CollectionItemType.ARTIFACT],
        default_category: str | None = None,
    ) -> LookupMultipleResponse[LookupSingleResponseArtifact]: ...

    @overload
    def lookup_multiple(
        self,
        lookup: "CollectionItemLookupMultiple",
        work_request: int,
        expect_type: Literal[CollectionItemType.COLLECTION],
        default_category: str | None = None,
    ) -> LookupMultipleResponse[LookupSingleResponseCollection]: ...

    @overload
    def lookup_multiple(
        self,
        lookup: "CollectionItemLookupMultiple",
        work_request: int,
        expect_type: CollectionItemType,
        default_category: str | None = None,
    ) -> LookupMultipleResponse[LookupSingleResponse]: ...

    def lookup_multiple(
        self,
        lookup: "CollectionItemLookupMultiple",
        work_request: int,
        expect_type: CollectionItemType,
        default_category: str | None = None,
    ) -> LookupMultipleResponse[LookupSingleResponse]:
        """Look up multiple collection items."""
        request = LookupMultipleRequest(
            lookup=lookup.dict()["__root__"],
            work_request=work_request,
            expect_type=expect_type,
            default_category=default_category,
        )
        return self._debusine_http_client.post(
            "/lookup/multiple/", LookupMultipleResponse, data=request.dict()
        )

    def _log_tenacity_exception(self, retry_state: tenacity.RetryCallState):
        """Log the exception of a failed attempt (tenacity "after" hook)."""
        # outcome is always non-None in an "after" hook.
        assert retry_state.outcome is not None
        exception = retry_state.outcome.exception()
        self._logger.error("  Error: %s", exception)

    def _on_work_request_completed(
        self,
        *,
        command: str,
        working_directory: Path,
        last_completed_at: Path | None,
        msg_content,
    ):
        """
        Run command for a "work_request_completed" message.

        The command is executed (and waited for) in working_directory with
        the work request id and its result as arguments; afterwards the
        message's completed_at is saved into last_completed_at (if given).
        """
        on_work_request_completed = OnWorkRequestCompleted.parse_obj(
            msg_content
        )

        # Execute the command
        cmd = [
            str(command),
            str(on_work_request_completed.work_request_id),
            on_work_request_completed.result,
        ]
        self._logger.info("Executing %s", cmd)
        p = subprocess.Popen(cmd, cwd=working_directory)
        p.wait()

        self.write_last_completed_at(
            last_completed_at,
            on_work_request_completed.completed_at,
        )

    async def _wait_and_execute(
        self,
        *,
        url: str,
        command: str,
        working_directory: Path,
        last_completed_at: Path | None,
    ):
        """
        Connect using websockets to URL, wait for messages and run command.

        Each "work_request_completed" message is dispatched to
        self._on_work_request_completed. Connection errors
        (aiohttp ClientError) are logged and retried after a random
        1-6 second wait.
        """

        @tenacity.retry(
            sleep=self._tenacity_sleep,
            wait=tenacity.wait_random(min=1, max=6),
            retry=tenacity.retry_if_exception_type(
                aiohttp.client_exceptions.ClientError
            ),
            after=self._log_tenacity_exception,
        )
        async def do_wait_for_messages(session: aiohttp.ClientSession):
            headers = {}
            if self.token is not None:
                headers["Token"] = self.token

            self._logger.info("Requesting %s", url)

            async with session.ws_connect(
                url, headers=headers, heartbeat=60
            ) as ws:
                self._logger.info("Connected!")
                async for msg in ws:  # pragma: no branch
                    await Debusine.process_async_message(
                        msg,
                        {
                            "work_request_completed": functools.partial(
                                self._on_work_request_completed,
                                command=command,
                                working_directory=working_directory,
                                last_completed_at=last_completed_at,
                            ),
                        },
                        logger=self._logger,
                    )

        async with aiohttp.ClientSession() as session:
            await do_wait_for_messages(session)

    @staticmethod
    def write_last_completed_at(
        completed_at_file: Path | None,
        completed_at: datetime.datetime | None,
    ) -> None:
        """
        Write to completed_at_file the completed_at datetime.

        Nothing is written if completed_at_file is None; a None completed_at
        is stored as JSON null.
        """
        if completed_at_file is None:
            return

        completed_at_formatted = (
            completed_at.isoformat() if completed_at else None
        )

        completed_at_file.write_text(
            json.dumps({"last_completed_at": completed_at_formatted}, indent=2)
            + "\n"
        )

    @staticmethod
    def _read_last_completed_at(
        last_completed_at: Path | None,
    ) -> str | None:
        """
        Return the "last_completed_at" value stored in last_completed_at.

        Return None if no file is given, if it does not exist yet, or if it
        has no such key.
        """
        if last_completed_at is None:
            return None

        if not last_completed_at.exists():
            # The file does not exist, it will be created if a WorkRequest
            # is completed
            return None

        last_completed_at_json = json.loads(last_completed_at.read_bytes())

        return last_completed_at_json.get("last_completed_at")

    def on_work_request_completed(
        self,
        *,
        workspaces: list[str] | None = None,
        last_completed_at: Path | None = None,
        command: str,
        working_directory: Path,
    ):
        """
        Execute command when a work request is completed.

        :param workspaces: only report work requests from these workspaces
          (all if None or empty).
        :param last_completed_at: JSON file used to resume from the last
          completion seen; updated after each executed command.
        :param command: executable run with the work request id and result.
        :param working_directory: directory where command is executed.
        """
        # "http" -> "ws" and "https" -> "wss"; strip a trailing "/" so the
        # path below does not get a double slash.
        debusine_ws_url = self.base_api_url.rstrip("/").replace("http", "ws", 1)

        url_work_request_completed = (
            f"{debusine_ws_url}/ws/1.0/work-request/on-completed/"
        )

        params = {}
        if workspaces:
            params["workspaces"] = ",".join(workspaces)

        completed_at_since = self._read_last_completed_at(last_completed_at)
        if completed_at_since:
            params["completed_at_since"] = completed_at_since

        if params:
            url_work_request_completed += "?" + urlencode(params)

        asyncio.run(
            self._wait_and_execute(
                url=url_work_request_completed,
                command=command,
                working_directory=working_directory,
                last_completed_at=last_completed_at,
            )
        )

    async def _tenacity_sleep(self, delay: float):
        # Used for the unit tests
        await asyncio.sleep(delay)

    @staticmethod
    async def process_async_message(
        msg: aiohttp.http_websocket.WSMessage,
        msg_text_to_callable: dict[str, _MessageProcessor],
        logger: logging.Logger,
    ) -> bool:
        """
        Process "msg": log possible errors, raise on fatal ones.

        If msg.type is aiohttp.WSMsgType.TEXT: decode msg.data (contains
        JSON) into msg_content and, if msg_content["text"] is a key of
        msg_text_to_callable, call
        msg_text_to_callable[msg_content["text"]](msg_content=msg_content),
        awaiting it if it is a coroutine function.

        Return True if a callable from msg_text_to_callable was called, or
        False if not (invalid messages, non-text messages, etc.).

        :raise TokenDisabledError: if reason_code is TOKEN_DISABLED.
        """
        if not isinstance(msg, aiohttp.http_websocket.WSMessage):
            logger.debug(
                'Worker._process_message: unexpected type: %s '
                'is not an instance of %s',
                type(msg),
                aiohttp.http_websocket.WSMessage,
            )
            return False

        if msg.type != aiohttp.WSMsgType.TEXT:
            return False

        try:
            msg_content = json.loads(msg.data)
        except json.JSONDecodeError as exc:
            logger.info(  # noqa: G200
                'Worker._process_message: JSONDecodeError on a message '
                'received from the server: Received: "%s" Error: "%s"',
                msg.data,
                exc,
            )
            return False

        logger.debug("Received from the server: '%s'", msg_content)

        # Valid JSON that is not an object (list, string, ...) cannot carry
        # "text"/"reason_code": reject it instead of crashing on .get().
        if not isinstance(msg_content, dict):
            logger.info(
                'Debusine._process_async_message: unexpected JSON '
                'received from the server (not an object): %s',
                msg_content,
            )
            return False

        if (text := msg_content.get('text')) is not None:
            if (functor := msg_text_to_callable.get(text)) is not None:
                if asyncio.iscoroutinefunction(functor):
                    await functor(msg_content=msg_content)
                else:
                    functor(msg_content=msg_content)
                return True
            logger.debug(
                'Debusine._process_async_message: invalid text in '
                'msg_content received from the server: %s ',
                text,
            )
        elif (reason_code := msg_content.get('reason_code')) is not None:
            if reason_code == 'TOKEN_DISABLED':
                reason = msg_content.get("reason", "Unknown")
                raise TokenDisabledError(reason)
            logger.error(
                'Unknown reason code (ignoring it): "%s"', reason_code
            )
        else:
            reason = msg_content.get('reason', 'unknown')
            logger.info("Disconnected. Reason: '%s'", reason)

        return False