Task input fields (internal API)

class debusine.tasks.inputs.SingleInput(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactInput[int | str, ArtifactInfo]

One single input artifact specified in a task data field.

lookup_type: type[LookupType] = int | str
resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → ArtifactInfo

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.PydanticSerializer object>
class debusine.tasks.inputs.OptionalSingleInput(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactInput[int | str, ArtifactInfo | None]

One optional single input artifact specified in a task data field.

lookup_type: type[LookupType] = int | str
resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → ArtifactInfo | None

Resolve the input from task data.
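The practical difference between SingleInput and OptionalSingleInput is visible in the return types of resolve: ArtifactInfo versus ArtifactInfo | None. The following is a minimal stand-alone sketch of that contract, not debusine's implementation. FakeArtifactInfo, ARTIFACTS, resolve_single and resolve_optional_single are hypothetical names introduced for illustration only, and raising KeyError for an unset required field is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FakeArtifactInfo:
    """Hypothetical stand-in for ArtifactInfo."""

    id: int
    category: str


# Hypothetical lookup table standing in for the task database.
ARTIFACTS = {
    1: FakeArtifactInfo(id=1, category="debian:source-package"),
}


def resolve_single(task_data: dict, field: str) -> FakeArtifactInfo:
    """Resolve a required lookup (assumed to fail when the field is unset)."""
    lookup = task_data.get(field)
    if lookup is None:
        raise KeyError(f"required input {field!r} is not set")
    return ARTIFACTS[lookup]


def resolve_optional_single(
    task_data: dict, field: str
) -> FakeArtifactInfo | None:
    """Resolve an optional lookup: an unset field yields None."""
    lookup = task_data.get(field)
    if lookup is None:
        return None
    return ARTIFACTS[lookup]
```

Callers of the optional variant have to handle None explicitly, which is why the two cases are modelled as separate input classes with distinct return types.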

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.OptionalPydanticSerializer object>
class debusine.tasks.inputs.SingleInputList(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactListInput[int | str, list[ArtifactInfo]]

A list of single input artifact lookups in a task data field.

item_type: type[LookupType] = int | str
lookup_type

alias of Collection

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → list[ArtifactInfo]

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.PydanticListSerializer object>
class debusine.tasks.inputs.MultiInput(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactInput[LookupMultiple, InputArtifactMultiple]

A single multiple-artifact lookup specified in a task data field.

lookup_type

alias of LookupMultiple

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → InputArtifactMultiple

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.PydanticSerializer object>
class debusine.tasks.inputs.MultiInputList(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactListInput[LookupMultiple, list[InputArtifactMultiple]]

A list of multiple-artifact lookups specified in a task data field.

item_type

alias of LookupMultiple

lookup_type

alias of Collection

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → list[InputArtifactMultiple]

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.MultiInputListSerializer object>
class debusine.tasks.inputs.UploadArtifactsInput(**kwargs: Unpack[ArtifactInputKwargs])

Bases: ArtifactInput[LookupMultiple, InputArtifactMultiple]

A multiple artifact lookup finding all related binaries in an upload.

__init__(**kwargs: Unpack[ArtifactInputKwargs]) → None

Look up artifacts enforcing their type.

Parameters:
  • field – dot-separated list of lookup names in task data

  • categories – list of acceptable artifact categories
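The field parameter is described as a dot-separated list of lookup names in task data. One way to read that is as a path through nested task data, sketched below. This is an illustration under assumptions, not debusine's code: get_by_dotted_field is a hypothetical helper, the task data is modelled as plain nested dictionaries (the real implementation may walk model attributes instead), and returning None for a missing segment is an assumption.

```python
from typing import Any


def get_by_dotted_field(task_data: dict[str, Any], field: str) -> Any:
    """Walk nested mappings following a dot-separated field name."""
    value: Any = task_data
    for name in field.split("."):
        # Stop as soon as a path segment cannot be followed.
        if not isinstance(value, dict) or name not in value:
            return None
        value = value[name]
    return value


# Hypothetical task data with a nested lookup.
data = {"input": {"source_artifact": 42}}
found = get_by_dotted_field(data, "input.source_artifact")
```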

lookup_type

alias of LookupMultiple

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → InputArtifactMultiple

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.PydanticSerializer object>
class debusine.tasks.inputs.EnvironmentInput(*, image_category: ExecutorImageCategory | None = None, set_backend: bool = True, try_variant: bool = True, **kwargs: Unpack[ArtifactInputKwargs])

Bases: ArtifactInput[int | str, ArtifactInfo]

One environment artifact specified in a task data field.

__init__(*, image_category: ExecutorImageCategory | None = None, set_backend: bool = True, try_variant: bool = True, **kwargs: Unpack[ArtifactInputKwargs]) → None

Look up artifacts enforcing their type.

Parameters:
  • field – dot-separated list of lookup names in task data

  • categories – list of acceptable artifact categories

  • image_category – try to use an environment with this image category; defaults to the image category needed by the executor for self.backend

  • set_backend – if True (default), try to use an environment matching self.backend

  • try_variant – if True (default), try to use an environment whose variant is self.name, but fall back to looking up an environment without a variant if the first lookup fails
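The interaction of set_backend and try_variant described above can be sketched as a two-step lookup. This is a stand-alone illustration, not debusine's implementation: environments are modelled as plain dictionaries, match and find_environment are hypothetical helpers, and the fallback is interpreted here as dropping the variant constraint entirely (the real lookup may instead match only environments that have no variant).

```python
from __future__ import annotations

from typing import Any

Environment = dict[str, Any]


def match(
    environments: list[Environment], **constraints: Any
) -> Environment | None:
    """Return the first environment satisfying every constraint."""
    for env in environments:
        if all(env.get(key) == value for key, value in constraints.items()):
            return env
    return None


def find_environment(
    environments: list[Environment],
    *,
    codename: str,
    backend: str,
    task_name: str,
    set_backend: bool = True,
    try_variant: bool = True,
) -> Environment | None:
    """Prefer a task-specific variant, then fall back to a plain lookup."""
    constraints: dict[str, Any] = {"codename": codename}
    if set_backend:
        constraints["backend"] = backend
    if try_variant:
        # First attempt: an environment whose variant is the task name.
        found = match(environments, variant=task_name, **constraints)
        if found is not None:
            return found
    # Fallback (or only) attempt: no variant constraint.
    return match(environments, **constraints)


envs = [
    {"id": 1, "codename": "bookworm", "backend": "unshare", "variant": None},
    {"id": 2, "codename": "bookworm", "backend": "unshare", "variant": "autopkgtest"},
]
```

With these sample environments, a task named "autopkgtest" gets the variant-specific environment, while a task with no matching variant falls through to the first environment that satisfies the remaining constraints.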

lookup_type: type[LookupType] = int | str
resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → ArtifactInfo

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.PydanticSerializer object>
class debusine.tasks.inputs.OptionalEnvironmentInput(*, categories: Collection[ArtifactCategory] | None = None, **kwargs: Unpack[DataFieldInputKwargs])

Bases: ArtifactInput[int | str, ArtifactInfo | None]

One optional environment artifact specified in a task data field.

lookup_type: type[LookupType] = int | str
resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → ArtifactInfo | None

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.OptionalPydanticSerializer object>
class debusine.tasks.inputs.SuiteArchiveInput(field: str | None = None, **kwargs: Unpack[TaskInputKwargs])

Bases: DataFieldInput[int | str, tuple[InputCollectionSingle, InputCollectionSingle | None]]

Look up a suite and an archive based on a task data field.

lookup_type: type[LookupType] = int | str
resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → tuple[InputCollectionSingle, InputCollectionSingle | None]

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.SuiteArchiveInputSerializer object>
class debusine.tasks.inputs.ExtraRepositoriesInput(field: str | None = None, **kwargs: Unpack[TaskInputKwargs])

Bases: DataFieldInput[list[ExtraDebusineRepository | ExtraExternalRepository], list[ExtraExternalRepository] | None]

Look up extra repositories.

lookup_type

alias of list

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → list[ExtraExternalRepository] | None

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.ExtraRepositoriesSerializer object>
class debusine.tasks.inputs.DebusineFQDNInput(stage: Stage = Stage.ALL)

Bases: TaskInput[str]

Resolve to the Debusine FQDN.

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → str

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.ScalarSerializer object>
class debusine.tasks.inputs.BuildArchitectureInput(field: str | None = None, **kwargs: Unpack[TaskInputKwargs])

Bases: DataFieldInput[str, str]

Build architecture for the task.

This currently looks up a value in the task data, and uses a fallback value if it is not set.
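The lookup-with-fallback behaviour can be sketched as follows. This is only an illustration: resolve_build_architecture is a hypothetical helper, the default field name "build_architecture" is an assumption, and the source does not say where the fallback value comes from, so it is passed in explicitly here.

```python
from __future__ import annotations


def resolve_build_architecture(
    task_data: dict[str, str | None],
    fallback: str,
    field: str = "build_architecture",  # hypothetical field name
) -> str:
    """Return the architecture set in task data, or the fallback if unset."""
    value = task_data.get(field)
    return value if value is not None else fallback


explicit = resolve_build_architecture({"build_architecture": "arm64"}, "amd64")
defaulted = resolve_build_architecture({}, "amd64")
```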

lookup_type

alias of str

resolve(task: BaseExternalTask[Any, Any], task_database: TaskDatabaseInterface) → str

Resolve the input from task data.

serializer: ClassVar[TaskInputSerializer[ValueType]] = <debusine.tasks.inputs.ScalarSerializer object>