Kubeflow Pipelines v1 SDK: v2-compatible mode examples

import kfp
import kfp.dsl as dsl
from kfp.v2.dsl import component
# Base URL of the Kubeflow Pipelines API, passed to kfp.Client(host=...) when runs are submitted.
# NOTE(review): assumes the KFP endpoint is reachable on localhost:8080 (e.g. via a
# port-forward) — adjust for your cluster.
kfp_endpoint = "http://localhost:8080/pipeline"

Building Python Function-based Components

Getting started with Python function-based components

@component
def add(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total

@dsl.pipeline(
    name='addition-pipeline',
    description='An example pipeline that performs addition calculations.',
)
def add_pipeline(a: float = 1, b: float = 7):
    """Chain two ``add`` tasks so the pipeline computes (a + 4) + b."""
    partial_sum = add(a, 4)
    # The final task's handle is never read, so it is not bound to a name.
    add(partial_sum.output, b)

# Pipeline parameter values for this run; they override add_pipeline's defaults.
arguments = dict(a=7, b=8)

# Submit add_pipeline to the KFP endpoint in v2-compatible execution mode.
client = kfp.Client(host=kfp_endpoint)
client.create_run_from_pipeline_func(
    add_pipeline,
    arguments=arguments,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)

Example Python function-based component

from typing import NamedTuple
from kfp.v2.dsl import component, Output, Metrics
@component
def add(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total

@component(packages_to_install=['numpy'])
def my_divmod(dividend: float, divisor: float, metrics: Output[Metrics]) -> NamedTuple(
        'MyDivmodOutput', [('quotient', float), ('remainder', float)]):
    """Divides two numbers and calculates the quotient and remainder.

    The division is done with ``numpy.divmod``, i.e. floor division with a
    matching remainder.

    Args:
        dividend: Number to divide.
        divisor: Number to divide by. NOTE(review): a zero divisor presumably
            makes numpy return inf/nan (with a RuntimeWarning) instead of
            raising — confirm whether callers need an explicit error.
        metrics: Metrics artifact; ``quotient`` and ``remainder`` are logged to it.

    Returns:
        ``MyDivmodOutput(quotient, remainder)`` holding plain Python floats.
    """
    # Imports live inside the function body because the component code runs
    # in its own container (numpy is installed via packages_to_install).
    import numpy as np
    from collections import namedtuple

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    quotient, remainder = divmod_helper(dividend, divisor)

    # np.divmod yields NumPy scalars; convert once so the logged metrics and
    # the returned outputs both carry plain floats, matching the declared
    # ``float`` output types.
    quotient, remainder = float(quotient), float(remainder)

    # Export two metrics
    metrics.log_metric('quotient', quotient)
    metrics.log_metric('remainder', remainder)

    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return divmod_output(quotient, remainder)

@dsl.pipeline(
    name='calculation-pipeline',
    description='An example pipeline that performs arithmetic calculations.',
)
def calc_pipeline(a: float = 1, b: float = 7, c: float = 17):
    """Add 4 to ``a``, divmod the sum by ``b``, then add ``c`` to the quotient."""
    shifted = add(a, 4)
    division = my_divmod(shifted.output, b)
    # Only the 'quotient' output feeds the last step; its handle is never read.
    add(division.outputs['quotient'], c)

# 'c' is not supplied, so calc_pipeline's default (17) applies.
arguments = dict(a=7, b=8)

# Submit calc_pipeline to the KFP endpoint in v2-compatible execution mode.
client = kfp.Client(host=kfp_endpoint)
client.create_run_from_pipeline_func(
    calc_pipeline,
    arguments=arguments,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)