Machine Learning Deployment

Introduction

In this project, we will develop a real-time video analytic pipeline (i.e., machine learning on a video stream). Video feed will be streamed from IP cameras into a Kafka queue, processed by a machine learning model, predictions are imprinted onto the frames, processed frames are queued again into another Kafka topic, and then streamed to web for display. Microservices architecture is adopted, where all code is containerized using Dockers and orchestrated using Kubernetes.

The following tools will be used in this project:

  • Kubernetes
  • Docker
  • Golang
  • Kafka & Zookeeper
  • REST
  • TensorFlow Serving API

In this project, the video analytics will focus on a frame-by-frame object classification task, similar to the ImageNet competition. In fact, we will deploy a TensorFlow Serving saved model which is originally trained on ImageNet database.

The complete system design and data flow of this project is illustrated by the following image.

pipeline
Scalable microservices architecture for video analytics pipeline.

Learning Outcome

Find the source code in the repository.

At the end of this project, we should be able to:

  • Write deployment.yml files in Kubernetes and docker-compose.yml files in Docker
  • Run containerized Confluent Zookeeper and Confluent Kafka in Kubernetes
  • Always retrieve the latest message in the Kafka topic-partition queue
  • Use GoCV (Golang client for OpenCV) to stream video and to manipulate images in Golang
  • Use TensorFlow Serving, with REST API, to deploy machine learning models
  • Form MJPEG from JPEG images and broadcast the video to web
  • Recover and restart a goroutine which panicked

Project Structure

The project structure is as follows:

Scalable-Deployment-Kubernetes                     # Main folder
├── goconsumer                                     # Process video frames using machine learning models
│   ├── assets                                     #
│   │   └── imagenetLabels.json                    # Labels for imagenet classes
│   ├── vendor                                     # Dependency files
│   │   ├── confluentkafkago                       #
│   │   │   ├── consumer.go                        # Confluent-Kafka-Go consumer wrapper
│   │   │   └── producer.go                        # Confluent-Kafka-Go producer wrapper
│   │   ├── models                                 #
│   │   │   ├── handler.go                         # Handler for all TensorFlow models
│   │   │   └── imagenet.go                        # Handler for imagenet TensorFlow model
│   │   └── github.com                             #
│   │       └── pkg                                #
│   │           └── profile                        # Golang code profiler (cpu and memory)
│   ├── deployment.yml                             # Kubernetes deployment
│   ├── docker-compose.yml                         # Docker deployment
│   ├── dockerfile                                 # To create Docker container
│   ├── main.go                                    #
│   └── message.go                                 # Processes Kafka messages by calling the appropriate TensorFlow models.handler function
├── goproducer                                     # Streams video from IP camera into Kafka
│   ├── vendor                                     # Dependency files
│   │   └── confluentkafkago                       #
│   │       └── producer.go                        # Confluent-Kafka-Go producer wrapper
│   ├── deployment.yml                             # Kubernetes deployment
│   ├── docker-compose.yml                         # Docker deployment
│   ├── dockerfile                                 # To create Docker container
│   └── main.go                                    #
├── govideo                                        # Publishes video to web
│   ├── vendor                                     # Dependency files
│   │   ├── confluentkafkago                       #
│   │   │   └── consumer.go                        # Confluent-Kafka-Go consumer wrapper
│   │   └── mjpeg                                  #
│   │       └── stream.go                          # Streams JPEG pictures to web to form motion JPEG (MJPEG)
│   ├── deployment.yml                             # Kubernetes deployment
│   ├── docker-compose.yml                         # Docker deployment
│   ├── dockerfile                                 # To create Docker container
│   └── main.go                                    #
├── tfserving                                      # TensorFlow Serving API
│   ├── resnet                                     # Resnet TensorFlow model
│   │   └── 1538687457                             # TensorFlow model version number
│   │       ├── assets                             #
│   │       │   └── imagenetLabels.json            # Labels for imagenet classes
│   │       ├── variables                          # Variables of TensorFlow saved model
│   │       │   ├── variables.data-00000-of-00001  #
│   │       │   └── variables.index                #
│   │       └── saved_model.pb                     # TensorFlow saved model
│   ├── deployment.yml                             # Kubernetes deployment
│   ├── docker-compose.yml                         # Docker deployment
│   └── dockerfile                                 # To create Docker container
└── zookeeper                                      # Zookeeper and Kafka
    ├── deployment.yml                             # Kubernetes deployment
    └── docker-compose.yml                         # Docker deployment

Instructions

