Implementation of workflows

Underlying concepts

The basic concepts have been introduced in Workflows but there are other concepts involved if you start digging into how workflows are implemented.

Workflow implementation

On the Python side, a workflow is orchestrated by a subclass of Workflow, which derives from BaseTask and has its own subclass hierarchy.

When instantiating a “Workflow”, a new WorkRequest is created with:

  • task_type set to "workflow"

  • task_name pointing to the Workflow subclass used to orchestrate

  • task_data set to the workflow parameters instantiated from the template (or from the parent workflow)

This WorkRequest acts as the root of the WorkRequest hierarchy for the running workflow.

The Workflow class runs on the server with full database access and is in charge of:

  • on instantiation, laying out an execution plan under the form of a directed acyclic graph of newly created WorkRequest instances.

  • analyzing the results of any completed WorkRequest in the graph

  • possibly extending/modifying the graph after this analysis

WorkRequest elements in a Workflow can only depend among each other, and cannot have dependencies on WorkRequest elements outside the workflow.

All the child work requests start in the blocked status using the deps unblock strategy. When the Workflow WorkRequest is ready to run, all the child WorkRequest elements that don’t have any further dependencies can immediately start.

Special work requests

Server-side task

While worker-side tasks can be scheduled by any user, only Workflow objects can schedule execution of server-side tasks.

Server-side tasks have database access: they can thus analyze their parent workflow, including all the completed work requests and the generated artifacts, they can also consume and generate runtime data that will be available for other steps in the workflow (through the internal collection associated with the workflow’s root WorkRequest).

Workflow callback

This is a special kind of work request: when it becomes executable, it runs (either inline in the scheduler or in a celery task) the Workflow orchestrator which is then allowed to alter the graph of work requests. It can add additional work requests in the workflow instance, perhaps after analyzing the results of previously-completed work requests.

This special work request is identified with its task_type being internal and with task_name being workflow. Its associated task_data is an empty dictionary but the workflow_data dictionary must have a step key to identify the callback.

The Workflow object to use is identified by looking up its name on the associated WorkRequest.

Synchronization point

This is a work request that does nothing. It main use is to provide synchronization points in a graph of blocked work requests. In particular they are systematically used to represent the entry or exit points of sub-workflows or of groups of related work requests.

When such a work request becomes pending, it is immediately marked as completed, thus unblocking work requests that depend on it.

This work request typically has metadata explaining its purpose and influencing the rendering of the workflow’s visual representation.

This kind of work request is identified with its task_type being internal and with task_name being synchronization_point. Its associated task_data is an empty dictionary.

Advanced workflows

Advanced workflows can be created by combining multiple limited-purpose workflows.

Sub-workflows are integrated in the general graph of their parent workflow as WorkRequests of type workflow.

From a user interface perspective, sub-workflows are typically hidden as a single step in the visual representation of the parent’s workflow.

Group of work requests

When a workflow generates a large number of related/similar work requests, it might want to hide all those work requests behind a group that would appear a single step in the visual representation of the workflow.

Database models

WorkflowTemplate

The WorkflowTemplate model has (at least) the following fields:

  • name: a unique name given to the workflow within the workspace

  • workspace: a foreign key to the workspace containing the workflow

  • task_name: a name that refers back to the Workflow class to use to manage the execution of the workflow

  • task_data: JSON dict field representing a subset of the parameters needed by the workflow that cannot be overridden when instantiating the root WorkRequest

WorkRequest

The root WorkRequest of the workflow copies the following fields from WorkflowTemplate:

  • workspace

  • task_name

  • task_data, combining the user-supplied data and the WorkflowTemplate-imposed data)

This blueprint extends WorkRequest with the following additional fields:

  • parent: foreign key to the containing WorkRequest (or NULL when scheduled outside of a workflow)

  • workflow_data: JSON dict controlling some workflow specific behaviour

  • internal_collection: foreign key to a workflow-internal collection

  • expiration_delay: minimal time where workflow is kept after completion

