AZURE FOR BEGINNERS

Simple Streaming Analytics Architecture with Azure

Build a basic IoT data stream processing and notification engine using Microsoft’s cloud offering and Telegram.

Krzysztof Radecki
DAC.digital Technology Blog
12 min read · Sep 17, 2019


We know — the Cloud can be intimidating and a steep learning curve might discourage people from trying it out. Dozens of services with seemingly the same purpose and cryptic names. So why don’t we start with a small bite instead of trying to understand the big picture?

Let me say it upfront: what we’re building here is not optimized for production use, and you can (and should) do it way better! We aim to gently introduce you to the cloud offering for IoT processing. It should help you make the transition from the all-too-familiar batch processing of relational data to stream analytics.

Figure 1. Basic Streaming Analytics Architecture

Our (very basic) setup will have:

  • an IoT device simulator provisioning sample telemetry data
  • a data ingestion engine
  • live data stream analytics engine
  • notification service pushing messages to a mobile device

1. Set up the OS environment

Before we dive into the development process, you need to make sure that your OS is properly configured to interact with Azure, so go ahead and:

  1. Install the az command-line tool.
  2. Install the func Azure Functions Core Tools.

The Azure Function (the second-to-last element in Figure 1) will be coded in Python. At the time of writing this article, Azure required a particular Python version (3.6.8), so I recommend installing pyenv to manage your Python versions. Depending on your architecture the steps might vary, so we won’t go into details, but, as always, Google is your friend. Once pyenv is installed, go ahead and set up the environment:

pyenv install 3.6.8
pyenv virtualenv 3.6.8 azure-3.6.8
pyenv activate azure-3.6.8
pip install --upgrade pip
pip install azure-eventhub

2. Data Producers

Now that the environment is configured, we can code a sample IoT data generator. Our dummy device will produce telemetry data mimicking that generated during the milk collection and transportation process and push it to a cloud backend for further processing. A simple Python script will serve the purpose. This is the first element (the IoT sensor) in Figure 1.
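A minimal sketch of such a generator, assuming the azure-eventhub 1.x API we installed earlier and made-up payload field names (device_id, temperature), could look like this:

import json
import random
import time

from azure.eventhub import EventHubClient, EventData

# Placeholders: see section 3 for where to find these values
EVENT_HUB_ADDRESS = "amqps://<EVENT_HUB_NAMESPACE>.servicebus.windows.net/<EVENT_HUB_NAME>"
SAS_POLICY_NAME = "RootManageSharedAccessKey"
SAS_PRIMARY_KEY = "<SAS_PRIMARY_KEY>"

client = EventHubClient(EVENT_HUB_ADDRESS, username=SAS_POLICY_NAME, password=SAS_PRIMARY_KEY)
sender = client.add_sender()
client.run()

try:
    while True:
        payload = {
            "device_id": random.randint(1, 5),                  # simulate 5 devices
            "temperature": round(random.uniform(2.5, 10.5), 2)  # degrees Celsius
        }
        sender.send(EventData(json.dumps(payload)))
        time.sleep(1)  # one message per second
finally:
    client.stop()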

This script sends a message every second with a device ID randomly selected from a range between 1 and 5 (we simulate 5 devices in total) and a random temperature value between 2.5°C and 10.5°C. You’ll notice a few placeholders in the code that will have to be populated with correct values further down the road, so refrain from running the script just yet.

3. Data Ingestion

Azure Event Hubs will be used to collect the sensor data. Event Hubs scale well and can handle millions of events per second. They can also be set up to work in an Apache Kafka-compatible mode, which makes migrating an existing on-premises solution a straightforward process.

Create a Resource Group

First, you need to create a Resource Group, a container to hold all related Azure services. Read this article if you’d like to know more about Azure Resource Manager.

Figure 2. Creating an Azure Resource Group
  1. In the Azure Portal select All services and find Resource groups.
  2. Select a subscription you’d like to associate it with (you might be eligible for a Free Tier), choose a good descriptive name (here DAC-IoT), select your subscription region and confirm with the “Review + create” button.

Create Event Hub Namespace

Once the Resource Group has been created, proceed with the Event Hub setup. Every Azure Event Hub resides within a namespace so the latter must be created first.

Figure 3. Creating an Event Hub Namespace
  1. In the Azure Portal select Event Hubs from the list of services and press Add to create a new namespace. The Create Namespace blade will appear.
  2. Choose a name that is unique across the entire Azure landscape (here: dac-obc). It will be part of the FQDN that you’ll be using to connect your IoT devices to the Event Hub.
  3. Select the desired Subscription and assign a Resource group.
  4. Check the Apache Kafka box if you’d like to enable Kafka compatibility and confirm with Create.

Create Event Hubs

With the namespace ready, proceed to deploy two Event Hubs: one for data ingestion and one for alerts. If an analogy helps, think of Event Hubs as data pipelines, comparable (and in our case compatible) with Apache Kafka topics. We will use the ingestion hub to consume data provided by our IoT devices. Once the data has been processed by the analytics engine, we’ll use the alerts Event Hub to push input to our notification service.

Figure 4. Creating Event Hubs
  1. Under Services, locate Event Hubs, click it and then click the namespace created in the previous step. A new workspace will appear.
  2. In the Entities section click Event Hubs and then click the “+ Event Hub” button. A new blade will appear.
  3. Define a name for your inbound Event Hub (here: dac-obc-inbound). Again, if it helps you choose the right one, think of it as a Kafka topic.
  4. You can choose to capture the messages ingested by this Event Hub and store them in Azure Storage or Azure Data Lake Store for batch processing. We’ll skip this step, but it definitely makes sense in production use.
  5. Confirm with the Create button. Azure will commence the deployment in the background.
  6. Repeat steps 1–5 and create dac-obc-alerts Event Hub.

With both Event Hubs in place, we’ll have to locate the access keys and revisit the Python data producer script. You’ll find two variables there that must be adjusted: EVENT_HUB_ADDRESS and SAS_PRIMARY_KEY. The placeholders in the former should be replaced with your Event Hub namespace and the inbound Event Hub’s name, in our case dac-obc and dac-obc-inbound respectively.

To find the value for the SAS_PRIMARY_KEY, go to the Azure Portal, select Event Hubs, choose the namespace created for this exercise and click Shared access policies in the Settings subsection, just as depicted below:

Figure 5. Event Hub Namespace Shared Access Policies

In our exercise, we’ll be using the default RootManageSharedAccessKey. Copy the Primary key value and adjust the script.
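With the values from this walkthrough, and assuming the address format from the sketch in section 2, the adjusted variables in the producer script would look roughly like this (the key itself stays a placeholder here):

EVENT_HUB_ADDRESS = "amqps://dac-obc.servicebus.windows.net/dac-obc-inbound"
SAS_PRIMARY_KEY = "<RootManageSharedAccessKey primary key>"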

4. Data Analytics

With the data pipelines ready, proceed to configure the engine that will analyze the inbound datasets. Azure Stream Analytics will be the weapon of choice here. It uses a familiar, SQL-like syntax to process the data stream, and it can read from one Event Hub and write processed data to another. In our simple use case, we will be consuming the data from the dac-obc-inbound Event Hub, running a basic analytics job over a time window and forwarding the processed data to the dac-obc-alerts stream. The latter will be used to trigger an Azure Function that sends a message to a mobile device (more on that later).

Figure 6. Creating a Stream Analytics Job
  1. In the Azure Portal, click Stream Analytics jobs and click Add in the newly opened blade.
  2. Choose a descriptive name (here: dac-obc-asa-alerts), select a Subscription, assign a Resource Group, choose a Location where you’d like to deploy the job and make sure Cloud is chosen as a Hosting environment.
  3. Once the job has been deployed, hit Refresh and click the name of the newly created job.

Azure Stream Analytics jobs have several configuration parameters you can tweak, but we’ll focus only on the ones in Job Topology section of the menu: Input, Query, and Output.

Figure 7. Input, Query, and Output Job Topology Parameters

If the Query is your SQL statement, then Input and Output are your FROM and INTO clauses respectively. Both serve as aliases for inbound and outbound data, enabling you to plug in a variety of sources and sinks.
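Put differently, a bare-bones query skeleton using the aliases we are about to define (OBCinbound and OBCalerts) would read:

SELECT *
INTO OBCalerts    -- Output alias
FROM OBCinbound   -- Input alias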

Input

Let’s start by defining an Input:

Figure 8. Stream Analytics Input definition
  1. Click Input in the Job topology section, click Add Stream input and select Event Hub. A new blade will appear.
  2. Choose a descriptive name for your input alias (here: OBCinbound).
  3. In the Event Hub namespace section, select the one you have created.
  4. Under Event Hub name choose Use existing, select the inbound Event Hub from the drop-down menu and leave the rest of the default settings intact. Confirm with the Save button.

Output

The output of the Stream Analytics job will be the previously created dac-obc-alerts Event Hub.

Figure 9. Stream Analytics Output definition
  1. In the Job Topology section click Output, click Add and select Event Hub. Again, a new blade will appear.
  2. Choose a descriptive name for your output alias (here: OBCalerts).
  3. Under Event Hub namespace select the one you have created.
  4. Under Event Hub name choose Use existing, select the alerts Event Hub from the drop-down menu and leave the rest of the default settings as they are. Confirm with the Save button.

Query

Our use case will be to continuously analyze the data stream from an onboard computer (OBC) of a dairy truck. The OBC sends JSON payloads with temperature and geocoordinates at regular time intervals. Since milk is a perishable product, it must be transported at temperatures below 7°C to prevent the development of harmful bacteria and overall contamination. To be able to react as soon as possible, we want to trigger an alert whenever the average temperature reported by an OBC over the last 30 seconds exceeds a notification threshold. We can use SQL-like syntax to turn that into code.
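A query along these lines, assuming the device_id and temperature payload fields from our generator sketch and a 7°C threshold, does the job:

SELECT
    device_id,
    AVG(temperature) AS avg_temperature,
    System.Timestamp AS window_end
INTO
    OBCalerts
FROM
    OBCinbound
GROUP BY
    device_id,
    TumblingWindow(second, 30)
HAVING
    AVG(temperature) > 7.0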

The code is fairly simple and should be self-explanatory: our inbound stream has data from several OBCs, so to calculate the average temperature in a “tumbling window” we’re grouping by the device ID and a time window. Finally, we’re selecting only those records where the threshold has been exceeded. This output is then sent to the alerts Event Hub (aliased as OBCalerts in the code above) for further processing. Now we can create the Stream Analytics Query:

Figure 10. Stream Analytics Query definition
  1. In the Job Topology section click Query.
  2. In the new window, type the previously defined SQL-like query in the editor section. Don’t forget to use your Input and Output aliases for the FROM and INTO clauses respectively. Confirm with Save.

When you’re done, the Overview blade of the Stream Analytics job will look more or less like this:

Figure 11. A fully configured Stream Analytics job

Keep in mind: the job will remain inactive until you activate it with the “Start” button.

5. Mobile push notifications

With Event Hubs and Stream Analytics configured, we can now focus on setting up the push notifications for mobile devices. There are many alternatives on the market, but we’ll go for one of the most cost-effective (as in free) ones — Telegram Bot.

Setup a Telegram Bot and Telegram Group

We’ll set up a Telegram bot as an agent reporting the problems and a Telegram group that you, the bot and other vital team members will become part of. In case of an emergency, the bot will send a message to the group giving you time to react.

Setting up a bot requires an existing Telegram account and the process has already been documented very well. There are two things you should remember though:

  1. Make note of the HTTP API access token that you’ll be given at the end of the bot creation process.
  2. Using your Telegram application (mobile, desktop or web client), exchange at least one message with the bot so that you can receive messages posted by it.

You should also get the chat ID of the Telegram group that you have created. Do this by adding @getmyid_bot to your group temporarily and typing in a random message. Friendly @getmyid_bot will report back immediately with the required data. Once you have the necessary information, you can remove the ID bot service from the group.

You can test whether the bot has been properly set up by copy-pasting the link below into your browser. Don’t forget to replace <API_KEY> and <CHAT_ID> with your correct values:

https://api.telegram.org/bot<API_KEY>/sendMessage?text=Test&chat_id=<CHAT_ID>

Armed with the token, the chat ID, and a tested bot service, we can build and deploy the Azure Function.

Azure Functions

Our function will “ingest” the data from the OBCalerts Stream Analytics output, parse it and use the bot to send Telegram notifications. Creating a function requires an Azure storage account, so we should start with that. We can use the Azure Portal, or we can make use of the az command-line tool. First, we must log in to our Azure account (a browser window will open):

az login

Once the user context exists, we can proceed with creating Azure storage for our Function Apps:

az storage account create --name dacfunctions \
--resource-group DAC-IoT \
--sku Standard_LRS

Finally, we can create a Function App. Note that not all features are available in all regions, so we’re using US-West to deploy our application.

az functionapp create --resource-group DAC-IoT \
--os-type Linux \
--consumption-plan-location westus \
--runtime python \
--name TelegramMsg \
--storage-account dacfunctions

If you were successful, you should see your Function in the Azure Portal under the Function App section.

Figure 12. Defining a Function App

Since it’s always a terrible practice to store credentials in files (especially if those files will be versioned in Git), Azure allows us to create an abstraction layer between the Event Hub credentials and the Python code. First, we must once again locate the access keys and the connection string.

Figure 13. Revisiting Shared Access Policies
  1. In Azure Portal Navigate to Event Hubs.
  2. Click on the created Event Hub namespace (here: dac-obc).
  3. In the blade’s navigation pane locate the Settings section and select “Shared access policies”.
  4. You can (and should) create dedicated access credentials, but we’ll use the default ones.
  5. Copy the Connection String for later.

With the credentials, we can define a connection string for our function. This is the abstraction layer I’ve mentioned.

Figure 14. Defining a connection variable in Function App properties
  1. Navigate to Azure Portal → Function App and click on your function.
  2. In the Configured features section, click the Configuration hyperlink.
  3. Click “New connection string” and define a new connection of type “Custom”.
  4. Choose a descriptive name (here dac-obc-alerts-conn) and paste the value of the previously copied “Connection string-primary key” parameter. Confirm with OK.

So far, so good. Our Function App is configured; now we need to populate it with some code. Switch back to your terminal window, change to the directory where you keep your projects and type:

func init TelegramMsg

When prompted to choose a worker runtime select Python. Once the application stub is ready, create a function in the application’s directory.

cd TelegramMsg
func new

Choose “Azure Event Hub Trigger” and accept the default EventHubTrigger function name. Once the function stub is created, three files must be adjusted. The first is requirements.txt, where you should place a single line of text:

python-telegram-bot==11.1.0

We’ll use this Python package to send messages via the Telegram bot. Next, switch to the EventHubTrigger directory and edit the function.json file, adjusting the eventHubName and connection parameters:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "event",
      "direction": "in",
      "eventHubName": "dac-obc-alerts",
      "connection": "dac-obc-alerts-conn",
      "cardinality": "many",
      "consumerGroup": "$Default"
    }
  ]
}

Finally, open the __init__.py file and replace its content with the Python code that parses the alerts and calls the Telegram bot.
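A minimal sketch, with the bot token, chat ID and payload field names as assumptions you’ll need to adjust (in a real deployment you’d move the credentials into app settings as well), could look like this:

import json
import logging
from typing import List

import azure.functions as func
import telegram

# Placeholders: use your own bot token and group chat ID
TELEGRAM_TOKEN = "<API_KEY>"
TELEGRAM_CHAT_ID = "<CHAT_ID>"

bot = telegram.Bot(token=TELEGRAM_TOKEN)


def main(event: List[func.EventHubEvent]):
    # The binding is named "event" with cardinality "many", so we receive a batch of messages.
    for msg in event:
        # Stream Analytics writes line-separated JSON by default,
        # so a single Event Hub message may contain several records.
        for line in msg.get_body().decode("utf-8").splitlines():
            if not line.strip():
                continue
            alert = json.loads(line)
            logging.info("Alert received: %s", alert)
            text = "Temperature alert! Device {} averaged {:.1f} °C over the last 30 seconds.".format(
                alert["device_id"], alert["avg_temperature"])
            bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=text)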

This is a very basic script with no exception handling. Just as I mentioned at the very beginning, the goal is to introduce you to some basics. Coding best practices should be part of your DNA.

Let’s try and deploy the function to Azure Cloud. Change back into the app’s root directory (TelegramMsg) and execute the following command:

func azure functionapp publish TelegramMsg --build-native-deps

Assuming everything went fine, you’re all set! If you haven’t done so already, make sure to start your Stream Analytics job in the Azure Portal and you’re good to go!

Have fun!
