Intro to Workflows on GCP

An introduction to Workflows and code examples for getting started.

Introduction 👋

Workflows is a new Google Cloud product that enables developers to create serverless workflows by linking tasks together to be executed in order. Workflows can orchestrate tasks in the form of Google Cloud products, such as Cloud Run, or call external services. This differs from the orchestration service Cloud Composer in that flows are defined in YAML or JSON and the use cases go beyond data processing.

This post introduces the service, explains how it works, and provides short examples of how to perform common actions. In the next blog post I will use those operations to create a workflow that calls the Vision API on images stored in a Google Cloud Storage bucket.

Workflow Concepts 👩‍🏫

Why use it? ❓

If you're looking for a serverless product to run flows, such as events processing or chaining together API calls, then Workflows is for you.

It has many features to leverage including:

  • Connect to GCP APIs or external services
  • Retry and error handling between steps
  • Automatic JSON parsing from API responses and passing variables between steps
  • Conditional step execution
  • Don't repeat yourself - reuse steps with subworkflows
  • Safe interactions and authentication using service accounts and keys stored in Secret Manager
  • Only pay per 1000 steps executed (Google Cloud Free Tier included!)
  • Connectors for various GCP products for easier integration (coming soon)

As this is serverless and managed, all the compute is provisioned automatically, leaving more time for development.

What about Cloud Composer? 🎼

Cloud Composer is a managed service for running Airflow jobs. Airflow is an open source library for writing and managing data pipelines in Python. Airflow jobs are usually analytics workloads such as data engineering pipelines, automated batch jobs, and machine learning workflows. These jobs form a Directed Acyclic Graph (DAG) that describes task execution order and dependencies. Airflow jobs can use other Python libraries and come with ready-made integrations for various services. Workflows differs in that tasks are written in YAML/JSON and you're restricted to the Workflows syntax.

The other major difference is Cloud Composer is not serverless. It uses a GKE cluster, Cloud SQL, and Google Cloud Storage bucket. All of these together run Airflow, provide a web interface for monitoring and interactivity, and store DAGs files, logs, plugins and so on. This means there are some operational tasks a user needs to manage.

What about Cloud Functions? 9️⃣

Cloud Functions and Workflows could be used for similar processes, but the primary reasons to use Workflows are executions that take longer than nine minutes or the need to execute many tasks in order. Workflows keeps state information for each task, so workflows can be written to handle errors and execute tasks based on conditions. Additionally, it's resilient to system failures and will resume execution from the last check-pointed state if something goes wrong.

Use Cases 🖥️

A few possible use cases for Workflows:

  • Generating an invoice after a purchase transaction
  • Processing uploaded files based on file format
  • IT infrastructure automation (e.g. shutting down VMs after office hours)

Making a Workflow 👩‍💻

File 💾

Create a workflow by defining the series of steps in a YAML or JSON file. Each file can only contain a single workflow and it must have at least one step. The code examples in this post are in YAML.

Steps 🪜

# CurrentDateTime is assumed to be the result of an earlier step (not shown)
# that fetched the current date and time from an external API.
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}

Every step starts with - [yourStepName]: and contains the parameters that define the behaviour or action expected in that step. In the above example we call the Wikipedia API using the call parameter and provide request arguments to go with it. The final step returns part of the body of the JSON response.

Workflows supports the following types of steps:

  • Invoking an HTTP endpoint
  • Assigning a variable
  • Sleeping
  • Creating the logic for conditional jumps
  • Returning a value
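
Most of these step types appear in the examples that follow; sleeping does not, so here is a minimal sketch of a step that pauses execution with sys.sleep (the step name and duration are made up):

- waitBeforeNextCall:
    call: sys.sleep
    args:
        seconds: 60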

Key Actions 🔑

1) Assign variables by using the assign step

- assignVariables:
    assign:
      - name: 'Fumi'
      - pi: 3.14

2) Leverage Expressions for assigning values to variables, conditional jumps, and more

Expressions are defined with a $ followed by curly brackets containing the expression. To use a variable in another step, write ${variable_name}. The example below calls a function inside the expression and assigns the result to the project variable.

- initVariables:
    assign:
      - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}

3) Make an HTTP call or call to Google APIs using call

This example calls Secret Manager to fetch a key for authenticating to an API (the project variable was assigned above; secret and version are assumed to be assigned in earlier steps).

- getSecret:
      call: googleapis.secretmanager.v1.projects.secrets.versions.access
      args:
        name: ${"projects/" + project + "/secrets/" + secret + "/versions/" + string(version)}
      result: secretResult

This example calls the News API with the API key from the previous step (the secretValue variable is assumed to hold the decoded secret). There are a few fields to use when making calls; see the documentation for more information.

- getRecentNews:
    call: http.get
    args:
        url: https://newsapi.org/v2/top-headlines
        query:
            category: technology
            # CurrentDateTime and secretValue are assumed to be set in earlier steps
            from: ${CurrentDateTime.body}
            sortBy: popularity
            language: en
            apiKey: ${secretValue}
    result: newsResult

4) Arrays and Dictionaries

To create arrays use square brackets with an assign field.

- createArray:
    assign:
        - num_array: ["zero","one","two"]

To create a dictionary, define its name (e.g. - myDictionary:) and then provide key-value pairs beneath it. To nest a dictionary, indent its keys under the parent key so the nesting is clearly visible.

- createDictionary:
    assign:
      - myDictionary:
          FirstName: "John"
          LastName: "Smith"
          Age: 26
          Address:
              Street: "Flower Road 12"
              City: "Superville"
              Country: "Atlantis"
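
Once defined, individual elements can be read back in expressions; here is a small sketch reusing the variables above (array indices start at 0, and nested dictionary keys are accessed with dots):

- readValues:
    assign:
      - secondNumber: ${num_array[1]}
      - homeCity: ${myDictionary.Address.City}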

5) Error Handling with Try and Except

There are three error handling features that can be leveraged in Workflows:

  • Raising exceptions
  • Catching and handling HTTP request errors
  • Retrying a failed step

The first two are covered in the following example. This code runs an http.get call against the Cloud Storage API to list objects in a GCS bucket at ${"https://storage.googleapis.com/storage/v1/b/"+bucket+"/o"}, where bucket is the name of the bucket to query. The result of this API call is stored in the variable bucketMetaData. The step is contained within a try block, and the except section stores the exception in an error dictionary variable, exception_dict. Based on the contents of the error we can raise custom messages.

- readBucketMetaData:
    try:
      call: http.get
      args:
        url: ${"https://storage.googleapis.com/storage/v1/b/"+bucket+"/o"}
        auth:
            type: OAuth2
        query:
          prefix: ${prefix}
          fields: items/name, items/bucket, items/contentType, items/timeCreated
      result: bucketMetaData
    except:
      as: exception_dict
      steps:
        - handleGCSError:
            switch:
              - condition: ${exception_dict.code == 404}
                raise: "Bucket not found"
              - condition: ${exception_dict.code == 403}
                raise: "Error authenticating to GCP"
        - unhandledException:
            raise: ${exception_dict}

The try block can contain multiple steps and they will share the same except block for handling errors.

To retry steps, add a retry block with settings describing the number of retry attempts, the backoff model, and the length of the delay between attempts, as sketched below. For more information see the error handling documentation.
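
A minimal sketch of a step with a retry policy (the step name, URL, and numbers are made up for illustration; the predicate shown is the built-in one for retryable HTTP errors):

- readItemWithRetry:
    try:
      call: http.get
      args:
        url: https://example.com/item
      result: itemResult
    retry:
      predicate: ${http.default_retry_predicate}
      max_retries: 5
      backoff:
        initial_delay: 2
        max_delay: 60
        multiplier: 2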

6) Using Jumps with Conditions for Controlling Execution Flow

We can write conditional statements to control the flow of execution by creating a switch block and using the condition field to specify a condition; if the condition evaluates to true, execution jumps to the step specified under next.

Continuing from the previous code snippet, this takes the last file in the bucket and checks whether its content type is JPEG or PNG. It then returns a string from the step specified under the next field.

- assignNumOfItems:
    assign:
      - resLength: ${len(bucketMetaData.body.items)}
- imageTypeCondition:
    switch:
      - condition: ${bucketMetaData.body.items[resLength-1].contentType == "image/jpeg"}
        next: contentJPEG
      - condition: ${bucketMetaData.body.items[resLength-1].contentType == "image/png"}
        next: contentPNG
    next: contentOTHER
- contentJPEG:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is a JPEG"}
- contentPNG:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is a PNG!!!"}
- contentOTHER:
    return: ${"Image "+ bucketMetaData.body.items[resLength-1].name + " is not a JPEG or PNG"}

Every Workflow created in GCP produces a visual representation of the steps and the connections between them. This is automatically generated in the console.

7) Writing Logs

When you run your workflow, the final execution output is whatever is defined in the last step using the return field. By default, Workflows doesn't log information about the execution or the progress of each step. You need to write steps that call sys.log to create custom logs that are sent to Cloud Logging.

The example below writes a log entry containing the contents of a dictionary variable, which must first be converted to a string with json.encode_to_string().

- logmyDict:
    call: sys.log
    args:
        text: ${"The variable myDict = " + json.encode_to_string(myDict)}
        severity: INFO

This is a small subset of the actions available in Workflows. For the entire list, see the full syntax reference in the documentation.

Deploying Workflows 📩

Currently, there are two ways to deploy a workflow: in the console or with a gcloud command. To deploy your workflow, provide a name and the file where the workflow is defined. Add a location if you don't want the default of us-central1. Supplying a service account is optional; if omitted, the default Compute Engine service account is used.

gcloud workflows deploy my_first_workflow \
--source=my_first_workflow.yaml \
[--location=europe-west4] \
[--service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.iam.gserviceaccount.com]

Modify a workflow by changing its source code, description, labels, or service account and running the same command as above with the new settings. To list all workflows, run the gcloud workflows list command.
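
A couple of related commands for checking what has been deployed (my_first_workflow is the example name used above):

# List all workflows in the project
gcloud workflows list

# Show the details of a single workflow
gcloud workflows describe my_first_workflow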

Executing Workflows 🎬

To run a Workflow, you have the same options as before, as well as sending a REST API request.

To use gcloud, run gcloud workflows run [workflow_name] [--data=DATA], where --data passes runtime arguments as a JSON string that can be used in the workflow.
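
For example (a sketch; the workflow name and argument keys are made up for illustration):

gcloud workflows run my_first_workflow \
--data='{"bucket": "my-images-bucket", "prefix": "uploads/"}'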

Send a POST request to workflowexecutions.googleapis.com along with the desired project ID and workflow name.

curl \
--request POST \
--header "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
--header 'Content-Type: application/json' \
"https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/workflows/WORKFLOW_NAME/executions"

To send a JSON-formatted string of arguments for the workflow execution, add --data '{"argument":"{\"PARAMETER\":\"VALUE\"}"}' to the request.

Thank you for reading 👋

Thank you for reading and I hope this helps demystify Workflows. I will be publishing another post with an example workflow that covers all of the previous points and calls the Vision API.

I would love to hear from you, so please share any comments you have!