The new workflow_data field is expected to support the following keys:

  • allow_failure (optional, defaults to False): boolean indicating what to do with the parent workflow if the work request fails. If true, the workflow can continue, otherwise the workflow is interrupted.

  • display_name: name of the step in the visual representation of the workflow

  • step: internal identifier used to differentiate multiple workflow callbacks inside a single workflow. It acts like a machine equivalent for “display_name”, to allow the orchestrator to encode the plan about what it is supposed to do at this point in the workflow.

The new event_reactions field is a dictionary mapping events to a list of actions. Each action is described with a dictionary where the action key defines the action to perform and where the remaining keys are used to define the specifics of the action to be performed. See section below for details. The supported events are the following:

  • on_success: event triggered when the work request completes successfully

  • on_failure: event triggered when the work request fails or errors out

Supported actions

send-notification

Sends a notification of the event using an existing notification channel.

  • channel: name of the notification channel to use

  • data: parameters for the notification method

update-collection-with-artifacts

Adds or replaces artifact-based collection items with artifacts generated by the current work request.

  • collection (required): collection to update, as a single lookup

  • name_template (string, optional): template used to generate the name for the collection item associated to a given artifact. Uses the str.format templating syntax (with variables inside curly braces).

  • variables (dict, optional): definition of variables to prepare to be able to compute the name for the collection item. Each key is the name of the variable and each value is a JSON path query to execute against the data dictionary of the target artifact in order to compute the value of the variable.

  • artifact_filters (dict, required): this parameter makes it possible to identify a subset of generated artifacts to add to the collection. Each key-value represents a specific Django’s ORM filter query against the Artifact model so that one can run work_request.artifact_set.filter(**artifact_filters) to identify the desired set of artifacts.

Note

When the name_template key is not provided, it is expected that the collection will compute the name for the new artifact-based collection item. Some collection categories might not even allow you to override the name.

As an example, you could register all the binary packages having Section: python and a dependency on libpython3.12 out of a sbuild task with names like $PACKAGE_$VERSION by using this action:

action: 'update-collection-with-artifacts'
artifact_filters:
  category: 'debian:binary-package'
  data__deb_fields__Section: 'python'
  data__deb_fields__Depends__contains: 'libpython3.12'
collection: 'internal@collections'
name_template: '{package}_{version}'
variables:
  package: 'deb_fields.Package'
  version: 'deb_fields.Version'

Expected changes

  • The work-request-completed view, or the scheduler, should perform extra work:

    • execute the event_reactions actions

    • lookup work requests that depend on the completed work request and unblock them if no other work request is blocking them

  • The scheduler needs to learn to deal with synchronization points. They don’t need any worker to be executed. They should likely be dealt in a quick background task that can be scheduled as soon as we unblock such a work request.

  • The scheduler needs to learn to deal with workflow callbacks. It’s not clear whether they should be handled like a full server-side work request: they are heavier than synchronization points, but lighter than server-side work requests. They are also internal machinery that doesn’t need to be exposed in the visual representation of workflows.

    They could be batched together with synchronization points in a celery queue grouping all scheduling-related tasks.

  • Work requests for tasks with task_type set to "internal" should be hidden from the web interface (this will exclude synchronization points and workflow callbacks, avoiding clutter in those views).

  • The current logic for dealing with notifications should be replaced with the new event_reactions mechanism. The API to POST a new WorkRequest should accept that new field, ensuring that it only contains send-notification actions (other actions being restricted, they can only be added through workflows). The debusine create-work-request CLI should be updated to have a new command line parameter --events-reactions PATH pointing to a YAML file with the desired content for that field.

  • During implementation it may become apparent that the foreign key to a WorkRequest’s parent should be replaced with a foreign key to the closest parent node of type "workflow"

Validation of the design for various expected steps

Todo

Consider some of the steps that we expect to implement and double check that they can be implemented in such a framework.