Recently we looked at a project to predict train delays across the network. It's a complex problem and we addressed a small part of it; it was a real collaborative effort spanning real-time engineering, integration and data flow, and predictions. My data science teams prefer to use either #ApacheSpark or #Python (normally scikit-learn or #keras).
For this one we ruled out #Spark (at least #Databricks) as we originally wanted to get the entire solution under a budget of £100. That was wishful thinking, but we did eventually manage to get the solution in under £200! Just to be clear, the same solution in Databricks (which would be more manageable) would cost $690/month on PAYG, and Azure Machine Learning Service $330. The former would give us resilience; the latter is based on a single node in AKS, much like the solution I'm presenting here.
To get the best out of this pattern it's worth understanding a few bits of terminology and a handful of platform services in Microsoft Azure.
Lambda architecture
Azure Stream Analytics
scikit-learn
pandas
Azure SignalR Service
Azure Event Hubs
GeoJSON
Azure Functions
Azure Maps
For me it's important to understand that cloud vendors make your life easier through platform services, but if you already have the skills and capability in-house to replicate them, and are willing to bundle that into the total cost of ownership, then you can keep the cash cost paid to the cloud provider down. Inevitably you pay the same or more overall, because using less of the platform means more development work and a greater reliance on people to build in things such as flow and resilience.
The platform involved the following facets:
Convert data from a STOMP feed to Azure Event Hubs while minimising message loss
Join real-time data with reference data to enrich the messages
Persist data in long term redundant cloud storage
Make a prediction on the lateness of a train
Wrap up messages as GeoJSON, with features describing the things you want to show on the map
Send to browser and update map and tables
This sounds like a lot of work and it is.
Normally my team would look at a simple model built using MLlib in Spark and running on Azure Databricks. Ideally we would build a streaming receiver for a #STOMP client and probably scale receiver partitions across STOMP subscriptions with something like the Vert.x STOMP library in #Scala.
In this instance, because price was a factor, we decided to create a Linux VM running a small subscriber process which listens for STOMP messages and sends the data back out as #Event #Hub messages.
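As a rough illustration, the bridge on the VM looks something like the sketch below, assuming the stomp.py and azure-eventhub packages; the host, credentials, topic and Event Hub name are placeholders rather than the real feed.

import time
import stomp  # stomp.py 8.x passes a Frame object to on_message
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    "<event-hub-namespace-connection-string>", eventhub_name="<event-hub-name>")

class ForwardingListener(stomp.ConnectionListener):
    def on_message(self, frame):
        # Forward each STOMP frame body straight on as an Event Hub message
        batch = producer.create_batch()
        batch.add(EventData(frame.body))
        producer.send_batch(batch)

conn = stomp.Connection([("<stomp-host>", 61618)], heartbeats=(10000, 10000))
conn.set_listener("forwarder", ForwardingListener())
conn.connect("<username>", "<password>", wait=True)
conn.subscribe(destination="/topic/<train-movement-feed>", id=1, ack="auto")

while True:          # keep the process alive so the listener keeps receiving
    time.sleep(60)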
Enter platform services. I've always maintained that Azure Stream Analytics is one of the best value-for-money platform services on Azure. Used properly it gives you a huge amount of capability for an entry-level price. In our case we're streaming, pivoting, transforming and joining about 40 messages/second with static reference data, and the job is still only running at 30% utilisation.
The #Stream #Analytics job outputs batches of messages on a Tumbling Window of 5 seconds as a #JSON array on the Event Hub.
This is then received by an Azure Function which packages the data in #GeoJson format and publishes it to a #SignalR #Hub in #Azure. Before doing that, though, it has to make a lateness prediction. This is where the fun starts.
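To give a flavour, the function looks roughly like the sketch below, assuming an Event Hub trigger bound as "event" and a SignalR Service output binding named "signalRMessages" in function.json; the property names are illustrative rather than our real schema.

import json
import azure.functions as func

def main(event: func.EventHubEvent, signalRMessages: func.Out[str]) -> None:
    # The 5-second tumbling-window batch arrives as a JSON array from Stream Analytics
    movements = json.loads(event.get_body().decode("utf-8"))

    features = []
    for m in movements:
        features.append({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [m.get("lon"), m.get("lat")]},
            # predictedLateness is filled in from the scoring call described below
            "properties": {"trainId": m.get("trainId"), "predictedLateness": m.get("predictedLateness")},
        })
    geojson = {"type": "FeatureCollection", "features": features}

    # Push to every connected browser through the SignalR Service output binding
    signalRMessages.set(json.dumps({"target": "trainUpdate", "arguments": [geojson]}))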
The team built a model using scikit-learn which was trained on quite a bit of history. The engineered features are all known at the time we need to score, so I decided on the fly that, rather than taking a platform-services approach, I would manage the models myself: train, score and host the model without needing real-time scoring through Spark streaming or #AzureMLv2, which is where the real expense of machine learning comes in.
The pattern was simple: create a REST API that wraps the Python model, so that the Azure Function can invoke it on every message to predict lateness.
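On the calling side that's nothing more exotic than an HTTP POST from the function; something like this hedged sketch, where the private IP, port and payload shape are placeholders.

import requests

def predict_lateness(feature_rows):
    # feature_rows: the engineered features for the current batch of messages
    resp = requests.post("http://<private-endpoint-ip>:8000/score", json=feature_rows, timeout=2)
    resp.raise_for_status()
    return resp.json()   # name-value pairs including the lateness prediction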
In order to expose the machine learning model to the Azure Function we used the Flask framework in Python, which has a very easy quickstart.
To make this robust we hosted it using the #gunicorn web server and exposed it through a private endpoint. Here is a sample SSH script from our release pipeline: every time we update the model we shut down gunicorn and then start it again with 3 workers ready to receive requests. This takes less than a second, and because the messaging doesn't stop and the output is batched on a tumbling window of 5 seconds, there is no message loss and minimal latency.
# Find the PID of the gunicorn master bound to $Port (if one is running)
pid=`ps ax | grep gunicorn | grep $Port | awk '{split($0,a," "); print a[1]}' | head -n 1`
if [ -z "$pid" ]; then
  echo "no gunicorn daemon on port $Port"
else
  # Killing the master also shuts down its workers
  kill $pid
  echo "killed gunicorn daemon on port $Port"
fi
# Relaunch as a daemon with the freshly deployed model and 3 workers
gunicorn lateness:app --name lateness --workers 3 --user=azurecoder --bind=0.0.0.0:8000 --daemon
A key part of this, to minimise the loss of messages from the STOMP subscription, is to ensure the subscriber keeps running and collecting messages as continuously as possible, given that we don't have a highly resilient solution.
To do this we took advantage of #crontab and wrote a sentinel. The sentinel checks whether the software which subscribes to the STOMP messages is still running and, if not, starts it again. Given that #crontab can only schedule to a granularity of one minute, you have to be a little inventive: the scheduled script itself runs for the full minute and checks every 5 seconds or so within that single #crontab slot. This worked a treat as well!
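The sentinel itself is only a few lines; here is a sketch of the idea in Python, where the subscriber script name and paths are assumptions and the crontab entry simply runs it once a minute.

# crontab entry (once a minute):  * * * * * /usr/bin/python3 /home/azurecoder/sentinel.py
import subprocess
import time

SUBSCRIBER = "stomp_forwarder.py"   # hypothetical name for the STOMP subscriber script

for _ in range(12):                 # cover the whole minute in 5-second steps
    # pgrep -f returns 0 if a process with this command line is running
    running = subprocess.run(["pgrep", "-f", SUBSCRIBER], capture_output=True).returncode == 0
    if not running:
        # Relaunch the subscriber, detached from the sentinel
        subprocess.Popen(["python3", "/home/azurecoder/" + SUBSCRIBER],
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    time.sleep(5)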
Flask works a treat and can scale out worker processes under gunicorn, so even though we only pay for a single machine we mitigate as much as we can, short of the hardware failing or the machine rebooting. Here is an example of what the entry point for the scoring code looks like. It's worth understanding that frameworks such as AzureML abstract this away from you, but it's really not complicated.
from flask import Flask, request, jsonify
import os
import json
import pickle
import datetime
import pandas as pd
from azure.storage.blob import *
from sklearn import preprocessing

app = Flask(__name__)
app.config['DEBUG'] = True

@app.route('/score', methods=['POST'])
def lateness_score():
    # The POSTed JSON holds the engineered feature columns for the batch
    features = request.json
    output = {}
    preds = pd.DataFrame.from_dict(features, orient="columns")
    # ... the trained model is applied to preds here and its predictions copied into output ...
    return jsonify(output)

Simply put, this code constructs a route and allows you to POST JSON data containing features. You load these into pandas, run the scikit-learn model over the features, and build an output which is returned to the caller as JSON.
In this case output is a dictionary of name-value pairs that is sent to the SignalR service by push.
The last thing you need to think about is making sure you load the model every time a gunicorn worker starts. Again, not an arduous task: just load the trained pickle file into a global variable and you can then use it on every request without reloading it. As some models can run to gigabytes, you only want to do this once!
f = open("~/model.pkl",'rb') pickle_model = pickle.load(f)
Thanks to @andyelastacloud for the wonderful looking UI which is depicted below and consumes the GeoJSON from SignalR.