Build a pipeline to join streams of real-time data

This tutorial will walk you through building a streaming data enrichment pipeline on Azure. Please refer to this blog post for additional background.

Components

  • Azure Event Hubs (input data source) - ingests the raw orders data
  • Azure SQL Database (reference data source) - stores the reference customer data
  • Azure Stream Analytics (stream processing) - joins the stream of orders data from Azure Event Hubs with the static customer reference data
  • Azure Cosmos DB (output data source) - acts as a "sink" to store the enriched orders info

Pre-requisites

Clone this repository:

git clone https://github.com/abhirockzz/streaming-data-pipeline-azure.git
cd streaming-data-pipeline-azure

Setup and configuration

This section will cover the following:

  • Set up an Azure Event Hubs namespace and topic (orders)
  • Set up Azure SQL Database and load the customer data
  • Create an Azure Cosmos DB account and container
  • Create and configure the Azure Stream Analytics job, input source, reference data, and output source

Before we set up the services, create a resource group - you can use the Azure Portal or the Azure CLI (az group create command):

az group create -l <location> -n <name of resource group>
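For example (the location and resource group name below are just illustrative values, not ones this tutorial depends on):

az group create -l eastus -n streaming-pipeline-rg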

Create the rest of the services in the same resource group and location.

Azure Event Hubs

Create an Event Hubs Namespace and Hub (topic) - the topic that you create (you can name it orders) will be used by Azure Stream Analytics as a (streaming) "source" for raw orders data. The data is JSON in this format:

{"orderID":"a82bddcb-a709-4702-4fc0-e1b9445b87d2","customerID":42,"amount":200}

You can set up Event Hubs using any of these options: Azure Portal, Azure CLI, ARM template, or Azure PowerShell.
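If you go the CLI route, a minimal sketch looks like this (the namespace name placeholder is something you choose, and the Standard SKU is an assumption on my part):

# create an Event Hubs namespace
az eventhubs namespace create --name <event hubs namespace> --resource-group <name of resource group> --location <location> --sku Standard

# create the "orders" hub (topic) inside the namespace
az eventhubs eventhub create --name orders --namespace-name <event hubs namespace> --resource-group <name of resource group>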

Azure SQL Database

SQL Server is used as the reference data store. It contains customer information, for example:

  cust_id     cust_name                      city
  ----------- ------------------------------ --------------------
  1           Willis Collins                 Dallas
  2           Casey Brady                    Chicago
  3           Walker Wong                    SanJose
  4           Randall Weeks                  SanDiego
  5           Gerardo Dorsey                 Dallas

You can follow these steps to create a logical SQL server and a single database - use customers as the database name.
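If you prefer the CLI, a sketch along these lines should work; the firewall rule at the end is an assumption I've added so that Azure services (such as Stream Analytics) can reach the server:

az sql server create --name <sql server name> --resource-group <name of resource group> --location <location> --admin-user <admin username> --admin-password <admin password>

az sql db create --name customers --server <sql server name> --resource-group <name of resource group>

# allow access from Azure services (0.0.0.0 - 0.0.0.0 is the special "Azure services" range)
az sql server firewall-rule create --name AllowAzureServices --server <sql server name> --resource-group <name of resource group> --start-ip-address 0.0.0.0 --end-ip-address 0.0.0.0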

Once completed, you should have a SQL server...

... and the customers database

Go ahead and create a table in the database, then import the sample data (10,000 rows from customers.csv). I have used sqlcmd and bcp (CLI tools for SQL Server) in the examples below:

If you want to use the Azure Portal to upload the data, skip this and check the next step.

To create a table:

sqlcmd -S<sql server name>.database.windows.net -U<admin username> -P<admin password> -d<database name> -i ./customers.sql
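For reference, customers.sql creates the Crm.Customers table used above. I have not reproduced the exact file here, but based on the columns shown earlier it is roughly equivalent to this sketch (the column types are assumptions):

CREATE SCHEMA Crm;
GO
-- reference customer data that gets joined with the orders stream later on
CREATE TABLE Crm.Customers (
    cust_id   INT PRIMARY KEY,
    cust_name VARCHAR(100),
    city      VARCHAR(100)
);
GO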

To import data:

bcp Crm.Customers in customers.csv -S<sql server name>.database.windows.net -d<database name> -U<admin username> -P<admin password> -q -c -t ,

To confirm that the data has been imported:

sqlcmd -S<sql server name>.database.windows.net -U<admin username> -P<admin password> -d<database name> -Q"SELECT TOP 10 * FROM Crm.Customers;"

You can also use the Query editor in the portal. Simply paste the contents of the customers.sql file into the editor and click Run.

Azure Cosmos DB

We will use Azure Cosmos DB to store the "enriched" data, which has customer information along with the raw orders data. For example, an item in the Cosmos DB container will look like this:

{"order_id":"21a50dd1-6d40-4aa8-8ad9-a2fdcfb5f5e0","customer_id":"8512","purchase_amount":182,"customer_name":"Robby Johns","city":"NewJersey","id":"044593cf-9399-42e5-b94c-6170bb9a3c1e","_rid":"9P02AJ44T1ICAAAAAAAAAA==","_self":"dbs/9P02AA==/colls/9P02AJ44T1I=/docs/9P02AJ44T1ICAAAAAAAAAA==/","_etag":"\"04001fb0-0000-1800-0000-5f3d63630000\"","_attachments":"attachments/","_ts":1597858659}

You can use the Azure Portal to create an Azure Cosmos DB account. Once that's complete, go ahead and add a database and container.

Use /customer_id as the partition key for your container.
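A rough CLI equivalent (the account, database, and container names are placeholders you pick yourself):

az cosmosdb create --name <cosmos account name> --resource-group <name of resource group>

az cosmosdb sql database create --account-name <cosmos account name> --resource-group <name of resource group> --name <database name>

az cosmosdb sql container create --account-name <cosmos account name> --resource-group <name of resource group> --database-name <database name> --name <container name> --partition-key-path "/customer_id"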

Azure Stream Analytics

Finally, it's time to set up Azure Stream Analytics. It will stitch all the components together into an end-to-end solution.

Start by creating an Azure Stream Analytics job. If you want to use the Azure Portal, just follow the steps outlined in this section, or use the Azure CLI instead if you prefer not to click through a UI.
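With the CLI, the stream-analytics extension is required. The sketch below is an assumption on my part - flag names can differ between extension versions, so check az stream-analytics job create --help before running it:

az extension add --name stream-analytics

az stream-analytics job create --job-name <stream analytics job name> --resource-group <name of resource group> --location <location>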

Create Azure Event Hubs Input

To configure Azure Event Hubs as a streaming input source, open the job in the portal and choose Input

Create Azure SQL Reference Input

To configure the Azure SQL Database as a reference input source, open the job in the portal and choose Input > Add reference Input > SQL Database

Choose the SQL database you created previously:

Create Azure Cosmos DB Output

To configure Azure Cosmos DB as an output, choose Output and proceed as below:

Create Query

Finally, we need to set up the query that joins the info from the input sources (Event Hubs and SQL Server) and pipes the result to the output (Cosmos DB). Open the job, choose Query, and enter the query below:

SELECT o.orderID as order_id, o.customerID as customer_id, c.cust_name as customer_name, c.city as city, o.amount as purchase_amount
FROM orders o
JOIN customers c
  ON o.customerID = c.cust_id

Here is the screenshot:

Once everything is set up, this is what your job config should look like:

Run the streaming pipeline

(Optional) Test with sample data - you can test your query with sample orders data. Upload the sample-orders.json file to the orders input and run the query to check the output.

Start the job and wait for it to transition to the Running status.

Export Event Hubs info as environment variables:

export EVENTHUBS_BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://<enter event hubs namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<enter event hubs key>"
export EVENTHUBS_TOPIC=orders

Start the order generator application - this will push randomly generated orders to the Event Hubs topic:

docker run -it -e EVENTHUBS_BROKER=$EVENTHUBS_BROKER -e EVENTHUBS_CONNECTION_STRING=$EVENTHUBS_CONNECTION_STRING -e EVENTHUBS_TOPIC=$EVENTHUBS_TOPIC abhirockzz/orders-generator

Wait for a few orders to be sent. You should see logs like these:

sent message to partition 1 offset 39
{"orderID":"6937ffdb-6db3-9164-a480-599493da9329","customerID":8512,"amount":182}
sent message to partition 3 offset 48
{"orderID":"7e4b8eec-429d-d800-b1e8-3ecc7efb4041","customerID":5027,"amount":33}
sent message to partition 2 offset 43
{"orderID":"7676330c-76b7-641c-f73c-12384789fa94","customerID":2200,"amount":60}
sent message to partition 4 offset 48
{"orderID":"2e091408-e960-57aa-c1c2-d432bab91e4b","customerID":7203,"amount":403}
sent message to partition 1 offset 40
{"orderID":"590a3827-e353-5f07-4cb5-44518dc2cad1","customerID":954,"amount":117}
sent message to partition 1 offset 41
{"orderID":"5cea8317-48c3-0235-b177-8a54a5ac5c5b","customerID":6504,"amount":332}
sent message to partition 4 offset 49
{"orderID":"e2d8e7ee-cf97-3992-9074-1b363b608b27","customerID":3633,"amount":158}

The Stream Analytics query should now kick into action and JOIN the orders information (from Event Hubs) with the customer reference data (in SQL Server). To confirm this, check the enriched orders information in Cosmos DB.

Go to your Cosmos DB account in the Azure portal, choose Data Explorer, and see the Items in your container.

Run a few queries:

// orders from Chicago
SELECT * FROM c WHERE c.city = "Chicago"

// average purchase amount for orders from Chicago
SELECT VALUE AVG(c.purchase_amount) FROM c WHERE c.city = "Chicago"

// average purchase amount per city
SELECT AVG(c.purchase_amount) AS avg_purchase, c.city
FROM c
GROUP BY c.city

// total purchase amount per city
SELECT SUM(c.purchase_amount) AS total_purchase, c.city
FROM c
GROUP BY c.city

Here is a screenshot:

Clean up

Once you're done, you can delete all the services by simply deleting the resource group (az group delete):

az group delete -n <name of resource group>

Wrap-up

I hope this helps you get started with Azure Stream Analytics and test the waters before moving on to more involved use cases. Beyond that, there is plenty of material for you to dig into!

