Workflows (internal API)
Base Workflow infrastructure.
See Workflows for a high-level explanation of the concepts used here.
- exception debusine.server.workflows.base.ArtifactHasNoArchitecture[source]
Bases:
Exception
Raised if it’s not possible to determine the artifact’s architecture.
- class debusine.server.workflows.base.Workflow(work_request: WorkRequest)[source]
Bases:
BaseServerTask[WD, DTD]
Base class for workflow orchestrators.
This is the base API for running WorkflowInstance logic.
- TASK_TYPE: TaskTypes = 'Workflow'
The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.
- __init__(work_request: WorkRequest)[source]
Instantiate a Workflow with its database instance.
- classmethod build_workflow_data(template_data: dict[str, Any], user_data: dict[str, Any]) dict[str, Any] [source]
Merge template data and user-provided data.
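A minimal sketch of one possible merge, assuming a shallow merge in which template values take priority over user-provided values. The precedence rule and the function name `sketch_build_workflow_data` are assumptions made for illustration; the description above does not state how conflicts are resolved, and subclasses may merge differently.

```python
from typing import Any


def sketch_build_workflow_data(
    template_data: dict[str, Any], user_data: dict[str, Any]
) -> dict[str, Any]:
    """Shallow merge in which template values win over user values."""
    merged = dict(user_data)  # start from what the user supplied
    merged.update(template_data)  # assumption: the template takes priority
    return merged


merged = sketch_build_workflow_data(
    {"vendor": "debian", "codename": "trixie"},
    {"codename": "sid", "source_artifact": 42},
)
```

Only top-level keys are considered in this sketch; nested dictionaries and lists are replaced wholesale, not merged.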
- callback(work_request: WorkRequest) None [source]
Perform an orchestration step.
Called with the workflow callback work request (note that this is not the same as self.work_request) when the workflow node becomes ready to execute.
Called with a WorkRequest of type internal/workflow to perform an orchestration step triggered by a workflow callback.
This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same WorkRequest structure as calling it once.
- dynamic_task_data_type
Class used as the in-memory representation of dynamic task data.
alias of TypeVar('DTD', bound=BaseDynamicTaskData)
- ensure_dynamic_data(task_database: TaskDatabaseInterface) bool [source]
Ensure that this workflow’s dynamic task data has been computed.
- classmethod from_name(name: str) type[debusine.server.workflows.base.Workflow[Any, Any]] [source]
Return the workflow class with the given name.
- lookup_singleton_collection(category: CollectionCategory, *, workspace: debusine.db.models.workspaces.Workspace | None = None) Collection [source]
Look up a singleton collection related to this workflow.
- abstract populate() None [source]
Create the initial WorkRequest structure.
This is called once, when the workflow first becomes runnable. validate_input() will already have been called.
This method is required to be idempotent: calling it multiple times MUST result in the same WorkRequest structure as calling it once.
- static provides_artifact(work_request: WorkRequest, category: ArtifactCategory, name: str, *, data: dict[str, Any] | None = None, artifact_filters: dict[str, Any] | None = None) None [source]
Indicate work_request will provide an artifact.
- Parameters:
work_request – work request that will provide the artifact
category – category of the artifact that will be provided
name – name of this item in the workflow’s internal collection
data – add it to the data dictionary for the event reaction
artifact_filters – for the update-collection-with-artifacts action, to allow workflows to add filtering
- Raises:
LookupError – if a key in “data” starts with promise_
Create an event reaction for on_creation adding a promise: this work request will create an artifact.
Create an event reaction for on_success to update the collection with the relevant artifact.
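The behaviour described for provides_artifact can be sketched with plain dictionaries standing in for the real WorkRequest and event-reaction models. The function name `sketch_provides_artifact`, the dictionary layout, and the `add-promise` action label are hypothetical; only the `promise_` key check, the update-collection-with-artifacts action name, and the on_creation/on_success pairing come from the description.

```python
from typing import Any, Optional


def sketch_provides_artifact(
    event_reactions: dict[str, list[dict[str, Any]]],
    category: str,
    name: str,
    *,
    data: Optional[dict[str, Any]] = None,
    artifact_filters: Optional[dict[str, Any]] = None,
) -> None:
    """Record a promise on creation and a collection update on success."""
    data = data or {}
    # Keys starting with "promise_" are reserved, so reject them up front.
    for key in data:
        if key.startswith("promise_"):
            raise LookupError(f'"{key}" must not start with "promise_"')

    # Hypothetical layout: one list of reaction dicts per event name.
    event_reactions.setdefault("on_creation", []).append(
        {"action": "add-promise", "category": category, "name": name, "data": data}
    )
    event_reactions.setdefault("on_success", []).append(
        {
            "action": "update-collection-with-artifacts",
            "name": name,
            "artifact_filters": artifact_filters or {},
        }
    )


reactions: dict[str, list[dict[str, Any]]] = {}
sketch_provides_artifact(
    reactions, "debian:upload", "build-amd64", data={"architecture": "amd64"}
)
```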
- static requires_artifact(work_request: WorkRequest, lookup: int | str | debusine.tasks.models.LookupMultiple) None [source]
Indicate that work_request requires input (lookup).
- Parameters:
work_request – for each lookup result call work_request.add_dependency(promise["promise_work_request_id"])
lookup – resolve the lookup and iterate over the results (for PROMISES only)
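As an illustration of the dependency wiring described for requires_artifact, the sketch below uses a hypothetical `SketchWorkRequest` class and plain dictionaries as lookup results. The `kind` key and the result layout are assumptions; the real method resolves the lookup itself, which is skipped here.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchWorkRequest:
    """Hypothetical stand-in for WorkRequest, tracking dependency ids only."""

    dependencies: set[int] = field(default_factory=set)

    def add_dependency(self, work_request_id: int) -> None:
        self.dependencies.add(work_request_id)


def sketch_requires_artifact(
    work_request: SketchWorkRequest, lookup_results: list[dict[str, Any]]
) -> None:
    """Depend on the work request behind every promise in the results."""
    for result in lookup_results:
        # Only promises point at a producing work request; artifacts that
        # already exist need no dependency.
        if result.get("kind") == "promise":
            work_request.add_dependency(result["promise_work_request_id"])


wr = SketchWorkRequest()
sketch_requires_artifact(
    wr,
    [
        {"kind": "artifact", "artifact_id": 10},
        {"kind": "promise", "promise_work_request_id": 42},
        {"kind": "promise", "promise_work_request_id": 42},
    ],
)
```

Because dependencies are kept in a set in this sketch, repeating the call with the same results leaves the work request unchanged.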
- task_data_type
Class used as the in-memory representation of task data.
alias of TypeVar('WD', bound=BaseWorkflowData)
- validate_input() None [source]
Thorough validation of input data.
This is run only once at workflow instantiation time, and can do slower things like database lookups to validate artifact or collection types.
- classmethod validate_template_data(data: dict[str, Any]) None [source]
Validate WorkflowTemplate data.
- work_request_ensure_child(task_name: str, task_data: BaseTaskData, workflow_data: WorkRequestWorkflowData, task_type: TaskTypes = TaskTypes.WORKER) WorkRequest [source]
Create the child if one does not already exist.
- Returns:
new or existing WorkRequest.
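The idempotency required of populate and callback follows naturally from an "ensure" style of child creation. The sketch below shows the pattern with a hypothetical in-memory `SketchWorkflow`. It assumes an existing child is recognised by its task name and task data; the real method is database-backed and also takes workflow data and a task type. The `sbuild` task name is used only as an example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchChild:
    task_name: str
    task_data: dict[str, Any]


@dataclass
class SketchWorkflow:
    """Hypothetical in-memory workflow; the real class is database-backed."""

    children: list[SketchChild] = field(default_factory=list)

    def ensure_child(self, task_name: str, task_data: dict[str, Any]) -> SketchChild:
        # Assumption: an existing child is recognised by name and data.
        for child in self.children:
            if child.task_name == task_name and child.task_data == task_data:
                return child
        child = SketchChild(task_name, task_data)
        self.children.append(child)
        return child

    def populate(self) -> None:
        for arch in ("amd64", "arm64"):
            self.ensure_child("sbuild", {"architecture": arch})


workflow = SketchWorkflow()
workflow.populate()
workflow.populate()  # the second call finds the existing children
```

Calling populate twice leaves two children, not four, which is the property the idempotency requirement asks for.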
- exception debusine.server.workflows.base.WorkflowRunError(work_request: WorkRequest, message: str)[source]
Bases:
Exception
Running a workflow orchestrator or callback failed.
- __init__(work_request: WorkRequest, message: str) None [source]
Construct the exception.
- exception debusine.server.workflows.base.WorkflowValidationError[source]
Bases:
Exception
Raised if a workflow fails to validate its inputs.
- debusine.server.workflows.base.orchestrate_workflow(work_request: WorkRequest) None [source]
Orchestrate a workflow in whatever way is appropriate.
For a workflow callback, run callback and mark the work request as completed. For a workflow, run populate and unblock workflow children, but leave the workflow running until all its children have finished. For any other work request, raise an error.
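The three-way dispatch described for orchestrate_workflow can be sketched with toy objects. Everything here is hypothetical: `SketchRequest`, `SketchOrchestrator`, the `kind` and `status` strings, and the choice of `ValueError` are stand-ins, and unlike the real populate() the toy version receives the request as an argument.

```python
from dataclasses import dataclass, field


@dataclass
class SketchRequest:
    """Hypothetical stand-in for WorkRequest."""

    kind: str  # "callback", "workflow", or any other task type
    status: str = "pending"
    children: list["SketchRequest"] = field(default_factory=list)


class SketchOrchestrator:
    """Hypothetical orchestrator that records which hook was run."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def callback(self, request: SketchRequest) -> None:
        self.calls.append("callback")

    def populate(self, workflow: SketchRequest) -> None:
        self.calls.append("populate")
        workflow.children.append(SketchRequest(kind="task", status="blocked"))


def sketch_orchestrate(orchestrator: SketchOrchestrator, request: SketchRequest) -> None:
    if request.kind == "callback":
        # Callbacks are one-shot: run the step, then close the request.
        orchestrator.callback(request)
        request.status = "completed"
    elif request.kind == "workflow":
        # Build the children, release them, and keep the workflow open.
        orchestrator.populate(request)
        for child in request.children:
            if child.status == "blocked":
                child.status = "pending"
        request.status = "running"
    else:
        raise ValueError(f"cannot orchestrate a {request.kind!r} work request")


orchestrator = SketchOrchestrator()
callback_request = SketchRequest(kind="callback")
workflow_request = SketchRequest(kind="workflow")
sketch_orchestrate(orchestrator, callback_request)
sketch_orchestrate(orchestrator, workflow_request)
```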
Utility functions for workflows.
- debusine.server.workflows.workflow_utils.filter_artifact_lookup_by_arch(workflow: Workflow[Any, Any], lookup: LookupMultiple, architectures: Iterable[str]) LookupMultiple [source]
Filter an artifact lookup by architecture.
- debusine.server.workflows.workflow_utils.follow_artifact_relation(artifact: Artifact, relation_type: Relations, category: ArtifactCategory) Artifact [source]
Follow relations from artifact to find an artifact of category.
- debusine.server.workflows.workflow_utils.get_architectures(workflow: Workflow[Any, Any], lookup: LookupMultiple) set[str] [source]
Return set with all the architectures in the artifacts from the lookup.
The architectures are extracted from each lookup result using lookup_result_architecture().
- debusine.server.workflows.workflow_utils.get_available_architectures(workflow: Workflow[Any, Any], *, vendor: str, codename: str) set[str] [source]
Get architectures available for use with this vendor/codename.
- debusine.server.workflows.workflow_utils.get_source_package_names(results: Sequence[LookupResult], *, configuration_key: str, artifact_expected_categories: Collection[ArtifactCategory]) list[str] [source]
Return a sorted list of source package names from results.
It ensures that:
- The LookupResult objects contain either an artifact or promise.
- Artifacts belong to the artifact_expected_categories.
- If LookupResult is a promise: extracts the name from the promise data source_package_name.
- Parameters:
results – A sequence of LookupResult objects representing artifacts to be processed. Each entry is expected to be either an artifact or a promise.
configuration_key – A string used by BaseTask.ensure_artifact_categories() for the exception message.
artifact_expected_categories – valid ArtifactCategory that artifacts must belong to.
- Returns:
A sorted list of source package names.
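A rough sketch of the checks listed for get_source_package_names, using a hypothetical `SketchResult` dataclass in place of LookupResult. The `name` and `category` keys on the artifact dictionary, the use of `ValueError`, and the collapsing of duplicate names are assumptions; only the artifact-or-promise rule, the category check, and the `source_package_name` promise field come from the description.

```python
from dataclasses import dataclass
from typing import Any, Collection, Optional, Sequence


@dataclass
class SketchResult:
    """Hypothetical stand-in for LookupResult."""

    artifact: Optional[dict[str, Any]] = None
    promise: Optional[dict[str, Any]] = None


def sketch_source_package_names(
    results: Sequence[SketchResult], *, expected_categories: Collection[str]
) -> list[str]:
    names: set[str] = set()  # assumption: duplicate names are collapsed
    for result in results:
        if result.artifact is not None:
            category = result.artifact["category"]
            if category not in expected_categories:
                raise ValueError(f"unexpected artifact category: {category}")
            names.add(result.artifact["name"])
        elif result.promise is not None:
            # Promises carry the name in their data.
            names.add(result.promise["source_package_name"])
        else:
            raise ValueError("result holds neither an artifact nor a promise")
    return sorted(names)


names = sketch_source_package_names(
    [
        SketchResult(artifact={"category": "debian:source-package", "name": "zlib"}),
        SketchResult(promise={"source_package_name": "hello"}),
    ],
    expected_categories={"debian:source-package", "debian:upload"},
)
```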
- debusine.server.workflows.workflow_utils.locate_debian_source_package(configuration_key: str, artifact: Artifact) Artifact [source]
Accept a debian:upload or debian:source-package in a workflow.
Resolve to the debian:source-package.
- debusine.server.workflows.workflow_utils.locate_debian_source_package_lookup(workflow: Workflow[Any, Any], configuration_key: str, lookup: int | str) int | str [source]
Return a lookup to a debian:source-package.
If the specified lookup returns a debian:source-package, return it. If it returns a debian:upload, find the related debian:source-package and return a lookup to it.
- debusine.server.workflows.workflow_utils.lookup_result_architecture(result: LookupResult) str [source]
Get architecture from result of looking up an artifact.
- debusine.server.workflows.workflow_utils.source_package(workflow: Workflow[Any, Any]) Artifact [source]
Retrieve the source package artifact.
If workflow.data.input exists, use workflow.data.input.source_artifact, otherwise workflow.data.source_artifact.
If the source artifact is a debian:upload, returns its debian:source-package.
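The fallback between the two data layouts can be sketched as follows, with `SimpleNamespace` objects standing in for workflow data. The helper name `sketch_source_artifact_lookup` is hypothetical, and the sketch treats a missing or `None` `input` attribute as "does not exist", which is an assumption. Resolving the lookup and following a debian:upload to its debian:source-package are left out.

```python
from types import SimpleNamespace
from typing import Any


def sketch_source_artifact_lookup(data: Any) -> Any:
    """Prefer data.input.source_artifact, else data.source_artifact."""
    # Assumption: a missing or None "input" means the flat layout is in use.
    workflow_input = getattr(data, "input", None)
    if workflow_input is not None:
        return workflow_input.source_artifact
    return data.source_artifact


nested = SimpleNamespace(input=SimpleNamespace(source_artifact=101))
flat = SimpleNamespace(source_artifact=202)
nested_lookup = sketch_source_artifact_lookup(nested)
flat_lookup = sketch_source_artifact_lookup(flat)
```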