Real Time Streaming Visualization

Project

Problem

In this project we will develop a web based visualization tool for real time data streaming application. For the purpose of explanation, we shall assume that we are streaming data from a sensor, on which we perform some computation and subsequently classify the sensor data at each time point.

Our example will demonstrate the following:

  • Running a Bokeh server
  • Simultaneously running multiple python threads for other tasks
  • Auto start of threads by simply starting the Bokeh server
  • Interactions among: browser \(\longleftrightarrow\) server \(\longleftrightarrow\) pyhton threads

The following will be used:

  • Python 3.6
  • Multithreaded Python coding
  • Bokeh plots
  • Bokeh server
  • PyCharm

It is recommended to use PyCharm as the IDE since it provides version control capability and project file structure management capability.

The expected end product of this project is shown below: pic1

Repository

The repository consists of the following:

  • Python and Bokeh scripts - complete functioning sample code
  • .idea files - project setting files derived from PyCharm IDE

The project structure is as follows:

Real-Time-Streaming-Visualization   -- main folder
└── streaming                       -- python package, this is set as 'Source Root' in PyCharm
    ├── __init__.py                 -- this file indicates that 'streaming' is a python package
    ├── main.py                     -- script with main function
    ├── Sensor.py                   -- multithreaded python script to obtain sensor data 
    └── Visual.py                   -- Bokeh code for web dashboard

Solution

It is beneficial for beginners to read through the Bokeh tutorial and guide to gain a better understanding of the visualization tool.

Begin with installation of Bokeh via the command: conda install bokeh (if you are using Anaconda python distribution, which is desirable) or pip install bokeh.

The code will be explained in a top to bottom manner. Hence, even if some parts are not clear at first, it will become clear towards the end when considering the various code as a whole system.

Firstly, we take a look at the main.py file.

from Visual import *
from Sensor import *

def threads(callbackFunc, running):
    # Set multiple threads
    sensor = Sensor(callbackFunc=callbackFunc, running=running) # Instantiate the Sensor thread
    # Start threads 
    sensor.start() # Run the thread to start collecting data

def main():
    #Set global flag
    event = threading.Event() # Create an event to communicate between threads
    event.set() # Set the event to True

    webVisual = Visual(callbackFunc=threads, running=event) # Instantiate a Bokeh web document
    threads(callbackFunc=webVisual, running=event) # Call Sensor thread

# Run command:
# bokeh serve --show streaming
main()

Here, we do the following:

  1. Create a threading event to be used as Flag for communication between threads in python
  2. Instantiate a Bokeh web document as webVisual, which is defined in Visual.py file
  3. Instantiate our Sensor thread and run it
  4. For callback and interactive user experience:
    • Feed the threads function into webVisual object to restart the thread from browser interface
    • Feed the Bokeh document webVisual into our thread to enable the Sensor thread to inform the browser whenever new sensor data is available for plotting
    • Feed the threading event Flag to both the Sensor thread and webVisual as a common Flag

To run this program, open a command prompt and change your present directory to that of the main folder, for example C:\Real-Time-Streaming-Visualization. Then issue the command bokeh serve --show streaming which will publish Bokeh document webVisual to a web browser at http://localhost:5006/streaming.

Secondly, we take a look at our Sensor thread which is defined in a threading.Thread class in Sensor file.

import time
import threading
import random
from functools import partial

class Sensor(threading.Thread):
    def __init__(self, callbackFunc, running):
        threading.Thread.__init__(self) # Initialize the threading superclass
        self.val = 0 # Set default sensor data to be zero
        self.running = running # Store the current state of the Flag
        self.callbackFunc = callbackFunc # Store the callback function

    def run(self):
        while self.running.is_set(): # Continue grabbing data from sensor while Flag is set
            time.sleep(0.2)  # Time to sleep in seconds, emulating some sensor process taking time
            self.val = random.randint(0, 10) # Generate random integers to emulate data from sensor
            self.callbackFunc.doc.add_next_tick_callback(partial(self.callbackFunc.update, self.val)) # Call Bokeh webVisual to inform that new data is available
        print("Sensor thread killed") # Print to indicate that the thread has ended