This project can be run either in Kubernetes cluster using the provided deployment.yml files or in Docker using the provided docker-compose.yml files. The pipeline consists of 5 subsystems, namely, (i) Zookeeper & Kafka, (ii) GoProducer, (iii) GoConsumer, (iv) TFServing, and (v) GoVideo. Commands to be executed to run each of the subsystem is provided below for both Kubernetes and Docker environments.

Steps Kubernetes Docker
Start Kubernetes cluster
  1. Sample command given here starts a minikube cluster
  2. Run the eval statement at the beginning of each shell. It enables the use of local Docker daemon to build Docker images within Kubernetes cluster.
$ minikube start
$ eval $(minikube docker-env)
Start Zookeeper and Kafka $ cd ~/Scalable-Deployment-Kubernetes/zookeeper
$ kubectl apply -f deployment.yml
$ cd ~/Scalable-Deployment-Kubernetes/zookeeper
$ docker-compose up
Start GoProducer
  1. Streams video from IP camera into Kafka queue
$ cd ~/Scalable-Deployment-Kubernetes/goproducer
$ docker build -t goproducer .
$ kubectl apply -f deployment.yml
$ cd ~/Scalable-Deployment-Kubernetes/goproducer
$ docker build -t goproducer .
$ docker-compose up
Start GoConsumer
  1. Classifies objects in each frame using Resnet machine learning model
$ cd ~/Scalable-Deployment-Kubernetes/goconsumer
$ docker build -t goconsumer .
$ kubectl apply -f deployment.yml
$ cd ~/Scalable-Deployment-Kubernetes/goconsumer
$ docker build -t goconsumer .
$ docker-compose up
Start TensorFlow Serving
  1. Answers REST queries to port 8501
$ cd ~/Scalable-Deployment-Kubernetes/tfserving
$ docker build -t tfserving .
$ kubectl apply -f deployment.yml
$ cd ~/Scalable-Deployment-Kubernetes/tfserving
$ docker build -t tfserving .
$ docker-compose up
Start GoVideo
  1. Broadcasts video to web
$ cd ~/Scalable-Deployment-Kubernetes/govideo
$ docker build -t govideo .
$ kubectl apply -f deployment.yml
$ cd ~/Scalable-Deployment-Kubernetes/govideo
$ docker build -t govideo .
$ docker-compose up


To terminate the microservices

  • in Kubernetes, execute kubectl delete -f deployment.yml in the respective folders of each susbsytem.
  • in Docker, simply close the terminal used to run the subsystem.

System Design

Each subsystem of the pipeline, is further explored in the following sections. For brevity, only deployment in Kubernetes (i.e., deployment.yml) is discussed below, since the discussions are similar for deployment in Docker environment using its counterpart docker-compose.yml files.

For beginners in Kubernetes, please see my post on guide to quickly mastering Kubernetes.

Zookeeper and Kafka

deployment.yml

  • Confluent Zookeeper and Kafka images are used.
  • kind of Zookeeper and Kafka are set to StatefulSet as they should have persistent states
  • A single Kafka broker with 5 ports (19091, 19092, 19093, 19094, 19095) is used, although this could be easily scaled up to multiple Kafka brokers.
  • IP address of the Kafka pod is obtained by declaring MY_POD_IP in the environment variable as follows:
      env
        - name: MY_POD_IP
          valueFrom:
              fieldRef:
                fieldPath: status.podIP
    
  • The variable MY_POD_IP can be referenced elsewhere in the yml file as $(MY_POD_IP). However, such variables must be declared before being used, for example in KAFKA_ADVERTISED_LISTENERS.
  • The variables KAFKA_REPLICA_FETCH_MAX_BYTES and KAFKA_MESSAGE_MAX_BYTES are set to 100MB to handle larger video frame sizes.
  • To save hard disk memory, we opt to delete logs older than 1 minute by setting KAFKA_LOG_CLEANUP_POLICY and KAFKA_LOG_RETENTION_MIUTES.
  • Here, KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR is set to 1 as there is only one Kafka broker.
  • Service’s are used to communicate between pods in Kubernetes

GoProducer

deployment.yml

  • Environment variable VIDEOLINK states the IP address of the IP camera
  • Environment variable FRAMEINTERVAL determines the frames-per-second (fps) of the video. For example, 42ms translates into 24 fps.

main.go

  • Streams video from VIDEOLINK at FRAMEINTERVAL interval and writes them into Kafka topic TOPICNAME.

confluentkafkago.NewProducer()

  • Provides a high level wrapper code for creation of Confluent Kafka producers.
  • message.max.bytes is set to 100MB to support large sized messages such as images with high quality or large size

GoConsumer

deployment.yml

  • MODELURLS and LABELURLS contain addresses to TensorFlowServing saved model and class labels, respectively.
  • Multiple machine learning models can be applied to the same video.
  • Here, we have simply duplicated the same machine learning model, Resnet, twice to mimic use of different machine learning models.

