Kubeflow Pipelines v1 sdk v2 examples
Contents
Kubeflow Pipelines v1 sdk v2 examples#
import kfp
import kfp.dsl as dsl
from kfp.v2.dsl import component
kfp_endpoint = "http://localhost:8080/pipeline"
Building Python Function-based Components#
Getting started with Python function-based components
@component
def add(a: float, b: float) -> float:
return a + b
@dsl.pipeline(
name='addition-pipeline',
description='An example pipeline that performs addition calculations.'
)
def add_pipeline(a: float=1, b: float=7):
first_add_task = add(a, 4)
second_add_task = add(first_add_task.output, b)
arguments = {'a': 7, 'b': 8}
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
add_pipeline,
arguments=arguments,
mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)
Example Python function-based component
from typing import NamedTuple
from kfp.v2.dsl import component, Output, Metrics
@component
def add(a: float, b: float) -> float:
return a + b
@component(packages_to_install=['numpy'])
def my_divmod(dividend: float, divisor: float, metrics: Output[Metrics]) -> NamedTuple(
'MyDivmodOutput', [('quotient', float), ('remainder', float)]):
'''Divides two numbers and calculate the quotient and remainder'''
import numpy as np
# Define a helper function
def divmod_helper(dividend, divisor):
return np.divmod(dividend, divisor)
(quotient, remainder) = divmod_helper(dividend, divisor)
# Export two metrics
metrics.log_metric('quotient', float(quotient))
metrics.log_metric('remainder', float(remainder))
from collections import namedtuple
divmod_output = namedtuple('MyDivmodOutput',
['quotient', 'remainder'])
return divmod_output(quotient, remainder)
@dsl.pipeline(
name='calculation-pipeline',
description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(a: float=1, b: float=7, c: float=17):
add_task = add(a, 4)
divmod_task = my_divmod(add_task.output, b)
result_task = add(divmod_task.outputs['quotient'], c)
arguments = {'a': 7, 'b': 8}
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
calc_pipeline,
arguments=arguments,
mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)