Here, the following is performed:

  1. When sensor.start() is executed from main.py file, the run() function in Sensor thread will be executed.
  2. Sensor thread runs continuously as long as the Flag is set
  3. Random numbers are generated to emulate sensor data
  4. Once new sensor data is available, the webVisual object is called via Bokeh’s callback utility add_next_tick_callback
    • In the callback, specifically invoke the update function of the Visual class
    • Feed the necessary argument (i.e., new sensor data which is available in self.val) to the update function by using a partial function

Refer to Bokeh guide on threads for more information on updating from threads. All actions from threads that update Bokeh document state must go through a add_next_tick_callback.

Finally, we see the python code to generate the Bokeh plots, Bokeh web document, and Bokeh Server which serves us the web based visualization.

from bokeh.plotting import figure
from bokeh.models import LinearAxis, Range1d, HoverTool, ColumnDataSource, Legend
from bokeh.layouts import gridplot, column, row
from bokeh.models.widgets import CheckboxGroup, Div
from bokeh.io import curdoc
from tornado import gen

class Visual:
    def __init__(self, callbackFunc, running):
        self.text1 = Div(text="""<h1 style="color:blue">Visualization of Real Time Data Streaming</h1>""", width=900, height=50) # Text to be displayed at the top of the webpage
        self.running = running # Store the current state of the Flag
        self.callbackFunc = callbackFunc # Store the callback function
        self.hover = HoverTool(  # To show tooltip when the mouse hovers over the plot data        
                tooltips=[
                    ("index", "$index"), # Show index of plot data points
                    ("(x,y)", "(@x, $y)") # Show x and y coordinates of the plot data points
                ]
            )
        self.tools = "pan,box_zoom,wheel_zoom,reset" # Set pan, zoom, etc., options for the plot
        self.plot_options = dict(plot_width=800, plot_height=200, tools=[self.hover, self.tools]) # Set plot width, height, and other plot options
        self.updateValue = True # Internal state for updating of plots
        self.source, self.pAll = self.definePlot() # Define various plots. Return handles for data source (self.source) and combined plot (self.pAll)
        self.doc = curdoc() # Save curdoc() to make sure all threads see the same document. curdoc() refers to the Bokeh web document
        self.layout() # Set the checkboxes and overall layout of the webpage 
        self.prev_y1 = 0 

    def definePlot(self):
        # Define plot 1 to plot raw sensor data
        p1 = figure(**self.plot_options, title="Sensor Data")
        p1.yaxis.axis_label = "Sensor Value"
        # Define plot 2 to plot (1) processed sensor data and (2) classification result at each time point
        p2 = figure(**self.plot_options, x_range=p1.x_range, title="Computed Value") # Link x-axis of first and second graph
        p2.xaxis.axis_label = "Time (Discrete)"
        p2.yaxis.axis_label = "Computed Value"
        p2.extra_y_ranges = {"class": Range1d(start=-1, end=2)} # Add a secondary y-axis
        p2.add_layout(LinearAxis(y_range_name="class", axis_label="Classification"), 'right') # Name and place the secondary y-axis on the right vertical edge of the graph
        # Define source data for all plots
        source = ColumnDataSource(data=dict(x=[0], y1=[0], y2=[0], y3=[0]))
        # Define graphs for each plot
        r1 = p1.line(x='x', y='y1', source=source, color="firebrick", line_width=2) # Line plot for sensor raw data
        r1a = p1.circle(x='x', y='y1', source=source, color="firebrick", fill_color="white", size=8)  # Circles to indicate data points in first graph
        r2 = p2.line(x='x', y='y2', source=source, color="indigo", line_width=2) # Line plot for computed values
        r2a = p2.circle(x='x', y='y2', source=source, color="indigo", fill_color="white", size=8) # Circles to indicate data points in second graph
        r3 = p2.step(x='x', y='y3', source=source, color="green", line_width=2, y_range_name="class") # Overlay step plot for binary classes in the second graph
        # Place legends outside the plot area for each data source
        legend = Legend(items=[("Sensor Data", [r1, r1a])], location=(5, 30))
        p1.add_layout(legend, 'right')
        p1.legend.click_policy = "hide" # Plot line may be hidden by clicking the legend marker 
        legend = Legend(items=[("Computed Value", [r2, r2a]), ("Classification", [r3])], location=(5, 30))
        p2.add_layout(legend, 'right')
        p2.legend.click_policy = "hide"  # Plot line may be hidden by clicking the legend marker
        # Combine all plots into a gridplot for better vertical alignment
        pAll = gridplot([[p1], [p2]])

        return source, pAll # Return handles to data source and gridplot

    @gen.coroutine
    def update(self, val):
        if self.updateValue: # Update the plots only if the 'self.updateValue' is True
            # Compute new values for each column data
            newx = self.source.data['x'][-1] + 1 # Increment the time step on the x-axis of the graphs
            newy1 = val # Assign raw sensor data to be displayed in the first graph
            newy2 = (val + self.prev_y1)/2 # Perform computation (i.e., moving average) on the sensor data and plot in the second graph
            self.prev_y1 = newy1
            newy3 = newy2 > 5 # Classify the signal (i.e., binary calssification) at each time point and plot the results in the second graph
            # Construct the new values for all columns, and pass to stream
            new_data = dict(x=[newx], y1=[newy1], y2=[newy2], y3=[newy3])
            self.source.stream(new_data, rollover=20) # Feed new data to the graphs and set the rollover period to be xx samples

    def checkbox1Handler(self, attr, old, new):
        if 0 in list(new):  # Verify if the first checkbox is ticked currently
            if 0 not in list(old): # Perform actions if the first checkbox was not ticked previously, i.e., it changed state recently 
                self.running.set() # Set the Flag
                self.callbackFunc(self, self.running) # Restart the Sensor thread
        else:
            self.running.clear()  # Kill the Sensor thread by clearing the Flag
        if 1 in list(new):  # Verify if the second checkbox is ticked
            self.updateValue = True # Set internal value to continue updating the graphs
        else:
            self.updateValue = False # Set internal value to stop updating the graphs

    def layout(self):
        # Build interactive user interface
        checkbox1 = CheckboxGroup(labels=["Start/Stop Sensor Thread", "Start/Stop Plotting"], active=[0, 1]) # Create checkboxes
        checkbox1.on_change('active', self.checkbox1Handler) # Specify the action to be performed upon change in checkboxes' values 
        # Build presentation layout
        layout = column(self.text1, row(checkbox1, self.pAll)) # Place the text at the top, followed by checkboxes and graphs in a row below 
        self.doc.title = "Real Time Sensor Data Streaming" # Name of internet browser tab
        self.doc.add_root(layout) # Add the layout to the web document

