Control Flow
Create pipelines with control flow
Although a KFP pipeline decorated with the @dsl.pipeline decorator looks like a normal Python function, it is actually an expression of pipeline topology and control flow semantics, constructed using the KFP domain-specific language (DSL). Pipeline Basics covered how data passing expresses pipeline topology through task dependencies. This section describes how to use control flow in your pipelines using the KFP DSL. The DSL features three types of control flow, each implemented by a Python context manager:
Conditions
Looping
Exit handling
Conditions (dsl.Condition)
The dsl.Condition context manager enables conditional execution of tasks within its scope based on the output of an upstream task or a pipeline input parameter. The context manager takes two arguments: a required condition and an optional name. The condition argument is a comparison expression in which at least one of the two operands is an output from an upstream task or a pipeline input parameter.
In the following pipeline, conditional_task only executes if coin_flip_task has the output 'heads'.
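The gating behavior can be modeled in plain Python. The sketch below is not KFP code: run_pipeline and its coin_result argument are hypothetical stand-ins for the orchestrator and for the output of coin_flip_task, and the function only records which tasks would execute.

```python
from typing import List


def run_pipeline(coin_result: str) -> List[str]:
    """Return the names of the tasks that execute, in order."""
    executed = ["coin_flip_task"]  # the upstream task always runs

    # Stand-in for: with dsl.Condition(coin_flip_task.output == 'heads'):
    if coin_result == "heads":
        executed.append("conditional_task")

    return executed


print(run_pipeline("heads"))
print(run_pipeline("tails"))
```

The comparison has an upstream output on one side and a constant on the other, which matches the rule that at least one operand must come from an upstream task or a pipeline input parameter.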
Parallel looping (dsl.ParallelFor)
The dsl.ParallelFor context manager allows parallel execution of tasks over a static set of items. The context manager takes three arguments: a required items, an optional parallelism, and an optional name. The items argument is the static set of items to loop over, and parallelism is the maximum number of concurrent iterations permitted while executing the dsl.ParallelFor group. Setting parallelism=0 indicates unconstrained parallelism.
Not yet supported
Setting parallelism is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
In the following pipeline, train_model will train a model for 1, 5, 10, and 25 epochs, with no more than two training tasks running at one time:
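The effect of a parallelism cap can be illustrated with a plain-Python analogy built on the standard library. This is not KFP code: ThreadPoolExecutor stands in for the orchestrator, max_workers plays the role of the parallelism argument, and train_model and ConcurrencyTracker are hypothetical helpers that only sleep and count how many iterations overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

PARALLELISM = 2               # cap on concurrent iterations
EPOCH_ITEMS = [1, 5, 10, 25]  # the static set of items to loop over


class ConcurrencyTracker:
    """Records the highest number of tasks that ran at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._running = 0
        self.peak = 0

    def __enter__(self) -> "ConcurrencyTracker":
        with self._lock:
            self._running += 1
            self.peak = max(self.peak, self._running)
        return self

    def __exit__(self, *exc_info) -> None:
        with self._lock:
            self._running -= 1


tracker = ConcurrencyTracker()


def train_model(epochs: int) -> str:
    """Hypothetical stand-in for a training task."""
    with tracker:
        time.sleep(0.05)  # pretend to train
        return f"model trained for {epochs} epochs"


# One iteration per item, with at most PARALLELISM running at once.
with ThreadPoolExecutor(max_workers=PARALLELISM) as pool:
    results = list(pool.map(train_model, EPOCH_ITEMS))

print(results)
print("peak concurrent tasks:", tracker.peak)
```

The analogy only covers positive caps: ThreadPoolExecutor rejects max_workers=0, whereas parallelism=0 is defined above as unconstrained.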
Not yet supported
dsl.Collected is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
Use dsl.Collected with dsl.ParallelFor to gather outputs from a parallel loop of tasks:
Downstream tasks can consume dsl.Collected outputs via an input annotated with a List of parameters or a List of artifacts. For example, select_best in the preceding example has the input models with type Input[List[Model]], as shown by the following component definition:
You can use dsl.Collected to collect outputs from nested loops in a nested list of parameters. For example, output parameters from two nested dsl.ParallelFor groups are collected in a multilevel nested list of parameters, where each nested list contains the output parameters from one of the dsl.ParallelFor groups. The number of nested levels is based on the number of nested dsl.ParallelFor contexts.
By comparison, artifacts created in nested loops are collected in a flat list.
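The difference between the two collected shapes can be shown with ordinary Python lists. The sketch below is an illustration rather than KFP code: produce is a hypothetical task that runs once per inner iteration, the item values are made up, and the element order simply follows the loop order, which is an assumption of the sketch.

```python
from typing import List

OUTER_ITEMS = ["a", "b"]   # items of the outer loop
INNER_ITEMS = [1, 2, 3]    # items of the inner loop


def produce(outer: str, inner: int) -> str:
    """Hypothetical stand-in for a task that runs once per inner iteration."""
    return f"{outer}{inner}"


# Parameters: one nesting level per loop, so two loops give a list of lists.
collected_parameters: List[List[str]] = [
    [produce(outer, inner) for inner in INNER_ITEMS] for outer in OUTER_ITEMS
]

# Artifacts: the same outputs, flattened into a single list.
collected_artifacts: List[str] = [
    produce(outer, inner) for outer in OUTER_ITEMS for inner in INNER_ITEMS
]

print(collected_parameters)
print(collected_artifacts)
```

A downstream consumer of the parameters would therefore need an input typed as a list of lists, while a consumer of the artifacts would take a single flat list.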
You can also return a dsl.Collected from a pipeline. Use a List of parameters or a List of artifacts in the return annotation, as shown in the following example:
Exit handling (dsl.ExitHandler)
The dsl.ExitHandler context manager allows pipeline authors to specify an exit task that runs after the tasks within the context manager’s scope finish execution, even if one of those tasks fails. This is analogous to using a try: block followed by a finally: block in normal Python, where the exit task is in the finally: block. The context manager takes two arguments: a required exit_task and an optional name. The exit_task argument accepts an instantiated PipelineTask.
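The try: and finally: analogy can be written out directly. The following plain-Python sketch is not KFP code; failing_task, exit_task, and run_group are hypothetical names, and the log list exists only to make the execution order visible.

```python
from typing import List

log: List[str] = []


def failing_task() -> None:
    """A task inside the exit handler's scope that fails."""
    log.append("task started")
    raise RuntimeError("task failed")


def exit_task() -> None:
    """The task that must run regardless of what happened in the scope."""
    log.append("exit task ran")


def run_group() -> None:
    try:
        # Corresponds to the tasks inside the dsl.ExitHandler scope.
        failing_task()
    finally:
        # Corresponds to the exit task: it runs on success and on failure.
        exit_task()


try:
    run_group()
except RuntimeError as err:
    # The failure still surfaces after the exit task has run.
    log.append(f"run failed: {err}")

print(log)
```

As with finally:, the exit task does not hide the failure; it only guarantees that the cleanup step runs before the failure is reported.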
In the following pipeline, clean_up_task will execute after both create_dataset and train_and_save_models finish, or after either of them fails:
The task you use as an exit task may use a special input that provides access to pipeline and task status metadata, including pipeline failure or success status. You can use this special input by annotating a parameter of your exit task with the dsl.PipelineTaskFinalStatus annotation. The backend provides the argument for this parameter automatically at runtime, so you should not pass a value for it when you instantiate your exit task.
The following pipeline uses dsl.PipelineTaskFinalStatus to obtain information about the pipeline and task failure, even after fail_op fails:
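The idea of a status input that the author never supplies can be modeled in plain Python. This sketch is not KFP code: FinalStatus is a hypothetical stand-in for the injected status object, its state and error_message fields and the 'SUCCEEDED' and 'FAILED' values are illustrative assumptions, and run_with_exit_handler is a toy runner that, unlike a real exit handler, reports the failure instead of re-raising it.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FinalStatus:
    """Hypothetical stand-in for the status object the backend injects."""
    state: str
    error_message: Optional[str] = None


def fail_op() -> None:
    """A task that always fails."""
    raise RuntimeError("Task failed!")


def exit_op(user_input: str, status: FinalStatus) -> str:
    """Exit task: user_input comes from the author, status from the runner."""
    return f"{user_input}: {status.state} ({status.error_message})"


def run_with_exit_handler(task: Callable[[], None], user_input: str) -> str:
    """Toy runner that fills in status itself, as the backend would."""
    try:
        task()
        status = FinalStatus(state="SUCCEEDED")
    except Exception as err:
        status = FinalStatus(state="FAILED", error_message=str(err))
    # The caller supplies only user_input; status is never passed by the author.
    return exit_op(user_input, status)


report = run_with_exit_handler(fail_op, "Task execution status")
print(report)
```

The point of the design is that the exit task's signature declares the status parameter, while the call site leaves it out.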
Ignore upstream failure
The .ignore_upstream_failure() task method on PipelineTask enables another approach to authoring pipelines with exit handling behavior. Calling this method on a task causes the task to ignore failures of any specified upstream tasks (as established by data exchange or by use of .after()). If the task has no upstream tasks, this method has no effect.
In the following pipeline definition, clean_up_task is executed after fail_op, regardless of whether fail_op succeeds:
Note that the component used for the caller task (print_op in the example above) requires a default value for all inputs it consumes from an upstream task. The default value is applied if the upstream task fails to produce the outputs that are passed to the caller task. Specifying default values ensures that the caller task always succeeds, regardless of the status of the upstream task.
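The role of the default value can be sketched in plain Python. This is a model of the behavior described above, not KFP code: run is a hypothetical toy runner, ok_op is an invented succeeding task added for contrast, and the 'default' message is an arbitrary placeholder value.

```python
from typing import Callable

DEFAULT_MESSAGE = "default"  # arbitrary placeholder default


def fail_op() -> str:
    """Upstream task that fails before producing its output."""
    raise RuntimeError("upstream failed")


def ok_op() -> str:
    """Upstream task that succeeds (added only for contrast)."""
    return "hello from upstream"


def print_op(message: str = DEFAULT_MESSAGE) -> str:
    """Caller task; the default covers a missing upstream output."""
    return f"message: {message}"


def run(upstream: Callable[[], str]) -> str:
    """Toy runner: run print_op even when the upstream task fails."""
    try:
        output = upstream()
    except RuntimeError:
        # No upstream output exists, so the input falls back to its default.
        return print_op()
    return print_op(output)


print(run(ok_op))
print(run(fail_op))
```

Without the default on message, the fallback call would have no value to pass, which is why every input consumed from an upstream task needs one.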