Workflows (internal API)

Base Workflow infrastructure.

See Workflows for a high level explanation of concepts used here.

class debusine.server.workflows.base.Workflow(work_request: WorkRequest)[source]

Bases: BaseServerTask[WD, DTD]

Base class for workflow orchestrators.

This is the base API for running WorkflowInstance logic.

TASK_TYPE: TaskTypes = 'Workflow'

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

__init__(work_request: WorkRequest)[source]

Instantiate a Workflow with its database instance.

classmethod build_workflow_data(template_data: dict[str, Any], user_data: dict[str, Any]) dict[str, Any][source]

Merge template data and user-provided data.

callback(work_request: WorkRequest) None[source]

Perform an orchestration step.

Called with the workflow callback work request (note that this is not the same as self.work_request) when the workflow node becomes ready to execute.

Called with a WorkRequest of type internal/workflow to perform an orchestration step triggered by a workflow callback.

This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same WorkRequest structure as calling it once.

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar(‘DTD’, bound=BaseDynamicTaskData)

ensure_dynamic_data(task_database: TaskDatabaseInterface) None[source]

Ensure that this workflow’s dynamic task data has been computed.

classmethod from_name(name: str) type[debusine.server.workflows.base.Workflow[Any, Any]][source]

Instantiate a workflow by name.

lookup_singleton_collection(category: CollectionCategory, *, workspace: debusine.db.models.workspaces.Workspace | None = None) Collection[source]

Look up a singleton collection related to this workflow.

name: ClassVar[str] = 'workflow'
abstract populate() None[source]

Create the initial WorkRequest structure.

This is called once, when the workflow first becomes runnable. validate_input() will already have been called.

This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same WorkRequest structure as calling it once.

static provides_artifact(work_request: WorkRequest, category: ArtifactCategory, name: str, *, data: dict[str, Any] | None = None, artifact_filters: dict[str, Any] | None = None) None[source]

Indicate work_request will provide an artifact.

Parameters:
  • work_request – work request that will provide the artifact

  • category – category of the artifact that will be provided

  • name – name of this item in the workflow’s internal collection

  • data – add it to the data dictionary for the event reaction

  • artifact_filters – for the update-collection-with-artifacts action, to allow workflows to add filtering

Raises:

LookupError – if a key in “data” starts with promise_

Create an event reaction for on_creation adding a promise: this work request will create an artifact.

Create an event reaction for on_success to update the collection with the relevant artifact.

static requires_artifact(work_request: WorkRequest, lookup: int | str | debusine.tasks.models.LookupMultiple) None[source]

Indicate that work_request requires input (lookup).

Parameters:
  • work_request – for each lookup result call work_request.add_dependency(promise["promise_work_request_id"])

  • lookup – resolve the lookup and iterate over the results (for PROMISES only)

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘WD’, bound=BaseWorkflowData)

validate_input() None[source]

Thorough validation of input data.

This is run only once at workflow instantiation time, and can do slower things like database lookups to validate artifact or collection types.

classmethod validate_template_data(data: dict[str, Any]) None[source]

Validate WorkflowTemplate data.

work_request_ensure_child(task_name: str, task_data: BaseTaskData, workflow_data: WorkRequestWorkflowData, task_type: TaskTypes = TaskTypes.WORKER) WorkRequest[source]

Create the child if one does not already exist.

Returns:

new or existing WorkRequest.

exception debusine.server.workflows.base.WorkflowRunError(work_request: WorkRequest, message: str, code: str)[source]

Bases: Exception

Running a workflow orchestrator or callback failed.

__init__(work_request: WorkRequest, message: str, code: str) None[source]

Construct the exception.

exception debusine.server.workflows.base.WorkflowValidationError[source]

Bases: Exception

Raised if a workflow fails to validate its inputs.

debusine.server.workflows.base.orchestrate_workflow(work_request: WorkRequest) None[source]

Orchestrate a workflow in whatever way is appropriate.

For a workflow callback, run callback and mark the work request as completed. For a workflow, run populate and unblock workflow children, but leave the workflow running until all its children have finished. For any other work request, raise an error.