The Bokeh web document contains the various plots, defines the source of data for the plots, user interfaces such as checkboxes, callback function to trigger the corresponding action when the checkboxes’ state changes, and web page layout.

The key functions of each method defined in the Visual class is described below.

  1. definePlot()
    • Create two graphs to plot values against time
    • x-axis of the second graph is linked to the first graph via the command x_range=p1.x_range
    • First graph plots raw sensor data
    • Second graph plots processed sensor data (in the left y-axis) and classification results (in the right y-axis)
    • Places the two graphs into a gridplot for vertical alignment
  2. update()
    • This function will be called by the Sensor thread whenever new data is available for adding into the time plot
    • New data points for each graph is structured into a dictionary
    • The graphs are updated by feeding the dictionary into a stream as given by self.source.stream(new_data, rollover=20) with a rollover period
    • The plots will be only be updated if the self.updateValue is True
  3. checkbox1Handler()
    • This function will be invoked whenever there is a change to the checkboxes’s state
    • If the first checkbox transitions from ‘unticked’ to ‘ticked’, then the Sensor thread will be restarted
    • If the first checkbox is unticked, then the Sensor thread will be terminated by clearing the Flag
    • If the second checkbox is ticked, the Bokeh server will enable updating of the graphs
  4. layout()
    • Define two checkboxes and their handler/callback function, namely checkbox1Handler()
    • Position the text, checkboxes, and graphs into a nice layout
    • Add the layout to the web document to be served to the browser by the Bokeh server

I hope this example would lead you to explore more advanced real-time data streaming visualizations for various applications, including machine learning and big data applications.

Updated:

Leave a comment