Intro to Workflows on GCP
An introduction to Workflows and code examples for getting started.
Introduction
Workflows is a new Google Cloud product that enables developers to create serverless workflows by linking tasks together to be executed in order. Workflows can orchestrate Google Cloud services, such as Cloud Run, or call external services. It differs from the orchestration service Cloud Composer in that flows are defined in YAML or JSON and the use cases go beyond data processing.
This post introduces the service, explains how it works, and provides short examples of how to perform certain actions. In the next blog post I will use those operations to create a workflow that calls the Vision API on images stored in a Google Cloud Storage bucket.
Workflow Concepts
Why use it?
If you're looking for a serverless product to run flows, such as events processing or chaining together API calls, then Workflows is for you.
It has many features to leverage, including:
- Connect to GCP APIs or external services
- Retry and error handling between steps
- Automatic JSON parsing from API responses and passing variables between steps
- Conditional step execution
- Don't repeat yourself - use Subworkflows for reusing steps (a short sketch follows below)
- Secure interactions and authentication using service accounts and keys from Secret Manager
- Pay only per 1,000 executed steps (a Google Cloud Free Tier allowance is included!)
- Connectors for various GCP products for easier integration (coming soon)
As this is serverless and managed, all the compute is provisioned automatically, leaving more time for development.
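As a rough sketch of the subworkflow feature mentioned above (illustrative only and not from the original post; the greet subworkflow, its parameter and its message are made up), a named subworkflow is defined alongside the main block and called like any other step:

main:
  steps:
    - callSubworkflow:
        # call the reusable subworkflow defined below
        call: greet
        args:
          name: "Workflows"
        result: greeting
    - returnGreeting:
        return: ${greeting}

greet:
  params: [name]
  steps:
    - buildMessage:
        # build and return the greeting string
        return: ${"Hello, " + name}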
What about Cloud Composer?
Cloud Composer is a managed service for running Airflow jobs. Airflow is an open-source Python framework for writing and managing data pipelines. Airflow jobs are usually analytics workloads such as data engineering pipelines, automated batch jobs, machine learning workflows, and so on. These jobs form a Directed Acyclic Graph (DAG) that describes task execution order and dependencies. Airflow jobs can use other Python libraries and come with ready-made integrations for various services. Workflows is different in that tasks are written in YAML/JSON and you're restricted to the Workflows syntax.
The other major difference is that Cloud Composer is not serverless. It uses a GKE cluster, a Cloud SQL instance, and a Google Cloud Storage bucket. Together these run Airflow, provide a web interface for monitoring and interactivity, and store DAG files, logs, plugins and so on. This means there are some operational tasks a user needs to manage.
What about Cloud Functions?
Cloud Functions and Workflows could be used for similar processes, but the primary reasons to use Workflows are that your execution will take longer than nine minutes or that you need to execute many tasks in order. Workflows keeps state information for each task, so flows can be written to handle errors and execute tasks based on conditions. Additionally, it's resilient to system failures: if something goes wrong, execution resumes from the last check-pointed state.
Use Cases
A few possible use cases for Workflows:
- Generating an invoice after a purchase transaction
- Processing uploaded files based on file format
- IT infrastructure automation (e.g. shutting down VMs after office hours)
Making a Workflow
File
Create a workflow by defining the series of steps in a YAML or JSON file. Each file can only contain a single workflow and it must have at least one step. The code examples in this post are in YAML.
Steps
- readWikipedia:
    call: http.get
    args:
      url: https://en.wikipedia.org/w/api.php
      query:
        action: opensearch
        search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
Every step starts with - [yourStepName]: and contains the parameters that define the behaviour or actions expected in that step. In the above example we call the Wikipedia API using the call parameter and provide request arguments to go with it. The final step returns part of the body of the JSON response.
Workflows supports the following types of steps:
- Invoking an HTTP endpoint
- Assigning a variable
- Sleeping (see the short sketch after this list)
- Creating the logic for conditional jumps
- Returning a value
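The sleep step, for example, pauses execution for a given number of seconds using the built-in sys.sleep call. This is a minimal sketch and not part of the original examples:

- waitTenSeconds:
    # pause the workflow for 10 seconds before the next step
    call: sys.sleep
    args:
      seconds: 10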
Key Actions
1) Assign variables by using the assign step
- assignVariables:
    assign:
      - name: 'Fumi'
      - pi: 3.14
2) Leverage Expressions for assigning values to variables, conditional jumps, and more
Expressions are defined with a $ and curly brackets with the expression inside. To use a variable in another step, use ${variable_name}. The below example shows how to call a function in an expression and assign the result to the project variable.
- initVariables:
    assign:
      - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
3) Make an HTTP call or a call to Google APIs using call
This example makes a call to Secret Manager to get a key for authenticating to an API.
- getSecret:
    call: googleapis.secretmanager.v1.projects.secrets.versions.access
    args:
      name: ${"projects/" + project + "/secrets/" + secret + "/versions/" + string(version)}
    result: secretResult
This example calls the News API (newsapi.org) with the API key from the previous step; assume the decoded secret string has been assigned to the secretValue variable. There are a few fields to use when making calls. See more information here.
- getRecentNews:
    call: http.get
    args:
      url: http://newsapi.org/v2/top-headlines
      query:
        category: technology
        from: ${CurrentDateTime.body}
        sortBy: popularity
        language: en
        apiKey: ${secretValue}
    result: newsResult
4) Arrays and Dictionaries
To create arrays, use square brackets with an assign field.
- createArray:
    assign:
      - num_array: ["zero","one","two"]
To create a dictionary, define your dictionary name (like - myDictionary:) and then provide key-value pairs. To nest a dictionary, indent the nested keys under their parent key so it's clear that they are nested.
- createDictionary:
    assign:
      - myDictionary:
          FirstName: "John"
          LastName: "Smith"
          Age: 26
          Address:
            Street: "Flower Road 12"
            City: "Superville"
            Country: "Atlantis"
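As a quick illustration (not from the original post), the values can then be read back with expressions: a numeric index for list elements and dot notation for dictionary keys, as already seen in the Wikipedia example above.

- accessValues:
    assign:
      # index into the array and walk the nested dictionary
      - second_item: ${num_array[1]}           # "one"
      - city: ${myDictionary.Address.City}     # "Superville"
- returnValues:
    return: ${second_item + " / " + city}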
5) Error Handling with Try and Except
There are three error handling features that can be leveraged in Workflows:
- Raising exceptions
- Catching and handling HTTP request errors
- Retrying a failed step
The first two are covered in the following example. This code runs an http.get call on the Cloud Storage API for listing objects in a GCS bucket at ${"https://storage.googleapis.com/storage/v1/b/"+bucket+"/o"}, where bucket is the name of the bucket to query. The result from this API call is stored in the variable bucketMetaData. The step is contained within a try block, and the except section stores the exception in an error dictionary variable, exception_dict. Based on the contents of the error we can raise custom messages.
- readBucketMetaData:
    try:
      call: http.get
      args:
        url: ${"https://storage.googleapis.com/storage/v1/b/"+bucket+"/o"}
        auth:
          type: OAuth2
        query:
          prefix: ${prefix}
          fields: items/name, items/bucket, items/contentType, items/timeCreated
      result: bucketMetaData
    except:
      as: exception_dict
      steps:
        - handleGCSError:
            switch:
              - condition: ${exception_dict.code == 404}
                raise: "Bucket not found"
              - condition: ${exception_dict.code == 403}
                raise: "Error authenticating to GCP"
        - unhandledException:
            raise: ${exception_dict}
The try block can contain multiple steps and they will share the same except block for handling errors.
To retry steps, use the retry field and make a block with settings describing the number of retry attempts, the backoff model, and the length of the delay between each attempt. For more information see here.
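As a rough illustration (not from the original post, so treat the endpoint and the exact values as assumptions), a retried HTTP step wraps the call in a try block and adds a retry block; the built-in http.default_retry_predicate decides which errors are retryable:

- readApiWithRetry:
    try:
      call: http.get
      args:
        url: https://example.com/some-api    # hypothetical endpoint
      result: apiResult
    retry:
      # retry up to 5 times with exponential backoff on retryable errors
      predicate: ${http.default_retry_predicate}
      max_retries: 5
      backoff:
        initial_delay: 2
        max_delay: 60
        multiplier: 2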
6) Using Jumps with Conditions for Controlling Execution Flow
We can write conditional statements to control the flow of execution by creating a switch block and using the condition field to specify a condition; when the condition evaluates to true, execution jumps to the step specified under next.
Continuing from the previous code snippet, this takes the last file in the bucket and checks whether its content type is JPEG or PNG. It then returns a string from the step specified under the next field.
- assignNumOfItems:
    assign:
      - resLength: ${len(bucketMetaData.body.items)}
- imageTypeCondition:
    switch:
      - condition: ${bucketMetaData.body.items[resLength-1].contentType == "image/jpeg"}
        next: contentJPEG
      - condition: ${bucketMetaData.body.items[resLength-1].contentType == "image/png"}
        next: contentPNG
    next: contentOTHER
- contentJPEG:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is a JPEG"}
- contentPNG:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is a PNG!!!"}
- contentOTHER:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is not a JPEG or PNG"}
Every workflow created in GCP produces a visual representation of the steps and how they connect to each other. This is automatically generated in the console.
7) Writing Logs
When you run your workflow, the final execution output is whatever is defined in the last step using the return field. By default, Workflows doesn't log information about the execution or the progress of each step. You need to write steps that call sys.log to create custom logs that are sent to Cloud Logging.
The below example creates a log entry that prints the contents of a dictionary variable, which must be converted with json.encode_to_string().
- logmyDict:
    call: sys.log
    args:
      text: ${"The variable myDict = " + json.encode_to_string(myDict)}
      severity: INFO
This is a small subset of the actions available in Workflows. For the entire list and full syntax guide, read here.
Deploying Workflows
Currently, there are two ways of deploying workflows: in the console or by running a gcloud command. To deploy your workflow, provide a name and the file where the workflow is defined. Add a location if you don't want the default location us-central1. Supplying a service account is optional; if omitted, the default Compute Engine service account is used.
gcloud workflows deploy my_first_workflow \
  --source=my_first_workflow.yaml \
  [--location=europe-west4] \
  [--service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM]
Modify a workflow by changing its source code, description, labels or service account and running the same command as above with the new settings. To list all workflows, run the gcloud workflows list command.
Executing Workflows
To run a workflow you have the same options as before, as well as sending a REST API request.
With gcloud, run gcloud workflows run [workflow_name] [--data=DATA], where --data supplies runtime arguments as a JSON string that can be used in the workflow.
Send a POST request to workflowexecutions.googleapis.com along with the desired project ID and workflow name.
curl \
  --request POST \
  --header "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
  --header 'Content-Type: application/json' \
  "https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/workflows/WORKFLOW_NAME/executions"
To send a JSON-formatted string of arguments for the workflow execution, use --data '{"argument":"{\"PARAMETER\":\"VALUE\"}"}'.
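Inside the workflow, such runtime arguments are received through a params field on the main block and read with expressions. The sketch below is illustrative and not from the original post; PARAMETER is just the placeholder name used in the command above:

main:
  params: [args]
  steps:
    - returnArgument:
        # read the runtime argument passed via --data and return it
        return: ${"Received PARAMETER = " + args.PARAMETER}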
Resources
Thank you for reading
Thank you for reading and I hope this helps demystify Workflows. I will be publishing another post with an example workflow that covers all of the previous points and calls the Vision API.
I would love to hear from you and any comments you have!