
AdTech using SingleStoreDB, Kafka and Metabase
Abstract
SingleStoreDB can ingest and process large quantities of streaming data with ease. In this article, we'll build an AdTech application that simulates millions of Ad Campaign events streaming from a Kafka broker into SingleStoreDB. We'll also use Metabase to build a dashboard to visualise the data in SingleStoreDB.
Introduction
In a previous article series, we've seen how to use SingleStoreDB with Kafka hosted on the Confluent Cloud. The example simulated global sensors sending temperature readings that could be stored and analysed in SingleStoreDB. In this article, we'll use a more localised Kafka broker and create a visual dashboard to provide insights into and analysis of the streamed data.
The complete SQL code is listed in the Appendix.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use AdTech Demo Group as our Workspace Group Name and adtech-demo as our Workspace Name.
Once we've created our database in the following steps, we'll make a note of our password and host name.
Create a Database and Tables
In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
We'll also create two tables. First, the events table:
USE adtech;

CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);
This table stores details of the advertiser, the campaign and various demographic information about the user, such as gender and income.
Second, the campaigns table:
CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);
We have a small number of campaigns, and using REFERENCE replicates the table to every node in the cluster, keeping it in memory and providing very fast access.
Next, we'll insert the details of the campaigns:
INSERT INTO campaigns VALUES
    (1, 'demand great'),
    (2, 'blackout'),
    (3, 'flame broiled'),
    (4, 'take it from a fish'),
    (5, 'thank you'),
    (6, 'designed by you'),
    (7, 'virtual launch'),
    (8, 'ultra light'),
    (9, 'warmth'),
    (10, 'run healthy'),
    (11, 'virtual city'),
    (12, 'online lifestyle'),
    (13, 'dream burger'),
    (14, 'super bowl tweet');
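To make the role of the campaigns table concrete, here is a minimal Python sketch of the lookup it supports. It assumes that the campaign value stored with an event corresponds to campaign_id, which the article does not state explicitly. The dictionary holds only a few of the rows inserted above, and the helper name campaign_name is hypothetical.

```python
# A few rows copied from the INSERT statement above (not the full table).
CAMPAIGNS = {
    1: "demand great",
    2: "blackout",
    3: "flame broiled",
    13: "dream burger",
    14: "super bowl tweet",
}


def campaign_name(campaign_id: int) -> str:
    """Return the campaign name for an id, or a placeholder if it is unknown.

    Assumption: events.campaign refers to campaigns.campaign_id.
    """
    return CAMPAIGNS.get(campaign_id, "unknown campaign")


print(campaign_name(13))
print(campaign_name(99))
```

Because the table is tiny, a lookup like this is cheap wherever it runs, which is the same reasoning behind keeping a copy of the table on every node.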
Create a Pipeline
Pipelines allow us to create streaming ingest feeds from various sources, such as Kafka, S3 and HDFS, using a single command. With pipelines, we can also perform ETL operations.
For our use case, we can create a simple pipeline in SingleStoreDB as follows:
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id, event_name, advertiser, campaign, gender, income, page_url, region, country);
We can control the rate of data ingestion using the BATCH_INTERVAL. Initially, we'll set this to 2500 milliseconds.
We can configure the pipeline to start from the earliest offset, as follows:
ALTER PIPELINE events SET OFFSETS EARLIEST;
and we can test the pipeline before we start running it, as follows:
TEST PIPELINE events LIMIT 1;
The pipeline test should result in an error, as the value for the campaign column is out of range:
ERROR 1264 ER_WARN_DATA_OUT_OF_RANGE: ... Out of range value for column 'campaign'
Let's look at the data in the ad_events topic from the Kafka broker and see if we can identify the problem. We'll install kcat (formerly kafkacat):
sudo apt install kafkacat
and then run it as follows:
kafkacat -C -b public-kafka.memcompute.com:9092 -t ad_events
Here are 10 rows of example output:
...
607982731 Downstream Conversion Verizon Wireless 5 9 Male 50k - 75k / Utah US
607982732 Impression AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Click AT&T Wireless 4 3 Female unknown /2014/04/make-spring-centerpiece-with-tulips.html Florida US
607982732 Impression AT&T Wireless 5 9 Female unknown /category/31-day-drawing-challenge/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2016/05/lars-book-club-may.html/ Florida US
607982732 Impression AT&T Wireless 5 7 Female unknown /2013/01/polka-dot-your-wrapping.html/ Florida US
607982732 Impression AT&T Wireless 13 3 Female unknown /2016/03/12-best-paper-flowers-volume-3.html/ Florida US
607982733 Impression Dominos Pizza 5 6 Female 25k and below /2014/07/make-the-midsummer-floral-crown.html/ Pennsylvania US
607982734 Impression Lowes Companies 5 13 Female unknown /2016/01/eyeball-drink-stirrers.html/ New York US
607982735 Impression Lowes Companies 4 6 Male unknown /2016/02/review-of-the-5-best-planners-of-2016.html/2/ California US
...
If we take the first row above, and check for hidden characters, we'll see that we have the tab character (\t) separating the fields:
607982731<tab>Downstream<space>Conversion<tab>Verizon<space>Wireless<tab>5<space>9<tab>Male<tab>50k<space>-<space>75k<tab>/<tab>Utah<tab>US
Mapping the fields back to the table schema, we'll see that the campaign column is meant to be an integer, but the stream contains two integers separated by a space:
... <tab>integer<space>integer<tab> ...
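The same check can be sketched in a few lines of Python. This is only an illustration of the diagnosis, not part of the pipeline: the row is the first sample record with its tabs restored, the column list comes from the events table, and the helper names parse_event and is_plain_integer are hypothetical.

```python
# Column order taken from the pipeline's column list.
COLUMNS = [
    "user_id", "event_name", "advertiser", "campaign", "gender",
    "income", "page_url", "region", "country",
]

# First sample row from the kcat output, with the tab separators restored.
RAW = "607982731\tDownstream Conversion\tVerizon Wireless\t5 9\tMale\t50k - 75k\t/\tUtah\tUS"


def parse_event(line: str) -> dict:
    """Split one tab-separated record and label each field by column name."""
    values = line.rstrip("\n").split("\t")
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    return dict(zip(COLUMNS, values))


def is_plain_integer(text: str) -> bool:
    """Return True only if the whole field parses as a single integer."""
    try:
        int(text)
    except ValueError:
        return False
    return True


event = parse_event(RAW)
print(repr(event["campaign"]))                # the raw campaign field
print(is_plain_integer(event["user_id"]))     # user_id is a single integer
print(is_plain_integer(event["campaign"]))    # campaign is not
```

The record splits cleanly into nine fields, so the field count matches the table. Only the campaign field fails the integer check, because it holds two numbers separated by a space.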
We can change the pipeline slightly to deal with this problem. First, we'll drop the pipeline:
DROP PIPELINE events;
Then we'll modify line 4 of the pipeline with the keyword IGNORE, as follows:
CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id, event_name, advertiser, campaign, gender, income, page_url, region, country);
We can configure the pipeline again to start from the earliest offset, as follows:
ALTER PIPELINE events SET OFFSETS EARLIEST;
and we can test the pipeline again before we start running it, as follows:
TEST PIPELINE events LIMIT 1;
This time the pipeline test should be successful and we should see some output, similar to the following:
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| user_id   | event_name | advertiser | campaign | gender  | income  | page_url                         | region | country |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
| 605804911 | Impression | Aldi       |       13 | unknown | unknown | /2016/05/day-9-draw-a-rose.html/ | Nevada | US      |
+-----------+------------+------------+----------+---------+---------+----------------------------------+--------+---------+
We can now start the pipeline:
START PIPELINE events;
Install Metabase
To use Metabase, we'll follow the JAR file installation instructions and run the JAR file, as follows:
java -jar metabase.jar
Next, we'll open the following webpage:
http://localhost:3000/setup
We should see the following, shown in Figure 1.
We'll work through the various wizards to complete the language and user settings. To add our data, we'll select MySQL and we'll need to fill in the details for our connection, as follows:
- Display name: S2
- Host: <host>
- Port: 3306
- Database name: adtech
- Username: admin
- Password: <password>
We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.
We'll click the Connect database button. The connection should be successful.
Finally, we'll click Finish and then Take me to Metabase.
Next, we should see the following, as shown in Figure 2.
Create Dashboard
We are now ready to create our dashboard.
1. Total Number of Events
From the top right, we'll select + New and then SQL Query as shown in Figure 3.
After selecting the S2 database, we'll enter the following into the SQL Editor in Metabase:
SELECT COUNT(*) FROM events;
We can run the query using the right arrow button (▶) in the bottom right and use the Save link in the top right to save this query. We'll call the query Total Number of Events. We'll create a new dashboard called AdTech and add the query to the dashboard, as shown in Figure 4.
2. Events by Region
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Events by Region):
SELECT
    events.country AS `events.country`,
    COUNT(events.country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;
With this query, we have the option of creating a Visualization. We'll create a pie chart.
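As a rough illustration of what this aggregation computes, the following Python sketch groups a handful of made-up rows by country. The sample rows and the helper name count_by_country are hypothetical, and None stands in for a SQL NULL. The sketch follows standard SQL semantics: GROUP BY puts NULLs in their own group, while COUNT(column) only counts non-NULL values.

```python
def count_by_country(rows):
    """Mimic SELECT country, COUNT(country) ... GROUP BY country.

    Each row is a (region, country) pair; None represents NULL.
    """
    counts = {}
    for _region, country in rows:
        # GROUP BY creates a group for every distinct value, NULL included.
        counts.setdefault(country, 0)
        # COUNT(country) skips NULLs, so only real values add to the total.
        if country is not None:
            counts[country] += 1
    return counts


# Hypothetical sample rows, loosely based on the kcat output shown earlier.
sample_rows = [
    ("Utah", "US"),
    ("Florida", "US"),
    ("Florida", "US"),
    ("Pennsylvania", "US"),
    ("New York", None),
]

print(count_by_country(sample_rows))
```

Note that the query groups on the country column, so a feed that only contains US events would produce a pie chart with a single slice.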
3. Events by Top 5 Advertisers
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Events by Top 5 Advertisers):
SELECT
    events.advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%'
    OR events.advertiser LIKE '%McDonals%'
    OR events.advertiser LIKE '%Starbucks%'
    OR events.advertiser LIKE '%Dollar General%'
    OR events.advertiser LIKE '%YUM! Brands%'
    OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;
With this query, we have the option of creating a Visualization. We'll create a bar chart.
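The filter in this query is a set of substring matches, which can be sketched in Python as follows. The advertiser list is made up for illustration, the helper names are hypothetical, and lower-casing both sides assumes a case-insensitive collation on the advertiser column.

```python
from collections import Counter

# Substrings copied from the LIKE '%...%' patterns in the query.
PATTERNS = [
    "Subway", "McDonals", "Starbucks",
    "Dollar General", "YUM! Brands", "Dunkin Brands Group",
]


def matches_any(advertiser: str, patterns=PATTERNS) -> bool:
    """LIKE '%text%' is a substring test; case is ignored (an assumption)."""
    name = advertiser.lower()
    return any(pattern.lower() in name for pattern in patterns)


def top_advertisers(advertisers):
    """Count matching advertisers and sort by count, highest first."""
    counts = Counter(a for a in advertisers if matches_any(a))
    return counts.most_common()


# Hypothetical advertiser values, one per event.
sample = ["Subway", "Starbucks", "Subway", "Aldi", "McDonalds", "Subway", "Starbucks"]

print(top_advertisers(sample))
```

One thing the sketch makes visible: if the feed spells the advertiser as McDonalds (an assumption, since the sample output above does not include it), the pattern McDonals would not match it, so it is worth checking the actual advertiser values before relying on this chart.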
4. Ad Visitors by Gender and Income
We'll repeat the previous process of creating an SQL Query and adding it to the AdTech dashboard for the following query (Ad Visitors by Gender and Income):
SELECT * FROM (
    SELECT *,
        DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank,
        RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering,
        CASE WHEN xx.z___min_rank = xx.z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell
    FROM (
        SELECT *,
            Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (
            SELECT *,
                RANK() OVER (ORDER BY
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN (CASE WHEN bb.`events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END,
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count` ELSE NULL END DESC,
                    bb.`events.count` DESC,
                    bb.z__pivot_col_rank,
                    bb.`events.income`) AS z___rank
            FROM (
                SELECT *,
                    DENSE_RANK() OVER (ORDER BY
                        CASE WHEN ww.`events.gender` IS NULL THEN 1 ELSE 0 END,
                        ww.`events.gender`) AS z__pivot_col_rank
                FROM (
                    SELECT
                        events.gender AS `events.gender`,
                        events.income AS `events.income`,
                        COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (events.income <> 'unknown' OR events.income IS NULL)
                    GROUP BY 1, 2
                ) ww
            ) bb
            WHERE bb.z__pivot_col_rank <= 16384
        ) aa
    ) xx
) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1)
    AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
With this query, we have the option of creating a Visualization. We'll create a table.
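The query is long, but its core is the innermost subquery, which counts events for each gender and income pair after dropping rows where income is 'unknown'. The surrounding window functions only rank those counts so they can be laid out as a pivot. A minimal Python sketch of that core step, using made-up rows and a hypothetical helper name, might look like this:

```python
from collections import defaultdict


def pivot_income_by_gender(events):
    """Count events per income band (rows) and gender (columns).

    Each event is a (gender, income) pair; None represents NULL.
    """
    table = defaultdict(dict)
    for gender, income in events:
        # Mirrors WHERE (income <> 'unknown' OR income IS NULL):
        # only the literal 'unknown' is dropped, NULL incomes are kept.
        if income == "unknown":
            continue
        table[income][gender] = table[income].get(gender, 0) + 1
    return {income: dict(columns) for income, columns in table.items()}


# Hypothetical sample events as (gender, income) pairs.
sample_events = [
    ("Male", "50k - 75k"),
    ("Female", "unknown"),
    ("Female", "25k and below"),
    ("Male", "50k - 75k"),
    ("Female", "50k - 75k"),
]

print(pivot_income_by_gender(sample_events))
```

The sketch leaves out the ranking and row limits from the SQL, which only affect ordering and the maximum size of the result.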
Final Dashboard
Figure 5 shows an example of the charts being sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute.
Summary
In this article, we have created a pipeline in SingleStoreDB to connect to a Kafka broker. We have identified a problem with the incoming data and modified our pipeline to handle the issue. We have also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign. For additional details, please see the SingleStore documentation on how to Load and Analyze AdTech Data.
Appendix
CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

CREATE TABLE events (
    user_id INT,
    event_name VARCHAR(128),
    advertiser VARCHAR(128),
    campaign INT(11),
    gender VARCHAR(128),
    income VARCHAR(128),
    page_url VARCHAR(128),
    region VARCHAR(128),
    country VARCHAR(128),
    SORT KEY adtmidx (user_id, event_name, advertiser, campaign),
    SHARD KEY user_id (user_id)
);

CREATE REFERENCE TABLE campaigns (
    campaign_id SMALLINT(6) NOT NULL DEFAULT '0',
    campaign_name VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    PRIMARY KEY (campaign_id)
);

INSERT INTO campaigns VALUES
    (1, 'demand great'),
    (2, 'blackout'),
    (3, 'flame broiled'),
    (4, 'take it from a fish'),
    (5, 'thank you'),
    (6, 'designed by you'),
    (7, 'virtual launch'),
    (8, 'ultra light'),
    (9, 'warmth'),
    (10, 'run healthy'),
    (11, 'virtual city'),
    (12, 'online lifestyle'),
    (13, 'dream burger'),
    (14, 'super bowl tweet');

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id, event_name, advertiser, campaign, gender, income, page_url, region, country);

ALTER PIPELINE events SET OFFSETS EARLIEST;

TEST PIPELINE events LIMIT 1;

DROP PIPELINE events;

CREATE PIPELINE events
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
BATCH_INTERVAL 2500
IGNORE INTO TABLE events
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES STARTING BY ''
(user_id, event_name, advertiser, campaign, gender, income, page_url, region, country);

ALTER PIPELINE events SET OFFSETS EARLIEST;

TEST PIPELINE events LIMIT 1;

START PIPELINE events;

-- Total Number of Events
SELECT COUNT(*) FROM events;

-- Events by Region
SELECT
    events.country AS `events.country`,
    COUNT(events.country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

-- Events by Top 5 Advertisers
SELECT
    events.advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (events.advertiser LIKE '%Subway%'
    OR events.advertiser LIKE '%McDonals%'
    OR events.advertiser LIKE '%Starbucks%'
    OR events.advertiser LIKE '%Dollar General%'
    OR events.advertiser LIKE '%YUM! Brands%'
    OR events.advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

-- Ad Visitors by Gender and Income
SELECT * FROM (
    SELECT *,
        DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank,
        RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering,
        CASE WHEN xx.z___min_rank = xx.z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell
    FROM (
        SELECT *,
            Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (
            SELECT *,
                RANK() OVER (ORDER BY
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN (CASE WHEN bb.`events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END,
                    CASE WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count` ELSE NULL END DESC,
                    bb.`events.count` DESC,
                    bb.z__pivot_col_rank,
                    bb.`events.income`) AS z___rank
            FROM (
                SELECT *,
                    DENSE_RANK() OVER (ORDER BY
                        CASE WHEN ww.`events.gender` IS NULL THEN 1 ELSE 0 END,
                        ww.`events.gender`) AS z__pivot_col_rank
                FROM (
                    SELECT
                        events.gender AS `events.gender`,
                        events.income AS `events.income`,
                        COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (events.income <> 'unknown' OR events.income IS NULL)
                    GROUP BY 1, 2
                ) ww
            ) bb
            WHERE bb.z__pivot_col_rank <= 16384
        ) aa
    ) xx
) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1)
    AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;