Compose components into pipelines
While components have three authoring approaches, pipelines have one authoring approach: they are defined with a pipeline function decorated with the @dsl.pipeline decorator. Take the following pipeline, pythagorean, which implements the Pythagorean theorem as a pipeline of simple arithmetic components:
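A minimal sketch of such a pipeline, assuming three lightweight Python components named square, add, and square_root (these names are referenced throughout this page; the component bodies shown here are illustrative):

```python
from kfp import dsl


@dsl.component
def square(x: float) -> float:
    return x ** 2


@dsl.component
def add(x: float, y: float) -> float:
    return x + y


@dsl.component
def square_root(x: float) -> float:
    return x ** 0.5


@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    # Square each side; these two tasks do not depend on each other.
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    # Sum the squares, then take the square root of the sum.
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output
```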
Although a KFP pipeline decorated with the @dsl.pipeline decorator looks like a normal Python function, it is actually an expression of pipeline topology and semantics, constructed using the KFP domain-specific language (DSL).

A pipeline definition has five parts:
The pipeline decorator
Inputs and outputs declared in the function signature
Data passing and task dependencies
Task configurations
Pipeline control flow
This section covers the first four parts. Pipeline control flow is covered in the next section.

The pipeline decorator

KFP pipelines are defined inside functions decorated with the @dsl.pipeline decorator. The decorator takes four optional arguments:
name is the name of your pipeline. If not provided, the name defaults to a sanitized version of the pipeline function name.
description is a description of the pipeline.
pipeline_root is the root path of the remote storage destination within which the tasks in your pipeline will create outputs. pipeline_root may also be set or overridden by pipeline submission clients.
display_name is a human-readable name for your pipeline.
You can modify the definition of pythagorean to use these arguments:
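A sketch of what that might look like (the argument values, such as the pipeline_root bucket, are placeholders; square, add, and square_root are the components from the earlier example):

```python
from kfp import dsl


@dsl.pipeline(
    name='pythagorean-theorem-pipeline',
    description='Solve for the hypotenuse of a right triangle with side lengths a and b.',
    pipeline_root='gs://my-pipelines-bucket',  # placeholder remote storage root
    display_name='Pythagorean pipeline',
)
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output
```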
Also see the KFP documentation for information on how to provide pipeline metadata via docstrings.

Pipeline inputs and outputs

Like components, pipeline inputs and outputs are defined by the parameters and annotations in the pipeline function signature.

In the preceding example, pythagorean accepts inputs a and b, each typed float, and creates one float output.

Pipeline inputs are declared via function input parameters/annotations, and pipeline outputs are declared via function output annotations. Pipeline outputs will never be declared via pipeline function input parameters, unlike for components that use output artifacts or dsl.OutputPath.

For more information on how to declare pipeline function inputs and outputs, see the Data Types documentation.
Data passing and task dependencies

When you call a component in a pipeline definition, it constructs a PipelineTask instance. You can pass data between tasks using the PipelineTask's .output and .outputs attributes.

For a task with a single unnamed output indicated by a single return annotation, access the output using PipelineTask.output. This is the case for the components square, add, and square_root, which each have one unnamed output.

For tasks with multiple outputs or named outputs, access the output using PipelineTask.outputs['<output-key>']. Using named output parameters is described in more detail in the Data Types documentation.
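As an illustration, a hypothetical split_name component with two named outputs (declared via a NamedTuple return annotation) might be consumed like this; the component and pipeline names are made up for the example:

```python
from typing import NamedTuple

from kfp import dsl


@dsl.component
def split_name(full_name: str) -> NamedTuple('outputs', first=str, last=str):
    outputs = NamedTuple('outputs', first=str, last=str)
    first, last = full_name.split(' ', 1)
    return outputs(first, last)


@dsl.pipeline
def name_pipeline(full_name: str = 'Ada Lovelace') -> str:
    split_task = split_name(full_name=full_name)
    # Named outputs are accessed by key via .outputs.
    return split_task.outputs['first']
```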
In the absence of data exchange, tasks will run in parallel for efficient pipeline execution. This is the case for a_sq_task and b_sq_task, which do not exchange data.
When tasks exchange data, an execution ordering is established between those tasks. This is to ensure that upstream tasks create their outputs before downstream tasks attempt to consume those outputs. For example, in pythagorean, the backend will execute a_sq_task and b_sq_task before it executes sum_task. Similarly, it will execute sum_task before it executes the final task created from the square_root component.
In some cases, you may wish to establish execution ordering in the absence of data exchange. In these cases, you can call one task's .after() method on another task. For example, while a_sq_task and b_sq_task do not exchange data, we can specify a_sq_task to run before b_sq_task:
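A sketch of how this might look in the pipeline body (component calls as in the earlier example):

```python
from kfp import dsl


@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    # No data is exchanged, but force b_sq_task to wait for a_sq_task.
    b_sq_task.after(a_sq_task)

    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output
```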
Special input types
There are a few special input values that you can pass to a component within your pipeline definition to give the component access to some metadata about itself. These values can be passed to input parameters typed str.

For example, the following print_op component prints the pipeline job name at component runtime using dsl.PIPELINE_JOB_NAME_PLACEHOLDER:
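A minimal sketch, assuming a print_op component that simply prints a string input (the pipeline name my_pipeline is illustrative):

```python
from kfp import dsl


@dsl.component
def print_op(text: str):
    print(text)


@dsl.pipeline
def my_pipeline():
    # At runtime, the placeholder resolves to the name of the pipeline job.
    print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)
```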
There are several special values that may be used in this style, including:
dsl.PIPELINE_JOB_NAME_PLACEHOLDER
dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
dsl.PIPELINE_JOB_ID_PLACEHOLDER
dsl.PIPELINE_TASK_NAME_PLACEHOLDER
dsl.PIPELINE_TASK_ID_PLACEHOLDER
dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER
dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER
dsl.PIPELINE_ROOT_PLACEHOLDER
Not yet supported

PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER, PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER, and PIPELINE_ROOT_PLACEHOLDER are not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature in the KFP GitHub repository.

See the KFP SDK DSL reference documentation for more information about the data provided by each special input.
Task configurations

The KFP SDK exposes several platform-agnostic task-level configurations via task methods. Platform-agnostic configurations are those that are expected to exhibit similar execution behavior on all KFP-conformant backends, such as the open source KFP backend or Google Cloud Vertex AI Pipelines.

All platform-agnostic task-level configurations are set using PipelineTask methods. Take the following environment variable example:
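A sketch of such an example; the environment variable name ENV_VAR and the pipeline name my_pipeline are illustrative:

```python
from kfp import dsl


@dsl.component
def print_env_var():
    import os
    print(os.environ.get('ENV_VAR'))


@dsl.pipeline
def my_pipeline():
    task = print_env_var()
    # Set an environment variable on the task's container at runtime.
    task.set_env_variable('ENV_VAR', 'hello')
```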
When executed, the print_env_var component should print 'hello'.
Task-level configuration methods can also be chained:
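For instance, a sketch reusing print_env_var from above (the particular methods chained here are arbitrary picks from the list below):

```python
from kfp import dsl


@dsl.pipeline
def my_pipeline():
    (
        print_env_var()
        .set_env_variable('ENV_VAR', 'hello')
        .set_display_name('print env var')
    )
```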
The KFP SDK provides the following task methods for setting task-level configurations:
.add_accelerator_type
.set_accelerator_limit
.set_cpu_limit
.set_memory_limit
.set_env_variable
.set_caching_options
.set_display_name
.set_retry
.ignore_upstream_failure
Not yet supported

.ignore_upstream_failure is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature in the KFP GitHub repository.

See the KFP SDK reference documentation for more information about these methods.
Pipelines as components

Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline. For example, we could easily recompose the preceding pythagorean pipeline to use an inner helper pipeline, square_and_sum:
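A sketch of that recomposition, reusing the square, add, and square_root components from the earlier example:

```python
from kfp import dsl


@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output


@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    # The inner pipeline is called just like a single-step component.
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output
```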