# Pipelines

While components have three authoring approaches, pipelines have one authoring approach: they are defined with a pipeline function decorated with the [`@dsl.pipeline`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.pipeline) decorator. Take the following pipeline, `pythagorean`, which implements the Pythagorean theorem as a pipeline via simple arithmetic components:

```python
from kfp import dsl

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output
```

Although a KFP pipeline decoratored with the `@dsl.pipeline` decorator looks like a normal Python function, it is actually an expression of pipeline topology and [control flow](https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow) semantics, constructed using the [KFP domain-specific language](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html) (DSL).

A pipeline definition has four parts:

1. The pipeline decorator
2. Inputs and outputs declared in the function signature
3. Data passing and task dependencies
4. Task configurations
5. Pipeline control flow

This section covers the first four parts. [Control flow](https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow) is covered in the next section.

#### The pipeline decorator <a href="#the-pipeline-decorator" id="the-pipeline-decorator"></a>

KFP pipelines are defined inside functions decorated with the `@dsl.pipeline` decorator. The decorator takes three optional arguments:

* `name` is the name of your pipeline. If not provided, the name defaults to a sanitized version of the pipeline function name.
* `description` is a description of the pipeline.
* `pipeline_root` is the root path of the remote storage destination within which the tasks in your pipeline will create outputs. `pipeline_root` may also be set or overridden by pipeline submission clients.
* `display_name` is a human-readable for your pipeline.

You can modify the definition of `pythagorean` to use these arguments:

```python
@dsl.pipeline(name='pythagorean-theorem-pipeline',
              description='Solve for the length of a hypotenuse of a triangle with sides length `a` and `b`.',
              pipeline_root='gs://my-pipelines-bucket',
              display_name='Pythagorean pipeline.')
def pythagorean(a: float, b: float) -> float:
    ...
```

Also see [Additional Functionality: Component docstring format](https://www.kubeflow.org/docs/components/pipelines/v2/components/additional-functionality#component-docstring-format) for information on how to provide pipeline metadata via docstrings.

#### Pipeline inputs and outputs <a href="#pipeline-inputs-and-outputs" id="pipeline-inputs-and-outputs"></a>

Like [components](https://www.kubeflow.org/docs/components/pipelines/v2/components), pipeline inputs and outputs are defined by the parameters and annotations in the pipeline function signature.

In the preceding example, `pythagorean` accepts inputs `a` and `b`, each typed `float`, and creates one `float` output.

Pipeline inputs are declaried via function input parameters/annotations and pipeline outputs are declared via function output annotations. Pipeline outputs will *never be declared via pipeline function input parameters*, unlike for components that use [output artifacts](https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts#using-output-artifacts) or [Container Components that use `dsl.OutputPath`](https://www.kubeflow.org/docs/components/pipelines/v2/components/container-components#create-component-outputs).

For more information on how to declare pipeline function inputs and outputs, see [Data Types](https://www.kubeflow.org/docs/components/pipelines/v2/data-types).

#### Data passing and task dependencies <a href="#data-passing-and-task-dependencies" id="data-passing-and-task-dependencies"></a>

When you call a component in a pipeline definition, it constructs a [`PipelineTask`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PipelineTask) instance. You can pass data between tasks using the `PipelineTask`’s `.output` and `.outputs` attributes.

For a task with a single unnamed output indicated by a single return annotation, access the output using `PipelineTask.output`. This the case for the components `square`, `add`, and `square_root`, which each have one unnamed output.

For tasks with multiple outputs or named outputs, access the output using `PipelineTask.output['<output-key>']`. Using named output parameters is described in more detail in [Data Types: Parameters](https://www.kubeflow.org/docs/components/pipelines/v2/data-types/parameters#multiple-output-parameters).

In the absence of data exchange, tasks will run in parallel for efficient pipeline executions. This is the case for `a_sq_task` and `b_sq_task` which do not exchange data.

When tasks exchange data, an execution ordering is established between those tasks. This is to ensure that upstream tasks create their outputs before downstream tasks attempt to consume those outputs. For example, in `pythagorean`, the backend will execute `a_sq_task` and `b_sq_task` before it executes `sum_task`. Similarly, it will execute `sum_task` before it executes the final task created from the `square_root` component.

In some cases, you may wish to establish execution ordering in the absence of data exchange. In these cases, you can call one task’s `.after()` method on another task. For example, while `a_sq_task` and `b_sq_task` do not exchange data, we can specify `a_sq_task` to run before `b_sq_task`:

```python
@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    b_sq_task.after(a_sq_task)
    ...
```

**Special input types**

There are a few special input values that you can pass to a component within your pipeline definition to give the component access to some metadata about itself. These values can be passed to input parameters typed `str`.

For example, the following `print_op` component prints the pipeline job name at component runtime using [`dsl.PIPELINE_JOB_NAME_PLACEHOLDER`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PIPELINE_JOB_NAME_PLACEHOLDER):

```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)
```

There several special values that may be used in this style, including:

* `dsl.PIPELINE_JOB_NAME_PLACEHOLDER`
* `dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER`
* `dsl.PIPELINE_JOB_ID_PLACEHOLDER`
* `dsl.PIPELINE_TASK_NAME_PLACEHOLDER`
* `dsl.PIPELINE_TASK_ID_PLACEHOLDER`
* `dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER`
* `dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER`
* `dsl.PIPELINE_ROOT_PLACEHOLDER`

**Not yet supported**

`PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER`, `PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER`, and `PIPELINE_ROOT_PLACEHOLDER` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the [GitHub issue](https://github.com/kubeflow/pipelines/issues/6155).

See the [KFP SDK DSL reference docs](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html) for more information about the data provided by each special input.

#### Task configurations <a href="#task-configurations" id="task-configurations"></a>

The KFP SDK exposes several platform-agnostic task-level configurations via task methods. Platform-agnostic configurations are those that are expected to exhibit similar execution behavior on all KFP-conformant backends, such as the [open source KFP backend](https://www.kubeflow.org/docs/components/pipelines/v2/installation/) or [Google Cloud Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

All platform-agnostic task-level configurations are set using [`PipelineTask`](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PipelineTask) methods. Take the following environment variable example:

```python
from kfp import dsl

@dsl.component
def print_env_var():
    import os
    print(os.environ.get('MY_ENV_VAR'))

@dsl.pipeline()
def my_pipeline():
    task = print_env_var()
    task.set_env_variable('MY_ENV_VAR', 'hello')
```

When executed, the `print_env_var` component should print `'hello'`.

Task-level configuration methods can also be chained:

```python
print_env_var().set_env_variable('MY_ENV_VAR', 'hello').set_env_variable('OTHER_VAR', 'world')
```

The KFP SDK provides the following task methods for setting task-level configurations:

* `.add_accelerator_type`
* `.set_accelerator_limit`
* `.set_cpu_limit`
* `.set_memory_limit`
* `.set_env_variable`
* `.set_caching_options`
* `.set_display_name`
* `.set_retry`
* `.ignore_upstream_failure`

**Not yet supported**

`.ignore_upstream_failure` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the [GitHub issue](https://github.com/kubeflow/pipelines/issues/9459).

See the [`PipelineTask` reference documentation](https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.PipelineTask) for more information about these methods.

#### Pipelines as components <a href="#pipelines-as-components" id="pipelines-as-components"></a>

Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline. For example, we could easily recompose the preceding `pythagorean` pipeline to use an inner helper pipeline `square_and_sum`:

```python
@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output


@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output
```
