intro_to_vertex_pipelines.ipynb

Based on:

Kubeflow

Create components and pipeline

from typing import NamedTuple
from kfp import dsl
from kfp import client

@dsl.component
def product_name(text: str) -> str:
    """Pass the product name through unchanged.

    Args:
        text: Product name handed to the component.

    Returns:
        The same string that was received.
    """
    name = text
    return name

@dsl.component(packages_to_install=["emoji"])
def emoji(text: str) -> NamedTuple('Outputs', [('emoji_text', str),('emoji', str)]):
    """Look up the emoji for an alias and return it together with the alias.

    Args:
        text: Emoji alias without the surrounding colons.

    Returns:
        A named tuple with two fields: ``emoji_text`` (the input text) and
        ``emoji`` (the result of ``emoji.emojize`` for ``:<text>:`` using the
        "alias" language).
    """
    # Note: this import rebinds the local name `emoji` to the package, even
    # though the component function itself carries the same name.
    from collections import namedtuple
    import emoji

    emoji_text = text
    rendered = emoji.emojize(f':{emoji_text}:', language="alias")
    print("output one: {}; output_two: {}".format(emoji_text, rendered))

    Outputs = namedtuple('Outputs', ['emoji_text', 'emoji'])
    return Outputs(emoji_text=emoji_text, emoji=rendered)

@dsl.component
def build_sentence(product: str, emoji: str, emojitext: str) -> str:
    """Assemble the final "<product> is <emoji>" sentence.

    Args:
        product: Product name that starts the sentence.
        emoji: Emoji string; used when it is non-empty.
        emojitext: Fallback text used when ``emoji`` is empty.

    Returns:
        ``product`` followed by " is " and either ``emoji`` or ``emojitext``.
    """
    print("We completed the pipeline, hooray!")
    # Prefer the emoji; fall back to the plain text when it is empty.
    suffix = emoji if len(emoji) > 0 else emojitext
    return product + " is " + suffix

@dsl.pipeline(name="hello-world", description="An intro pipeline")
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    """Wire the three components into a single pipeline.

    Args:
        text: Value passed to ``product_name``.
        emoji_str: Value passed to ``emoji``.
    """
    name_step = product_name(text=text)
    emoji_step = emoji(text=emoji_str)

    # Both named outputs of the emoji step feed the final sentence builder.
    emoji_outputs = emoji_step.outputs
    build_sentence(
        product=name_step.output,
        emoji=emoji_outputs['emoji'],
        emojitext=emoji_outputs['emoji_text'],
    )

Run Pipeline

There is an issue where Unicode characters such as \u2728 (✨) can’t be handled by the Kubeflow Pipelines operators, at least in version 1.8.5. The issue was reported here. If your pipeline gets stuck or fails, this is the likely cause.

# Host URL handed to the Kubeflow Pipelines client.
endpoint = "http://localhost:8080/pipeline"

# Build the client first, then submit `intro_pipeline` with no argument
# overrides (the pipeline's own defaults apply).
kfp_client = client.Client(host=endpoint)
kfp_client.create_run_from_pipeline_func(intro_pipeline, arguments={})

v1 compilation

# Compile `intro_pipeline` into 'intro_pipeline.yaml' using the V2_COMPATIBLE
# execution mode.
# NOTE(review): the visible cells import only `dsl` and `client` from kfp; this
# call needs the bare `kfp` name (and `kfp.compiler`) in scope -- confirm that
# an `import kfp` has run before this cell.
# NOTE(review): `PipelineExecutionMode` presumably belongs to the KFP 1.8.x
# SDK -- verify that it exists in the installed kfp version.
kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=intro_pipeline,
    package_path='intro_pipeline.yaml')

Vertex AI Pipeline

# Enable the Compute Engine, Container Registry, Vertex AI (aiplatform),
# Cloud Build and Cloud Functions APIs in one call.
gcloud services enable compute.googleapis.com \
                       containerregistry.googleapis.com \
                       aiplatform.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Get Project ID

import os
# Read the project ID from the environment; the value is None when the
# GOOGLE_CLOUD_PROJECT variable is not set.
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
print(PROJECT_ID)

Define Bucket and variables

from google.cloud import storage

# Region the bucket is created in, and the bucket name derived from the
# project ID.
REGION = 'us-central1'
BUCKET_NAME = f'{PROJECT_ID}-vertex'

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

# Create the bucket only when it does not exist yet.
if not bucket.exists():
    storage_client.create_bucket(BUCKET_NAME, location=REGION)

Compile the pipeline

from kfp import compiler

# Compile `intro_pipeline` into the job spec file that the run step loads.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=intro_pipeline,
    package_path="intro_pipeline_job.json",
)

Run pipeline

import google.cloud.aiplatform as aip

# Initialise the Vertex AI SDK; no explicit project, location or staging
# bucket is passed here.
aip.init()

# Build the job first so that `job` refers to the PipelineJob object itself.
# Chaining `.run()` onto the constructor would bind `job` to run()'s return
# value instead, leaving no handle for inspecting the job afterwards.
job = aip.PipelineJob(
    display_name="intro pipeline job",
    template_path="intro_pipeline_job.json",
)
job.run()