main.go

  • Consumes video from Kafka topic TOPICNAMEIN, and handles valid messages using main.message() function.
  • Consists of three goroutines, namely, (i) to consume messages from TOPICNAMEIN, (ii) to query TensorFlow model, and (ii) to rewrite processed frames into TOPICNAMEOUT.

main.message()

  • Frame is extracted from received Kafka message
  • Frame is fed to each of the TensorFlow Serving saved models
  • The class predictions from each of the TensorFlow Serving is retrieved and imprinted onto the video.

models.*imagenet.Predict()

  • Encodes gocv.Mat type to JPEG type, packs the images into a JSON structure and makes a POST request to the TensorFlow Serving pod at port 8501.
  • Returns the predicted class
  • In the event the goroutine panicks, it is recovered and restarted via the defer function:
      defer func() {
        if r := recover(); r != nil {
          log.Println("models.*imagenet.Predict():PANICKED AND RESTARTING")
          log.Println("Panic:", r)
          go imn.Predict()
        }
      }()
    

confluentkafkago.NewConsumer()

  • Provides a high level wrapper code for creation of Confluent Kafka consumers.

confluentkafkago.LatestOffset()

  • LatestOffset() resets consumer offset to the latest message in the topic-partition queue, if the difference between the low and high watermark is more than the desired diff.
  • Setting diff=0, Kafka consumer will always consume the latest message.
      func LatestOffset(c *kafka.Consumer, diff int) error {
    
        // Record the current topic-partition assignments
        tpSlice, err := c.Assignment()
        if err != nil {
          return err
        }
    
        //Obtain the last message offset for all topic-partition
        for index, tp := range tpSlice {
          low, high, err := c.QueryWatermarkOffsets(*(tp.Topic), tp.Partition, 100)
          if err != nil {
            return err
          }
          if high-low < int64(diff) {
            return errors.New("Offset difference between Low and High is smaller than diff")
          }
          tpSlice[index].Offset = kafka.Offset(high)
        }
    
        //Consume the last message in topic-partition
        err = c.Assign(tpSlice)
        if err != nil {
          return err
        }
        return nil
      }
    

assets/imagenetLabels.json

  • Contains class labels in JSON format from ImageNet competition.

TFServing

dockerfile

  • tensorflow/serving is used as the base image
  • Our TensorFlow saved model is copied from ./resnet folder into the image’s /model/tfModel folder
  • Since a TensorFlow Serving can hold multiple models simultaneously, the desired model to be used is specified by setting the environment variable MODEL_NAME to that of the desired model’s folder name in the image. In our case, desired model’s folder name would be tfmodel.

deployment.yml

  • The deployment.yml runs two tfserving replicas in a load balancing manner to serve REST requests from clients at port 8501 and gRPC requests at port 8500.

resnet

  • Please refer to the official guide on how to build and use a TensorFlow Serving model.
  • Running the saved_model_cli command
    $ saved_model_cli show --dir /path/to/repository/machine-learning-deployment/tfserving/resnet/1538687457 --all
    

    the TensorFlow saved model signature is obtained as shown below.

    tfsimagenet

GoVideo

deployment.yml

  • Runs one instance of govideo and a govideo-service service.
  • To view the video output:
    • from outside Kubernetes, go to <Kubernetes Cluster IP>:<NODEPORT>. For example, if you are running Minikube at 192.168.99.100, then go to 192.168.99.100:30163.
    • from outside Docker, go to <Docker machine IP>:<NODEPORT>. For example, if you are running Docker natively, go to 127.0.0.1:30163. If you are running Docker in a virtual machine at 192.168.99.100, then go to 192.168.99.100:30163.

main.go

  • Uses a goroutine to read processed video frames from Kafka topic TOPICNAME and calls mjpeg.*Stream.UpdateJPEG() to form MJPEG
  • Runs a web server listening at <POD IP>:<DISPLAYPORT>, which is mapped to external node (i.e., host machine) at <Machine IP>:<NODEPORT> by the govideo-service service.

mjpeg.*Stream.UpdateJPEG()

  • Updates JPEG images to form MJPEG, which is then copied into the channels corresponding to each connected web client.

mjpeg.*Stream.ServeHTTP()

  • Provides a ServeHTTP() handle pattern to broadcast MJPEG to each connected web client at a rate of 1/FRAMEINTERVAL.
  • A sample video output is shown below. It contains the predicted ImageNet object classes.

    OutputVideo

Multiple model deployment

An extended version of this machine learning deployment is available at this repository. Here, two machine learning models, namely, emotion recognition and object classification simultaneously process the input video.

Updated:

Leave a comment