Workflows (internal API)

Base Workflow infrastructure.

See Workflows for a high-level explanation of the concepts used here.

exception debusine.server.workflows.base.ArtifactHasNoArchitecture[source]

Bases: Exception

Raised if it’s not possible to determine the artifact’s architecture.

class debusine.server.workflows.base.Workflow(work_request: WorkRequest)[source]

Bases: BaseServerTask[WD, DTD]

Base class for workflow orchestrators.

This is the base API for running WorkflowInstance logic.

TASK_TYPE: TaskTypes = 'Workflow'

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

__init__(work_request: WorkRequest)[source]

Instantiate a Workflow with its database instance.

classmethod build_workflow_data(template_data: dict[str, Any], user_data: dict[str, Any]) dict[str, Any][source]

Merge template data and user-provided data.
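The description above does not say which side wins when the same key appears in both dictionaries. The following is a minimal sketch of one possible policy, assuming a shallow merge of top-level keys in which template values take precedence. The function name merge_workflow_data and the sample keys are hypothetical and not part of the documented API.

```python
from typing import Any


def merge_workflow_data(
    template_data: dict[str, Any], user_data: dict[str, Any]
) -> dict[str, Any]:
    """Shallow-merge two dicts; template values win (assumed policy)."""
    merged = dict(user_data)  # copy so the caller's dict is not mutated
    merged.update(template_data)  # assumption: template overrides user data
    return merged


merged = merge_workflow_data(
    {"codename": "trixie"},
    {"codename": "sid", "architectures": ["amd64"]},
)
```

Under this assumed policy, a template can pin a value that users are not allowed to change, while keys left out of the template remain user-configurable.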

callback(work_request: WorkRequest) None[source]

Perform an orchestration step.

Called with the workflow callback work request, a WorkRequest of type internal/workflow, when that workflow node becomes ready to execute. Note that this is not the same as self.work_request.

This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same WorkRequest structure as calling it once.

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar(‘DTD’, bound=BaseDynamicTaskData)

ensure_dynamic_data(task_database: TaskDatabaseInterface) bool[source]

Ensure that this workflow’s dynamic task data has been computed.

classmethod from_name(name: str) type[debusine.server.workflows.base.Workflow[Any, Any]][source]

Return the workflow class registered under the given name.

lookup_singleton_collection(category: CollectionCategory, *, workspace: debusine.db.models.workspaces.Workspace | None = None) Collection[source]

Look up a singleton collection related to this workflow.

name: ClassVar[str] = 'workflow'

abstract populate() None[source]

Create the initial WorkRequest structure.

This is called once, when the workflow first becomes runnable. validate_input() will already have been called.

This method is required to be idempotent: calling it multiple times MUST result in the same WorkRequest structure as calling it once.

static provides_artifact(work_request: WorkRequest, category: ArtifactCategory, name: str, *, data: dict[str, Any] | None = None, artifact_filters: dict[str, Any] | None = None) None[source]

Indicate work_request will provide an artifact.

Parameters:
  • work_request – work request that will provide the artifact

  • category – category of the artifact that will be provided

  • name – name of this item in the workflow’s internal collection

  • data – extra entries to add to the data dictionary of the event reaction

  • artifact_filters – additional filters for the update-collection-with-artifacts action, so that workflows can restrict which artifacts are added

Raises:

LookupError – if a key in “data” starts with promise_

Create an event reaction for on_creation adding a promise: this work request will create an artifact.

Create an event reaction for on_success to update the collection with the relevant artifact.
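The LookupError condition listed under Raises can be illustrated with a small check. This is only a sketch: check_promise_data is a hypothetical helper, and the idea that promise_ is a prefix reserved for keys managed by the promise itself is an assumption drawn from the promise_work_request_id key mentioned elsewhere in this reference.

```python
from typing import Any, Optional


def check_promise_data(data: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Reject caller-supplied keys that start with the promise_ prefix."""
    data = data or {}
    for key in data:
        if key.startswith("promise_"):
            raise LookupError(f'data key "{key}" must not start with promise_')
    return data


accepted = check_promise_data({"architecture": "amd64"})

try:
    check_promise_data({"promise_work_request_id": 1})
except LookupError as exc:
    error = str(exc)
```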

static requires_artifact(work_request: WorkRequest, lookup: int | str | debusine.tasks.models.LookupMultiple) None[source]

Indicate that work_request requires input (lookup).

Parameters:
  • work_request – work request that requires the input; for each lookup result that is a promise, work_request.add_dependency(promise["promise_work_request_id"]) is called

  • lookup – lookup to resolve; the results are iterated over, and only promises are considered
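The parameter descriptions above imply a simple loop over the lookup results. The sketch below models it with stand-ins: StubWorkRequest and the dict-shaped lookup results are hypothetical, and only the promise_work_request_id key and the add_dependency call are taken from the text.

```python
from dataclasses import dataclass, field


@dataclass
class StubWorkRequest:
    """Stand-in for WorkRequest that records dependency IDs."""

    id: int
    dependencies: set[int] = field(default_factory=set)

    def add_dependency(self, work_request_id: int) -> None:
        self.dependencies.add(work_request_id)


def require_results(work_request: StubWorkRequest, results: list[dict]) -> None:
    """Add a dependency for every lookup result that is a promise."""
    for result in results:
        promise = result.get("promise")
        if promise is not None:  # existing artifacts need no dependency
            work_request.add_dependency(promise["promise_work_request_id"])


consumer = StubWorkRequest(id=2)
require_results(
    consumer,
    [
        {"artifact": 7},  # already exists
        {"promise": {"promise_work_request_id": 10}},  # not built yet
    ],
)
```

In this model the consumer ends up depending only on the work request that promised the missing artifact.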

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘WD’, bound=BaseWorkflowData)

validate_input() None[source]

Thorough validation of input data.

This is run only once at workflow instantiation time, and can do slower things like database lookups to validate artifact or collection types.

classmethod validate_template_data(data: dict[str, Any]) None[source]

Validate WorkflowTemplate data.

work_request_ensure_child(task_name: str, task_data: BaseTaskData, workflow_data: WorkRequestWorkflowData, task_type: TaskTypes = TaskTypes.WORKER) WorkRequest[source]

Create the child if one does not already exist.

Returns:

new or existing WorkRequest.
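A get-or-create helper of this kind is what makes the idempotency requirement on populate() and callback() practical to meet. The sketch below illustrates the pattern with an in-memory stand-in; StubWorkRequest, its ensure_child method, and the rule that children are matched on task name plus task data are all assumptions for the example, not the real implementation.

```python
from dataclasses import dataclass, field


@dataclass
class StubWorkRequest:
    """Stand-in for WorkRequest: a named task plus its children."""

    task_name: str
    task_data: dict
    children: list["StubWorkRequest"] = field(default_factory=list)

    def ensure_child(self, task_name: str, task_data: dict) -> "StubWorkRequest":
        """Return the matching child, creating it only if it is missing."""
        for child in self.children:
            if child.task_name == task_name and child.task_data == task_data:
                return child
        child = StubWorkRequest(task_name, task_data)
        self.children.append(child)
        return child


def populate(workflow: StubWorkRequest) -> None:
    """Hypothetical populate(): one build child per architecture."""
    for arch in ("amd64", "arm64"):
        workflow.ensure_child("sbuild", {"architecture": arch})


root = StubWorkRequest("workflow", {})
populate(root)
populate(root)  # the second call finds the existing children
```

Running populate() twice leaves the same two children in place, which is the property the idempotency requirement asks for.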

exception debusine.server.workflows.base.WorkflowRunError(work_request: WorkRequest, message: str)[source]

Bases: Exception

Running a workflow orchestrator or callback failed.

__init__(work_request: WorkRequest, message: str) None[source]

Construct the exception.

exception debusine.server.workflows.base.WorkflowValidationError[source]

Bases: Exception

Raised if a workflow fails to validate its inputs.

debusine.server.workflows.base.orchestrate_workflow(work_request: WorkRequest) None[source]

Orchestrate a workflow in whatever way is appropriate.

For a workflow callback, run callback and mark the work request as completed. For a workflow, run populate and unblock workflow children, but leave the workflow running until all its children have finished. For any other work request, raise an error.
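The three cases described above amount to a dispatch on the kind of work request. The following sketch shows that control flow with a stand-in class. StubWorkRequest, its kind and status strings, and the use of ValueError are placeholders chosen for the example; the real function operates on database-backed WorkRequest objects.

```python
from dataclasses import dataclass, field


@dataclass
class StubWorkRequest:
    """Stand-in for WorkRequest with just enough state for the example."""

    kind: str  # "callback", "workflow", or any other task kind
    status: str = "pending"
    steps: list[str] = field(default_factory=list)


def orchestrate(work_request: StubWorkRequest) -> None:
    """Dispatch on the kind of work request, following the text above."""
    if work_request.kind == "callback":
        work_request.steps.append("callback")
        work_request.status = "completed"
    elif work_request.kind == "workflow":
        work_request.steps += ["populate", "unblock children"]
        # The workflow stays running until all its children have finished.
        work_request.status = "running"
    else:
        # Placeholder error type; the real exception class is not shown here.
        raise ValueError(f"cannot orchestrate a {work_request.kind!r} request")


workflow = StubWorkRequest(kind="workflow")
orchestrate(workflow)
callback = StubWorkRequest(kind="callback")
orchestrate(callback)
```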

Utility functions for workflows.

debusine.server.workflows.workflow_utils.filter_artifact_lookup_by_arch(workflow: Workflow[Any, Any], lookup: LookupMultiple, architectures: Iterable[str]) LookupMultiple[source]

Filter an artifact lookup by architecture.

debusine.server.workflows.workflow_utils.follow_artifact_relation(artifact: Artifact, relation_type: Relations, category: ArtifactCategory) Artifact[source]

Follow relations from artifact to find an artifact of category.

debusine.server.workflows.workflow_utils.get_architectures(workflow: Workflow[Any, Any], lookup: LookupMultiple) set[str][source]

Return the set of all architectures found in the artifacts from the lookup.

The architectures are extracted from each lookup result using lookup_result_architecture().
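Conceptually this is a set comprehension over the lookup results. The sketch below uses dict-shaped results and local stand-ins for lookup_result_architecture() and ArtifactHasNoArchitecture. That the former raises the latter when no architecture can be determined is an assumption made for the example.

```python
class ArtifactHasNoArchitecture(Exception):
    """Local stand-in for the exception of the same name."""


def result_architecture(result: dict) -> str:
    """Stand-in for lookup_result_architecture() on dict-shaped results."""
    try:
        return result["architecture"]
    except KeyError:
        raise ArtifactHasNoArchitecture(result) from None


def collect_architectures(results: list[dict]) -> set[str]:
    """Gather the architecture of every result into a set."""
    return {result_architecture(result) for result in results}


architectures = collect_architectures(
    [
        {"architecture": "amd64"},
        {"architecture": "arm64"},
        {"architecture": "amd64"},  # duplicates collapse in the set
    ]
)
```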

debusine.server.workflows.workflow_utils.get_available_architectures(workflow: Workflow[Any, Any], *, vendor: str, codename: str) set[str][source]

Get architectures available for use with this vendor/codename.

debusine.server.workflows.workflow_utils.get_source_package_names(results: Sequence[LookupResult], *, configuration_key: str, artifact_expected_categories: Collection[ArtifactCategory]) list[str][source]

Return a sorted list of source package names from results.

It ensures that:

  • The LookupResult objects contain either an artifact or promise.

  • Artifacts belong to the artifact_expected_categories.

  • If a LookupResult is a promise, the name is taken from the source_package_name field of the promise data.

Parameters:
  • results – A sequence of LookupResult objects representing artifacts to be processed. Each entry is expected to be either an artifact or a promise.

  • configuration_key – A string used by BaseTask.ensure_artifact_categories() for the exception message.

  • artifact_expected_categories – valid ArtifactCategory that artifacts must belong to.

Returns:

A sorted list of source package names.
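The checks listed above can be sketched as follows. StubResult and its dict payloads are hypothetical stand-ins for LookupResult, the "name" field on artifacts is invented for the example, ValueError is a placeholder error type, and collapsing duplicate names is an assumption that the description does not confirm.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubResult:
    """Stand-in for LookupResult: holds an artifact or a promise."""

    artifact: Optional[dict] = None  # e.g. {"category": ..., "name": ...}
    promise: Optional[dict] = None  # e.g. {"source_package_name": ...}


def source_package_names(
    results: list[StubResult], expected_categories: set[str]
) -> list[str]:
    """Collect source package names from artifacts and promises."""
    names = set()
    for result in results:
        if result.promise is not None:
            names.add(result.promise["source_package_name"])
        elif result.artifact is not None:
            if result.artifact["category"] not in expected_categories:
                raise ValueError(f"unexpected {result.artifact['category']}")
            names.add(result.artifact["name"])
        else:
            raise ValueError("result has neither an artifact nor a promise")
    return sorted(names)


names = source_package_names(
    [
        StubResult(artifact={"category": "debian:source-package", "name": "hello"}),
        StubResult(promise={"source_package_name": "bash"}),
    ],
    {"debian:source-package"},
)
```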

debusine.server.workflows.workflow_utils.locate_debian_source_package(configuration_key: str, artifact: Artifact) Artifact[source]

Accept a debian:upload or debian:source-package in a workflow.

Resolve to the debian:source-package.

debusine.server.workflows.workflow_utils.locate_debian_source_package_lookup(workflow: Workflow[Any, Any], configuration_key: str, lookup: int | str) int | str[source]

Return a lookup to a debian:source-package.

If the specified lookup returns a debian:source-package, return it. If it returns a debian:upload, find the related debian:source-package and return a lookup to it.
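The resolution rule above can be modelled with a toy artifact store. In this sketch a lookup is simply an integer ID, ARTIFACTS and its relates_to field are hypothetical stand-ins for the database and its artifact relations, and ValueError is a placeholder for whatever error the real function raises on unsupported input.

```python
# Stand-in artifact store keyed by ID; "relates_to" is a hypothetical field.
ARTIFACTS = {
    1: {"category": "debian:source-package", "relates_to": []},
    2: {"category": "debian:upload", "relates_to": [1]},
    3: {"category": "debian:binary-package", "relates_to": []},
}


def locate_source_package_lookup(lookup: int) -> int:
    """Return a lookup (here: an ID) that points at a source package."""
    artifact = ARTIFACTS[lookup]
    if artifact["category"] == "debian:source-package":
        return lookup  # already what the caller wants
    if artifact["category"] == "debian:upload":
        for related_id in artifact["relates_to"]:
            if ARTIFACTS[related_id]["category"] == "debian:source-package":
                return related_id
    raise ValueError(f"artifact {lookup} does not lead to a source package")


from_source = locate_source_package_lookup(1)
from_upload = locate_source_package_lookup(2)
```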

debusine.server.workflows.workflow_utils.lookup_result_architecture(result: LookupResult) str[source]

Get architecture from result of looking up an artifact.

debusine.server.workflows.workflow_utils.source_package(workflow: Workflow[Any, Any]) Artifact[source]

Retrieve the source package artifact.

If workflow.data.input exists, use workflow.data.input.source_artifact, otherwise workflow.data.source_artifact.

If the source artifact is a debian:upload, return its debian:source-package.
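The attribute fallback described above can be sketched with plain namespaces. The helper name source_artifact_of is hypothetical, "exists" is read here as "the attribute is present", and the step that resolves a debian:upload to its source package is left out.

```python
from types import SimpleNamespace


def source_artifact_of(data):
    """Pick the source artifact lookup from workflow data."""
    if hasattr(data, "input"):
        return data.input.source_artifact
    return data.source_artifact


# Workflow data with a nested "input" section.
nested = SimpleNamespace(input=SimpleNamespace(source_artifact=10))
# Workflow data with source_artifact at the top level.
flat = SimpleNamespace(source_artifact=20)
```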

debusine.server.workflows.workflow_utils.source_package_data(workflow: Workflow[Any, Any]) DebianSourcePackage[source]

Return source package artifact data for the workflow.