intro_to_vertex_pipelines.ipynb
Contents
intro_to_vertex_pipelines.ipynb#
Based on:
../machine_learning_in_the_enterprise/solutions/intro_to_vertex_pipelines.ipynb
Kubeflow#
Create components and pipeline
from typing import NamedTuple
from kfp import dsl
from kfp import client
@dsl.component
def product_name(text: str) -> str:
    """Pass the product name through unchanged.

    Args:
        text: The product name.

    Returns:
        The same string that was passed in.
    """
    return text
@dsl.component(packages_to_install=["emoji"])
def emoji(text: str) -> NamedTuple('Outputs', [('emoji_text', str),('emoji', str)]):
    """Look up an emoji by its alias and return both the alias and the result.

    Args:
        text: Emoji alias without the surrounding colons.

    Returns:
        A named tuple with ``emoji_text`` (the input alias) and ``emoji``
        (the result of ``emoji.emojize`` for ``:<alias>:``).
    """
    # Imports live inside the function body; note that ``emoji`` here refers
    # to the installed package, not to this component function.
    import emoji
    from collections import namedtuple

    alias = text
    rendered = emoji.emojize(f':{alias}:', language="alias")
    print(f"output one: {alias}; output_two: {rendered}")

    Outputs = namedtuple('Outputs', ['emoji_text', 'emoji'])
    return Outputs(emoji_text=alias, emoji=rendered)
@dsl.component
def build_sentence(product: str, emoji: str, emojitext: str) -> str:
    """Build the final "<product> is <emoji>" sentence.

    Args:
        product: Product name placed at the start of the sentence.
        emoji: Emoji string; used when it is non-empty.
        emojitext: Fallback text used when ``emoji`` is empty.

    Returns:
        ``product`` followed by " is " and either ``emoji`` or ``emojitext``.
    """
    print("We completed the pipeline, hooray!")
    # An empty emoji string falls back to the textual alias.
    suffix = emoji or emojitext
    return f"{product} is {suffix}"
@dsl.pipeline(name="hello-world", description="An intro pipeline")
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    """Wire the three components into a single pipeline.

    Args:
        text: Product name forwarded to ``product_name``.
        emoji_str: Emoji alias forwarded to ``emoji``.
    """
    product_step = product_name(text=text)
    emoji_step = emoji(text=emoji_str)

    # build_sentence consumes the single output of the first step and both
    # named outputs of the emoji step.
    emoji_outputs = emoji_step.outputs
    build_sentence(
        product=product_step.output,
        emoji=emoji_outputs['emoji'],
        emojitext=emoji_outputs['emoji_text'],
    )
Run Pipeline
There is an issue where Unicode characters such as \u2728 (✨) can’t be handled by the Kubeflow Pipelines operators, at least in version 1.8.5.
The issue was reported here. If your pipeline gets stuck or fails, this is the likely cause.
# Address of the Kubeflow Pipelines API used for this run.
endpoint = "http://localhost:8080/pipeline"

# Submit the pipeline function directly, using its default arguments.
kfp_client = client.Client(host=endpoint)
kfp_client.create_run_from_pipeline_func(intro_pipeline, arguments={})
v1 compilation
# Compile the pipeline to a YAML package in V2-compatible mode.
# The notebook only runs `from kfp import dsl` / `from kfp import client`,
# so the bare name `kfp` must be imported here before it can be used.
# NOTE(review): `PipelineExecutionMode` and the `mode` argument look like
# KFP SDK 1.x APIs; confirm they exist in the installed SDK version.
import kfp.compiler
import kfp.dsl

kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=intro_pipeline,
    package_path='intro_pipeline.yaml')
Vertex AI Pipeline#
# Enable the Google Cloud service APIs listed below.
gcloud services enable compute.googleapis.com \
containerregistry.googleapis.com \
aiplatform.googleapis.com \
cloudbuild.googleapis.com \
cloudfunctions.googleapis.com
Get Project ID
import os

# Read the project id from the environment; this is None when the
# GOOGLE_CLOUD_PROJECT variable is not set.
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
print(PROJECT_ID)
Define Bucket and variables
from google.cloud import storage

# Bucket name is derived from the project id; REGION is where it is created.
REGION = 'us-central1'
BUCKET_NAME = f'{PROJECT_ID}-vertex'

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

# Create the bucket only when it does not exist yet.
bucket_missing = not bucket.exists()
if bucket_missing:
    storage_client.create_bucket(BUCKET_NAME, location=REGION)
Compile the pipeline
from kfp import compiler

# Write the compiled pipeline definition to a local file that the
# Vertex AI run step loads via `template_path`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=intro_pipeline,
    package_path="intro_pipeline_job.json",
)
Run pipeline
import google.cloud.aiplatform as aip

# NOTE(review): init() is called without arguments, so the PROJECT_ID, REGION
# and BUCKET_NAME defined earlier in the notebook are not used here; confirm
# whether project/location/staging_bucket should be passed.
aip.init()

# Keep a handle on the PipelineJob itself. Chaining `.run()` onto the
# constructor would bind `job` to run()'s return value (None) instead.
job = aip.PipelineJob(
    display_name="intro pipeline job",
    template_path="intro_pipeline_job.json",
)
job.run()