RTSP Video, Kafka, and Microservices
A real-time streaming protocol (RTSP) video is streamed from a website using OpenCV into a Kafka topic and consumed by a signal processing application. This project highlights and demonstrates several key data engineering concepts. The data pipeline is as follows:
- `GoProducerRTSP` module: An RTSP video is streamed from a third-party website using dockerized GoCV (a Golang wrapper of OpenCV) code written in Golang. The video is enqueued in a dockerized Kafka topic. A multistage image build is performed to minimize the runtime image size. RTSP is prevalent in security cameras and commercial camera systems.
- Zookeeper and Kafka modules: Zookeeper and Kafka container instances are created from Confluent Docker images. Besides message passing, Kafka is used in this project for inter-language communication between the Golang code and the Python code.
- `PyConsumerRTSP` module: The video is consumed from the Kafka topic by Python code running on the host machine (i.e., outside of Docker). The fetched video frames are displayed using OpenCV. Some simple computation is performed on each video frame and the results are printed to screen. This simple signal processing code serves as a placeholder for the real signal processing code later.
- `PyConsumerRTSP2` module: This is a duplicate of the `PyConsumerRTSP` module. The consumers of `PyConsumerRTSP` and `PyConsumerRTSP2` belong to two different consumer groups but consume from the same Kafka topic. Running them simultaneously demonstrates the scalability of the data pipeline.
Learning Outcome
The repository contains the following:
- Docker-compose file for Zookeeper and Kafka services
  - Dockerized Zookeeper and Kafka images
- Go code
  - Dockerized Kafka producer with GoCV (OpenCV) library
- Python code
  - Kafka consumer for RTSP video
  - OpenCV and dummy signal processing code
At the end of this project, we should be able to:
- manage Go dependencies using `dep`
- stream RTSP video
- use dockerized Zookeeper and Kafka
- dockerize Golang code
- perform internal/external Docker networking
- use dockerized GoCV (i.e., a Golang client for OpenCV)
- use the Sarama library (a Golang client for Kafka) and the kafka-python library (a Python client for Kafka)
- perform a Docker multistage build to reduce image size
- use Kafka for inter-language communication (between Golang and Python)
- build a scalable and fault-tolerant data pipeline with multiple producers and consumers
Project Structure
The project structure is as follows:
DataPipeline # Main folder
├── goproducerrtsp # To grab frames from RTSP video website and to enqueue in Kafka
│ ├── vendor # Dependency vendor files generated using 'dep ensure'
│ │ └── ... #
│ ├── .env # Environment variables
│ ├── Docker-compose.yml # To instantiate Docker container
│ ├── Dockerfile # To build Docker image
│ ├── Gopkg.lock # Dependency version file generated using 'dep ensure'
│ ├── Gopkg.toml # Dependency version file generated using 'dep ensure'
│ └── main.go # Go code producing to Kafka
├── pythonconsumerrtsp # To consume from Kafka in Python, outside Docker environment
│ ├── dataprocessing # Template folder for signal processing
│ │ ├── __init__.py # Package file
│ │ └── alg.py # Dummy signal processing object: Computes average pixel value
│ ├── kafkapc_python # Kafka producer and consumer using kafka-python library
│ │ └── __init__.py #
│ ├── message # Kafka message handling function
│ │ └── __init__.py #
│ ├── .env # Environment variables
│ ├── Docker-compose.yml # To instantiate Docker container
│ ├── Dockerfile # To build Docker image
│ ├── main.py # Python code consuming from Kafka
│ └── requirements.txt # Imported libraries in the python code
├── pythonconsumerrtsp2 # Duplicate of `pythonconsumerrtsp` to illustrate code scalability
│ ├── ... #
│ . #
│ . #
│ . #
│ └── ... #
└── zookeeper # Zookeeper and Kafka
    └── Docker-compose.yml # To instantiate Docker containers for Zookeeper and Kafka
System Design
The complete system design and data flow of this project is illustrated by the following image. Each component of the system will be explored in detail in the following sections. Further description of the system design will be provided if requested by readers.
Zookeeper and Kafka
Firstly, the Zookeeper and Kafka services are started as containers using Docker images from Confluent. Below is the `Docker-compose.yml` file.
version: '3'
services:
  # Zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    restart: always # Ensures that intermittent failures in the Docker environment do not result in unnecessary failures of the service
    networks:
      - dockerNet
    ports:
      - 2181:2181 # Zookeeper port - access port for external communication
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
  # Kafka
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    restart: always # Ensures that intermittent failures in the Docker environment do not result in unnecessary failures of the service
    depends_on:
      - zookeeper
    networks:
      - dockerNet
    ports:
      - 9092:9092 # Kafka broker - access port for external communication
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:29092 # For native Docker (e.g., in Windows 10)
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT_HOST://192.168.99.100:9092,PLAINTEXT://kafka:29092 # For Docker Toolbox (e.g., in Windows 7)
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_JMX_PORT=1099
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
networks:
  dockerNet:
    driver: bridge
Containers are connected via an internal Docker network named `dockerNet` to facilitate internal communication among containers. Containers can reach each other by their `container_name`.

The Zookeeper port is `2181`, and it is published to localhost, making Zookeeper accessible to programs outside the Docker environment.

The `KAFKA_ADVERTISED_LISTENERS` variable is set to `192.168.99.100:9092` and `kafka:29092`. This makes Kafka accessible from inside Docker through `kafka:29092` and from outside Docker through `192.168.99.100:9092`, by advertising its location on the host machine.

Remember to use your Docker machine IP (e.g., `192.168.99.100` as shown in the above code) if you are using Docker Toolbox (or a VM). Otherwise, if you are using native Docker, replace the IP address with `localhost`, as shown commented in the above code.
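As a quick sanity check of the advertised listeners, one can query the broker from the host machine with the kafka-python library. This snippet is not part of the project code; it is a minimal sketch assuming kafka-python is installed (adjust the address to `localhost:9092` for native Docker):

```python
# Minimal connectivity check from the host machine (outside Docker).
# Not part of the project code. Requires: pip install kafka-python
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="192.168.99.100:9092")
print(consumer.topics())  # Prints the set of available topics if the broker is reachable
consumer.close()
```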
Issue the following Docker commands in the working directory of the `Docker-compose.yml` file:
- To run the Zookeeper and Kafka containers:
  docker-compose up
- To tear down all containers and stored volumes at the end of operation:
  docker-compose down -v
  docker system prune
  docker volume prune
  docker network prune
GoProducerRTSP
The Kafka producer code, written in Golang, to stream RTSP video into the Kafka topic `timeseries_1` is shown below. The Sarama library is used as the Golang client for the Kafka producer. A further wrapper for the Golang producer (and consumer), built on top of the Sarama and wvanbergen libraries, is provided for ease of use in my `kafkapc` package.
package main

import (
    "encoding/json"
    "fmt"
    "image"
    "io"
    "log"
    "os"
    "time"

    "github.com/Shopify/sarama"
    "github.com/adaickalavan/kafkapc"
    "gocv.io/x/gocv"
)

// Hooks that may be overridden for testing
var inputReader io.Reader = os.Stdin
var outputWriter io.Writer = os.Stdout

// Instantiate a producer
var producer sarama.AsyncProducer

func main() {
    // Sarama logger
    sarama.Logger = log.New(outputWriter, "[saramaLog]", log.Ltime)

    // Create a Kafka producer
    var brokers = []string{os.Getenv("KAFKAPORT")}
    var err error
    producer, err = kafkapc.CreateKafkaProducer(brokers)
    if err != nil {
        panic("Failed to connect to Kafka. Error: " + err.Error())
    }
    // Close producer to flush (i.e., push) all batched messages into the Kafka queue
    defer func() { producer.Close() }()

    // Capture video
    webcam, err := gocv.OpenVideoCapture(os.Getenv("RTSPLINK"))
    if err != nil {
        panic("Error in opening webcam: " + err.Error())
    }

    // Stream images from RTSP to the Kafka message queue
    frame := gocv.NewMat()
    for {
        if !webcam.Read(&frame) {
            continue
        }
        // Type assert frame into an RGBA image
        imgInterface, err := frame.ToImage()
        if err != nil {
            panic(err.Error())
        }
        img, ok := imgInterface.(*image.RGBA)
        if !ok {
            panic("Type assertion of pic (type image.Image interface) to type image.RGBA failed")
        }

        // Form the struct to be sent to the Kafka message queue
        doc := Result{
            Pix:      img.Pix,
            Channels: frame.Channels(),
            Rows:     frame.Rows(),
            Cols:     frame.Cols(),
        }
        // Prepare message to be sent to Kafka
        docBytes, err := json.Marshal(doc)
        if err != nil {
            log.Fatal("Json marshalling error. Error:", err.Error())
        }
        msg := &sarama.ProducerMessage{
            Topic:     os.Getenv("TOPICNAME"),
            Value:     sarama.ByteEncoder(docBytes),
            Timestamp: time.Now(),
        }
        // Send message into the Kafka queue
        producer.Input() <- msg

        // Print the time of receiving each image to show the code is running
        fmt.Fprintf(outputWriter, "---->>>> %v\n", time.Now())
    }
}

// Result represents the Kafka queue message format
type Result struct {
    Pix      []byte `json:"pix"`
    Channels int    `json:"channels"`
    Rows     int    `json:"rows"`
    Cols     int    `json:"cols"`
}
Video frames captured by GoCV are of type `gocv.Mat`. Each frame is converted to the `image.Image` interface type and then type-asserted into an RGBA image. The pixel values, along with other image information, are written into a struct and sent to the Kafka topic.
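As a concrete illustration of the resulting message format (not taken from the repository), note that Go's `json.Marshal` encodes a `[]byte` field as a base-64 string, so the message value arrives on the consuming side as shown below. The sample values are made up: a single RGBA pixel.

```python
import json

# Made-up sample: one RGBA pixel (bytes 0, 1, 2, 255), which json.Marshal in
# Go encodes as the base-64 string "AAEC/w==". "channels" comes from
# frame.Channels() on the source Mat (typically 3), while "pix" holds
# 4 bytes per pixel since the producer sends image.RGBA pixel data.
sample = '{"pix": "AAEC/w==", "channels": 3, "rows": 1, "cols": 1}'
doc = json.loads(sample)
print(doc["rows"], doc["cols"], doc["channels"])  # -> 1 1 3
```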
Dep is used for Golang dependency management. To initialize, issue the `dep init` command in the project's main working directory. The command `dep ensure` ensures the project is in sync, while `dep ensure -update` updates all the dependencies.
Once the `vendor` folder and the `Gopkg.lock` and `Gopkg.toml` files have been created by the `dep` commands, a Docker image is created by executing `docker build -t goproducerrtsp .` on the Dockerfile below.
FROM denismakogon/gocv-alpine:3.4.2-buildstage as build-stage
# Copy the local package files (from the development computer) to the container's workspace (Docker image)
COPY . /go/src/app
# Build the app command inside the container
RUN go install app

FROM denismakogon/gocv-alpine:3.4.2-runtime
# Author label
LABEL author="Adaickalavan Meiyappan"
# Copy the executable binary file
COPY --from=build-stage /go/bin/app /go/bin/app
# Run the executable when the container starts
ENTRYPOINT ["/go/bin/app"]
The `denismakogon/gocv-alpine:3.4.2-buildstage` base image consists of Alpine 3.7, FFmpeg 4.0, Golang 1.10, OpenCV 3.4.2, and the OpenCV 3.4.2 contrib packages required by GoCV. A multistage build is performed to minimize the runtime image size. Hence, the compiled binary is copied from the build stage into a basic runtime base image, `denismakogon/gocv-alpine:3.4.2-runtime`, which consists of Alpine 3.7.
The `goproducerrtsp` service is started by running `docker-compose up` on the following `Docker-compose.yml` file. To enable internal Docker communication between the `goproducerrtsp` and `kafka` containers, the `KAFKAPORT` address is set to `kafka:29092`.
version: '3'
services:
  # Producer to obtain data from RTSP and write into the Kafka queue
  goproducerrtsp:
    image: goproducerrtsp
    container_name: goproducerrtsp
    environment:
      - TOPICNAME=timeseries_1
      - KAFKAPORT=kafka:29092
      - RTSPLINK=rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov
    networks:
      - zookeeper_dockerNet
      # To ensure that the containers in different docker-compose files communicate with each other, we place them on the same network. The complete network name is 'zookeeper_dockerNet'. It is derived by joining the name of the folder from which the network originates (i.e., zookeeper) and the name of the network (i.e., dockerNet).
    privileged: true
networks:
  zookeeper_dockerNet:
    external: true # Label the other docker-compose network as an external network to the current docker-compose file
PyConsumerRTSP
The video frames are retrieved by a Kafka consumer in Python using the kafka-python library. A further wrapper for the Python consumer (and producer), built on top of the kafka-python library, is provided for ease of use in my `kafkapc_python` package; a minimal sketch of such a wrapper is given after the consumer code below.
from dotenv import load_dotenv
import kafkapc_python as pc
import os
import cv2

import message
import dataprocessing.alg as alg

def main():
    # Create consumer
    consumer = pc.Consumer(
        os.getenv('TOPICNAME'),
        os.getenv('KAFKAPORT'),
        os.getenv('CONSUMERGROUP')
    )

    # Prepare openCV window
    print(cv2.__version__)
    windowName = "RTSPvideo1"
    cv2.namedWindow(windowName)
    cv2.resizeWindow(windowName, 240, 160)

    # Instantiate a signal processing model
    model = alg.Model()

    # Start consuming video
    for m in consumer:
        # Read message contents
        val = m.value
        print("Time:", m.timestamp, ", Topic:", m.topic)
        # Message handler
        img = message.handler(val)
        # Show image
        cv2.imshow(windowName, img)
        cv2.waitKey(1)
        # Process the message
        model.run(img)

    consumer.close()
    return

if __name__ == "__main__":
    # Load environment variables
    load_dotenv(override=True)
    # Call main function
    main()
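The `pc.Consumer` wrapper itself is not reproduced here. Purely as an illustration, a minimal sketch of what such a wrapper might look like on top of kafka-python is given below; the constructor signature mirrors the call in `main.py` above, and the JSON value deserializer is an assumption based on the message format produced by the Go code.

```python
# A hypothetical sketch of a kafkapc_python-style Consumer wrapper built on
# the kafka-python library. The actual kafkapc_python package in this project
# may differ in detail.
import json
from kafka import KafkaConsumer

class Consumer(KafkaConsumer):
    def __init__(self, topic, bootstrap_servers, group_id):
        super().__init__(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Deserialize each raw message value into a dict, assuming the
            # JSON format produced by the Go producer's Result struct
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )
```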
Environment variables are saved in a `.env` file, as shown below. The python-dotenv library is used to load the `.env` file into Python.
TOPICNAME=timeseries_1
#For native Docker (e.g., in Windows 10)
# KAFKAPORT=localhost:9092
#For Docker Tool (e.g., in Windows 7)
KAFKAPORT=192.168.99.100:9092
CONSUMERGROUP=consumerGroup_1
The Python code is to be run on the host machine, outside of the Docker environment. Hence, for the Python code to communicate with the `kafka` container living inside the Docker environment, set `KAFKAPORT=192.168.99.100:9092`.
Remember to use your Docker machine IP (e.g., `192.168.99.100` as shown in the above code) if you are using Docker Toolbox (or a VM). Otherwise, if you are using native Docker, replace the IP address with `localhost`, as shown commented in the above code.
The slice of bytes from Golang, representing image pixel values, is stored as a base-64 string in the Kafka topic. The `message.handler()` function converts the base-64 string into an appropriately shaped 3-channel RGB NumPy matrix, which is then displayed using the OpenCV-Python library.
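The `message` package itself is not reproduced here. Below is a minimal sketch of what such a handler might look like, assuming the message value has already been JSON-decoded into a dict with keys `pix`, `channels`, `rows`, and `cols` matching the Go `Result` struct:

```python
# A hypothetical sketch of message.handler(); the actual implementation in
# the repository may differ. Go's json.Marshal encodes a []byte as a base-64
# string, so val["pix"] arrives as base-64 text.
import base64
import numpy as np

def handler(val):
    raw = base64.b64decode(val["pix"])        # Decode base-64 into raw bytes
    img = np.frombuffer(raw, dtype=np.uint8)  # Flat array of pixel values
    # The Go producer sends image.RGBA pixel data, i.e., 4 bytes per pixel
    img = img.reshape(val["rows"], val["cols"], 4)
    # Drop the alpha channel and reorder RGB to BGR, as expected by cv2.imshow
    return np.ascontiguousarray(img[:, :, 2::-1])
```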
The `dataprocessing.alg` module provides a template for performing any further signal processing steps.
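Per the project structure above, `alg.py` holds a dummy signal processing object that computes the average pixel value of each frame. A minimal sketch of such a `Model` might be:

```python
# A hypothetical sketch of dataprocessing/alg.py: a placeholder Model whose
# run() method computes and prints the average pixel value of a frame. The
# actual implementation in the repository may differ.
import numpy as np

class Model:
    def run(self, img):
        avg = img.mean()  # Average over all pixels and channels
        print("Average pixel value:", avg)
        return avg
```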
Python project dependencies can be easily managed in a two-step process. First, execute the `pipreqs` command to generate a `requirements.txt` file containing all the required library imports:

pipreqs [options] <path/to/Python/project/folder>

[options]
--force : to overwrite existing file

Second, install the dependencies via `pip install -r requirements.txt`.
Although the Python code in this project is meant to be run on the host machine, it may be containerized. A sample Dockerfile to build the Python image and a sample Docker-compose.yml to instantiate the Python container are provided in the repository.
PyConsumerRTSP2
The `pyconsumerrtsp2` Python module is simply a duplicate of the `pyconsumerrtsp` module. Running both Python modules simultaneously illustrates the scalability and resilience of the Kafka-based system. The `pyconsumerrtsp2` module similarly runs on the host machine, outside the Docker environment.
From the `.env` file, we see that `pyconsumerrtsp2` subscribes to the same Kafka topic as `pyconsumerrtsp` but is assigned to a different consumer group, namely `consumerGroup_2`.
TOPICNAME=timeseries_1
#For native Docker (e.g., in Windows 10)
# KAFKAPORT=localhost:9092
#For Docker Tool (e.g., in Windows 7)
KAFKAPORT=192.168.99.100:9092
CONSUMERGROUP=consumerGroup_2
Although both Python consumers read from the same underlying message queue, `timeseries_1`, they work asynchronously, processing data at different speeds and at different times. This enables horizontal scaling of multiple processes on the same data.
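Conceptually, the two consumer groups behave as sketched below, reusing the hypothetical `Consumer` wrapper from earlier. Because each group tracks its own offsets, every frame of `timeseries_1` is delivered to both consumers independently:

```python
# Illustrative only: two consumers in different consumer groups each receive
# the full message stream of the same topic. Uses the hypothetical Consumer
# wrapper sketched earlier; each thread owns its own consumer instance.
import threading

def consume(group_id):
    consumer = Consumer("timeseries_1", "192.168.99.100:9092", group_id)
    for m in consumer:
        print(group_id, "received a frame at", m.timestamp)

threads = [
    threading.Thread(target=consume, args=(gid,))
    for gid in ("consumerGroup_1", "consumerGroup_2")
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```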