
Using Kubeflow for Financial Time Series: Part III — Kubeflow Pipelines

29 May 2019, 14:41

This blog post is part of a series of blog posts on Kubeflow. It is a follow-up to the first and second part, in which we explored how Kubeflow’s main components can facilitate the tasks of a machine learning engineer, all on a single platform. In this third part we will explore Kubeflow Pipelines (KFP), which was introduced in Kubeflow v0.4.

Please note that it’s not strictly necessary to read the first and second part before reading this third part on Kubeflow Pipelines.

The image below illustrates a Kubeflow pipeline graph:

Why

Often a machine learning workflow consists of multiple steps, for example: getting the data, preprocessing the data, training a model, serving new requests, etc. At the beginning of a project you might perform these steps manually, but as they mature you will want to automate them for multiple reasons (productivity, reproducibility, etc.). Kubeflow Pipelines introduces an elegant way of solving this automation problem: every step in the workflow is containerized and Kubeflow Pipelines chains these containers together. Kubeflow Pipelines also comes with a user interface for following the progress and checking your results.
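To make this concrete, here is a minimal, generic sketch of two chained steps using the KFP DSL; the image, scripts and file paths are placeholders and not part of this use case:

import kfp.dsl as dsl


@dsl.pipeline(name='two-step-example', description='Two chained containerized steps')
def two_step_pipeline():
  # step one runs a containerized script and writes its result path to /output.txt
  step_one = dsl.ContainerOp(
    name='step-one',
    image='gcr.io/<project>/<image>:v1',  # placeholder image
    command=['python3', 'step_one.py'],
    file_outputs={'output': '/output.txt'})
  # step two consumes the output of step one, so KFP schedules it afterwards
  dsl.ContainerOp(
    name='step-two',
    image='gcr.io/<project>/<image>:v1',  # placeholder image
    command=['python3', 'step_two.py'],
    arguments=['--input', step_one.output])

We will apply exactly this pattern to the preprocessing and training steps of the financial time series model below.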

Setup

You can use a Google Cloud Shell to follow the steps outlined below. Alternatively you can work from your local environment, but in that case you will have to make sure the requirements are installed: the gcloud SDK (including kubectl and gsutil), git, and Python 3 with pip.

Let’s first clone the repository:

$ git clone https://github.com/kubeflow/examples.git
$ cd examples/

Deploy Kubeflow Pipelines to GKE

In this blog post, we choose to deploy only Kubeflow Pipelines to a Kubernetes cluster instead of deploying Kubeflow with all the components.

$ CLUSTERNAME=<your-cluster-name>
$ ZONE=europe-west1-b
$ gcloud beta container clusters create $CLUSTERNAME --enable-autoupgrade --zone $ZONE --scopes cloud-platform --enable-cloud-logging --enable-cloud-monitoring --machine-type n1-standard-1  --num-nodes 2

Once the cluster is created, we will need to set up cluster role-binding so we are authorized to deploy a bootstrapper that will launch the necessary Kubeflow Pipelines components.

$ kubectl create clusterrolebinding ml-pipeline-admin-binding --clusterrole=cluster-admin --user=$(gcloud config get-value account)

Subsequently, we can deploy the bootstrapper:

$ PIPELINE_VERSION=0.1.8
$ kubectl create -f https://storage.googleapis.com/ml-pipeline/release/$PIPELINE_VERSION/bootstrapper.yaml

When checking the Kubernetes pods via kubectl get pods, you should see the bootstrapper pod running.

Once this container is up, it will start deploying the Kubeflow Pipelines components onto the Kubernetes cluster in the kubeflow namespace.

If you want to inspect the progress, you can simply run kubectl get pods -n kubeflow.

Next, we need to create a Google Cloud Storage bucket that will store the data and models.

$ BUCKET_NAME=<your-bucket-name>
$ gsutil mb gs://$BUCKET_NAME/

Containerize your pipeline steps

In the previous blog posts in this series on the financial time series use case, we combined the preprocessing and training in a single script to illustrate the TFJob component of Kubeflow. In practice, the preprocessing and training steps are most often separated, and they need to run sequentially each time. By decoupling the preprocessing from the training we can iterate faster over different experiments. Kubeflow Pipelines offers an easy way of chaining these steps together, and we will illustrate that here. The script run_preprocess_and_train.py in tensorflow_model/ uses the two underlying scripts run_preprocess.py and run_train.py. The idea is that these two steps will be containerized and chained together by Kubeflow Pipelines.
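As a rough sketch (the actual entry points in the repository may be named differently), run_preprocess_and_train.py simply runs the two steps one after the other:

# hypothetical sketch of run_preprocess_and_train.py
import run_preprocess
import run_train


def main():
  # first produce the preprocessed dataset, then train on it
  run_preprocess.main()
  run_train.main()


if __name__ == '__main__':
  main()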

Since these two steps are in the same repository, we can create a single container from the repository and use the same image for the two steps. Let’s build the container:

$ cd financial_time_series/tensorflow_model/
$ PROJECT_ID=<your-gcp-project-id>
$ IMAGE_NAME=pipeline-test
$ gcloud builds submit --tag gcr.io/$PROJECT_ID/$IMAGE_NAME:v1 .

Now we need to update our pipeline code in ml_pipeline.py so that it points to the newly created image. Also update the bucket parameter to your bucket name (this is not strictly necessary, as you can still set this parameter in the user interface later on).

Build the pipeline

If you inspect ml_pipeline.py, you can see that defining the pipeline steps is straightforward:

import kfp.dsl as dsl


class Preprocess(dsl.ContainerOp):

  def __init__(self, name, bucket, cutoff_year):
    super(Preprocess, self).__init__(
      name=name,
      # image needs to be a compile-time string
      image='gcr.io/<project>/<image-name>/cpu:v1',
      command=['python3', 'run_preprocess.py'],
      arguments=[
        '--bucket', bucket,
        '--cutoff_year', cutoff_year,
        '--kfp'
      ],
      file_outputs={'blob-path': '/blob_path.txt'}
    )
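The Train step is defined in ml_pipeline.py in the same way; below is a rough sketch based on how it is called in the pipeline further down (the exact flag names passed to run_train.py are assumptions):

class Train(dsl.ContainerOp):

  def __init__(self, name, blob_path, version, bucket, model):
    super(Train, self).__init__(
      name=name,
      # same image as the preprocess step, since both scripts live in one repository
      image='gcr.io/<project>/<image-name>/cpu:v1',
      command=['python3', 'run_train.py'],
      arguments=[
        '--blob_path', blob_path,
        '--version', version,
        '--bucket', bucket,
        '--model', model,
      ]
    )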

After defining the individual steps, you define the actual pipeline operations:

@dsl.pipeline(
  name='financial time series',
  description='Train Financial Time Series'
)
def train_and_deploy(
   bucket=dsl.PipelineParam('bucket', value='<bucket>'),
   cutoff_year=dsl.PipelineParam('cutoff-year', value='2010'),
   version=dsl.PipelineParam('version', value='4'),
   model=dsl.PipelineParam('model', value='DeepModel')
):
  """Pipeline to train financial time series model"""
  preprocess_op = Preprocess('preprocess', bucket, cutoff_year)
  train_op = Train('train and deploy', preprocess_op.output, version, bucket, model)

One important aspect to mention is the way containers can pass parameters to each other. As you can see from the code snippets above, the preprocessing step declares an output parameter ‘blob-path’, and its value is read from the file in which the container stores it.

If we inspect run_preprocess.py, we indeed notice that we need to write a file inside the container in order to pass this parameter (note that this does not happen automatically):

if args.kfp:
  with open("/blob_path.txt", "w") as output_file:
    output_file.write(file_path)

Note that the local file is only written when the ‘kfp’ argument is set; this avoids creating these files when running the script locally.
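Since --kfp is passed without a value in the pipeline definition, it can be declared as a simple boolean switch. A sketch of the relevant argparse lines in run_preprocess.py (the other arguments and help texts are abbreviated and may differ from the repository):

import argparse

parser = argparse.ArgumentParser(description='Preprocess financial time series data')
parser.add_argument('--bucket', type=str, help='GCS bucket for the data and models')
parser.add_argument('--cutoff_year', type=str, help='cutoff year between train and test data')
# only write /blob_path.txt when running as a Kubeflow Pipelines step
parser.add_argument('--kfp', action='store_true', help='pass when running inside Kubeflow Pipelines')
args = parser.parse_args()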

Compile the pipeline

KFP asks us to compile our pipeline Python 3 file into a domain-specific language. We do that with a tool called dsl-compile that comes with the Python 3 SDK. So, first install that SDK:

$ pip3 install python-dateutil https://storage.googleapis.com/ml-pipeline/release/0.1.2/kfp.tar.gz --upgrade

To actually compile the pipeline, you can run:

$ python3 ml_pipeline.py

This will generate an output file ml_pipeline.py.tar.gz that contains the compiled pipeline. If you are using Google Cloud Shell, you will need to download ml_pipeline.py.tar.gz so you can upload it later in the UI.
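Running the file generates this archive because ml_pipeline.py presumably ends by calling the KFP compiler itself; a minimal sketch of such a main section (the exact code in the repository may differ):

if __name__ == '__main__':
  import kfp.compiler as compiler
  # compile the pipeline function into a workflow archive, e.g. ml_pipeline.py.tar.gz
  compiler.Compiler().compile(train_and_deploy, __file__ + '.tar.gz')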

Run the pipeline

To run the pipeline, we need to access the Kubeflow Pipelines UI and upload this file. We will use port-forwarding to connect to the Kubeflow Pipelines UI.

$ NAMESPACE=kubeflow
$ kubectl port-forward -n ${NAMESPACE} $(kubectl get pods -n ${NAMESPACE} --selector=service=ambassador -o jsonpath='{.items[0].metadata.name}') 8085:80

The Kubeflow Pipelines UI should now be available on http://localhost:8085/pipeline/. If you are using Google Cloud Shell, you will need to change the preview port to 8085 and adjust the redirected URL https://8085-dot-…-dot-…-dot-devshell.appspot.com/edit/edit.html to https://8085-dot-…-dot-…-dot-devshell.appspot.com/pipeline.

Next, we upload the pipeline via the button in the top right corner.

After uploading the file, you can click on the pipeline and it will show a graph that visualizes the different pipeline steps.

Now we will create a new run and specify our run parameters.

Note that the type of machine learning model is parametrized in our container, so we can pass it here as a run parameter. We can also play with the data cutoff year as a runtime parameter. Basically, every container argument can be set as a pipeline parameter, which makes it very easy to rerun workflows with different parameters.
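As an alternative to the UI, runs can also be submitted programmatically with the KFP SDK; a hedged sketch, assuming the port-forward from above is still active and using arbitrary experiment and run names (the exact client API may differ between SDK versions):

import kfp

# connect to the Kubeflow Pipelines API through the local port-forward
client = kfp.Client(host='http://localhost:8085/pipeline')

experiment = client.create_experiment('financial-time-series')
client.run_pipeline(
    experiment.id,
    'deep-model-cutoff-2010',   # arbitrary run name
    'ml_pipeline.py.tar.gz',    # compiled pipeline from the previous step
    params={
        'bucket': '<your-bucket-name>',
        'cutoff-year': '2010',
        'version': '5',
        'model': 'DeepModel',
    })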

After the pipeline is started, Kubeflow Pipelines will orchestrate the steps for you and you can follow the progress (including container logs) in the UI.

After a few minutes, the pipeline should be completed. Potentially you could create more runs and play with the type of model and data cutoff year to see the impact on the performance.

Cleaning up

To clean up the cluster, we will simply delete it:

$ gcloud container clusters delete $CLUSTERNAME --zone $ZONE

More about Kubeflow Pipelines (KFP)

If you want to read more about Kubeflow Pipelines, definitely have a look at the Kubeflow website.