Utility functions for workflows.

exception debusine.server.workflows.workflow_utils.ArtifactHasNoArchitecture[source]

Bases: Exception

Raised if it’s not possible to determine the artifact’s architecture.

exception debusine.server.workflows.workflow_utils.ArtifactHasNoBinaryPackageName[source]

Bases: Exception

Raised if it’s not possible to determine the artifact’s binary name.

debusine.server.workflows.workflow_utils.configure_for_overlay_suite(workflow: Workflow[Any, Any], *, extra_repositories: list[debusine.tasks.models.ExtraRepository] | None, vendor: str, codename: str, environment: int | str, backend: BackendType, architecture: str) list[debusine.tasks.models.ExtraRepository] | None[source]

Return any needed extra repository to use an overlay suite.

debusine.server.workflows.workflow_utils.filter_artifact_lookup_by_arch(workflow: Workflow[Any, Any], lookup: LookupMultiple, architectures: Iterable[str]) LookupMultiple[source]

Filter an artifact lookup by architecture.

debusine.server.workflows.workflow_utils.follow_artifact_relation(artifact: Artifact, relation_type: Relations, category: ArtifactCategory) Artifact[source]

Follow relations from artifact to find an artifact of category.

debusine.server.workflows.workflow_utils.get_architectures(workflow: Workflow[Any, Any], lookup: LookupMultiple) set[str][source]

Return set with all the architectures in the artifacts from the lookup.

The architectures are extracted from each lookup result using lookup_result_architecture().

debusine.server.workflows.workflow_utils.get_available_architectures(workflow: Workflow[Any, Any], *, vendor: str, codename: str) set[str][source]

Get architectures available for use with this vendor/codename.

debusine.server.workflows.workflow_utils.get_source_package_names(results: Sequence[LookupResult], *, configuration_key: str, artifact_expected_categories: Collection[ArtifactCategory]) list[str][source]

Return a sorted list of source package names from results.

It ensures that:

  • The LookupResult objects contain either an artifact or promise.

  • Artifacts belong to the artifact_expected_categories.

  • If LookupResult is a promise: extracts the name from the promise data source_package_name.

Parameters:
  • results – A sequence of LookupResult objects representing artifacts to be processed. Each entry is expected to be either an artifact or a promise.

  • configuration_key – A string used by BaseTask.ensure_artifact_categories() for the exception message.

  • artifact_expected_categories – valid ArtifactCategory that artifacts must belong to.

Returns:

A sorted list of source package names.

debusine.server.workflows.workflow_utils.locate_debian_source_package(configuration_key: str, artifact: Artifact) Artifact[source]

Accept a debian:upload or debian:source-package in a workflow.

Resolve to the debian:source-package.

debusine.server.workflows.workflow_utils.locate_debian_source_package_lookup(workflow: Workflow[Any, Any], configuration_key: str, lookup: int | str) int | str[source]

Return a lookup to a debian:source-package.

If the specified lookup returns a debian:source-package, return it. If it returns a debian:upload, find the related debian:source-package and return a lookup to it.

debusine.server.workflows.workflow_utils.lookup_result_architecture(result: LookupResult) str[source]

Get architecture from result of looking up an artifact.

debusine.server.workflows.workflow_utils.lookup_result_artifact_category(result: LookupResult) str[source]

Get artifact category from result of looking up an artifact.

The result may be either an artifact or a promise.

debusine.server.workflows.workflow_utils.lookup_result_binary_package_name(result: LookupResult) str[source]

Get binary package name from result of looking up an artifact.

debusine.server.workflows.workflow_utils.source_package(workflow: Workflow[Any, Any]) Artifact[source]

Retrieve the source package artifact.

If workflow.data.input exists, use workflow.data.input.source_artifact, otherwise workflow.data.source_artifact.

If the source artifact is a debian:upload, returns its debian:source-package.

debusine.server.workflows.workflow_utils.source_package_data(workflow: Workflow[Any, Any]) DebianSourcePackage[source]

Return source package artifact data for the workflow.