Todd Birchard for Hackers And Slackers · Originally published at hackersandslackers.com

Structured Streaming in PySpark


Now that we're comfortable with Spark DataFrames, we're going to put this newfound knowledge to work by building a streaming data pipeline in PySpark. As it turns out, real-time data streaming is one of Spark's greatest strengths.

For this go-around, we'll touch on the basics of how to build a structured stream in Spark. Databricks has a few sweet features which help us visualize streaming data: we'll be using these features to validate whether or not our stream worked. If you're looking to hook Spark into a message broker or create a production-ready pipeline, we'll be covering this in a future post.

Preparing Some Data

We're going to need some reasonably real-looking data to get going here. In keeping with the spirit of data streams (and overhyped technology trends), I've generated a set of data meant to mimic input from IoT devices. We're going to dive into structured streaming by exploring the very-real scenario of IoT devices streaming event actions to a centralized location.

The data comes as 20 different JSON files, each containing 1000 entries. This collection of files should serve as a pretty good emulation of what real data might look like. If you'd like to get your hands on these files, I've uploaded them here. Let's get a preview:

[{"time":"2018-10-27T16:52:04Z","customer":"Feliks Fuggle","action":"power on","device":"Footbot Air Quality Monitor"},{"time":"2019-04-05T10:56:05Z","customer":"Chelsea Redgewell","action":"power off","device":"Amazon Echo"},{"time":"2018-06-22T04:29:44Z","customer":"Cymbre Bischof","action":"low battery","device":"Nest T3021US Thermostat"},{"time":"2019-01-28T07:36:33Z","customer":"Bettye McKeefry","action":"low battery","device":"August Doorbell Cam"},{"time":"2019-04-21T23:33:44Z","customer":"Gratiana Shapera","action":"power on","device":"Nest T3021US Thermostat"},{"time":"2018-06-20T08:18:21Z","customer":"Hurleigh Gotcliff","action":"low battery","device":"Amazon Echo"},{"time":"2019-05-08T06:09:27Z","customer":"Freeland Giacomozzo","action":"power on","device":"Amazon Echo"},{"time":"2019-01-26T15:39:30Z","customer":"Kally Shawell","action":"low battery","device":"Footbot Air Quality Monitor"},{"time":"2018-06-17T08:24:24Z","customer":"Kerrin Gurley","action":"power on","device":"Amazon Echo"},{"time":"2018-06-05T03:32:00Z","customer":"Eduino Gilmour","action":"power on","device":"August Doorbell Cam"},{"time":"2019-01-13T15:52:30Z","customer":"Vernon Carleman","action":"power off","device":"Google Home Voice Controller"},{"time":"2018-09-12T23:42:01Z","customer":"Constantino Evitt","action":"power off","device":"August Doorbell Cam"},{"time":"2019-05-10T15:02:28Z","customer":"Jermaine Bofield","action":"low battery","device":"Amazon Dash Button"},{"time":"2018-12-23T22:05:23Z","customer":"Omero Westphal","action":"low battery","device":"Footbot Air Quality Monitor"},{"time":"2018-09-10T16:51:15Z","customer":"Gaylor Tour","action":"power off","device":"Amazon Echo"},{"time":"2018-10-29T18:56:45Z","customer":"Jordan Dikles","action":"power off","device":"Amazon Echo"},{"time":"2018-08-18T18:43:38Z","customer":"Phyllida Druitt","action":"low battery","device":"August Doorbell Cam"},{"time":"2018-08-10T00:12:19Z","customer":"Norrie Culwen","action":"power off","device":"August Doorbell Cam"},{"time":"2019-03-20T22:56:44Z","customer":"Filmore Paggitt","action":"power on","device":"August Doorbell Cam"},{"time":"2018-05-14T15:13:17Z","customer":"Jerald Galtone","action":"power on","device":"Google Home Voice Controller"},...

DISCLAIMER: This data is very not real (I've actually compiled it using Mockaroo, which is a great one-stop-shop for creating fake datasets). As a result, our data will look completely random (because it is).
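
If you'd rather skip Mockaroo, a few lines of plain Python can produce a similar dataset. This is just a sketch, not the script I actually used: the customer names are placeholders, the iot_events_*.json filenames are arbitrary, and each file is written as a single JSON array per file to match the preview above. You'd still need to upload the resulting files to wherever your stream will read from.

import json
import random
from datetime import datetime, timedelta

# Categories mirroring the dataset described above
actions = ["power on", "power off", "low battery"]
devices = ["Amazon Echo", "Nest T3021US Thermostat", "August Doorbell Cam",
           "Footbot Air Quality Monitor", "Google Home Voice Controller",
           "Amazon Dash Button"]

def random_event():
    # Random timestamp within roughly a year-long window, ISO-8601 with a trailing Z
    ts = datetime(2018, 5, 1) + timedelta(seconds=random.randint(0, 365 * 24 * 3600))
    return {
        "time": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "customer": f"Customer {random.randint(1, 5000)}",  # placeholder names
        "action": random.choice(actions),
        "device": random.choice(devices),
    }

# 20 files of 1000 events each, matching the shape of the real dataset
for i in range(20):
    with open(f"iot_events_{i}.json", "w") as f:
        json.dump([random_event() for _ in range(1000)], f)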

We're going to build a structured stream which looks at a location where all these files are uploaded and streams the data.

Uploading Our Data

As much as I want to upload this data to S3, Amazon has locked me out of my own account. I've been locked out for months, prompted for the CVV of a credit card I no longer have (AWS support does nothing). If anybody knows somebody at Amazon, hit me up.

Instead, we'll host these files in our Databricks account, which is easily handled in the UI of the data tab. After starting a cluster, I'll simply upload these 20 JSON files and store them in DBFS (Databricks File System). The path I'm using is /FileStore/tables/streaming/.

[Image: Uploading data via the UI.]

We can verify that the data has been uploaded by browsing DBFS:

[Image: Browsing our filesystem.]

Looks like we're good to go!

If you're looking for a way to clean up DBFS, this can be accomplished by installing the Databricks CLI. With this installed, we can remove files or directories using dbfs rm -r dbfs:/FileStore/tables/[folder_name]/.
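
If you'd rather stay inside the notebook, Databricks' dbutils utilities can do the same inspection and cleanup. A quick sketch, assuming the folder path used above (dbutils and display are only available inside Databricks notebooks):

# List the uploaded files to confirm they landed where we expect
display(dbutils.fs.ls("/FileStore/tables/streaming/"))

# Remove the whole folder when you're done (recurse=True deletes its contents too)
dbutils.fs.rm("/FileStore/tables/streaming/", recurse=True)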

Loading and Inspecting Our Data

We start off by importing the timestamp and string types; we know we'll need to support these types from looking at our data earlier. Then we save the path to our data to the variable inputPath:

from pyspark.sql.types import TimestampType, StringType

# Path to our 20 JSON files
inputPath = "/FileStore/tables/streaming/"

We'll create a schema for our data; it's generally a bad idea to infer schema when creating a structured stream. We're streaming data from one predictable source to another, so we should set our data structure explicitly (and eliminate any chance of it being set incorrectly).

from pyspark.sql.types import StructType, StructField

# Explicitly set schema
schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("customer", StringType(), True),
    StructField("action", StringType(), True),
    StructField("device", StringType(), True)
])

StructType is a class which lets us define a schema as a collection of StructFields. StructField accepts 3 parameters: the name of our field, the type, and whether or not the field should be nullable.

Let's use all of this to create a DataFrame:

from pyspark.sql.types import TimestampType, StringType, StructType, StructField

# Path to our 20 JSON files
inputPath = "/FileStore/tables/streaming/"

# Explicitly set schema
schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("customer", StringType(), True),
    StructField("action", StringType(), True),
    StructField("device", StringType(), True)
])

# Create DataFrame representing data in the JSON files
inputDF = (
  spark
    .read
    .schema(schema)
    .json(inputPath)
)

display(inputDF)

Here's our output:

| time | customer | action | device |
|------|----------|--------|--------|
| null | null | null | null |
| 2018-06-05T01:33:35.000+0000 | Sharia Cromack | low battery | August Doorbell Cam |
| 2019-01-08T17:03:51.000+0000 | Rosemary Rickford | low battery | Amazon Dash Button |
| 2019-04-26T14:45:54.000+0000 | Kessia Deeming | power off | Nest T3021US Thermostat |
| 2018-07-24T12:49:36.000+0000 | Carver McCook | power off | Amazon Echo |
| 2018-08-24T09:00:57.000+0000 | Anet Eagles | power on | August Doorbell Cam |
| 2018-09-24T07:06:01.000+0000 | Rolando Momery | power on | Amazon Echo |
| 2018-08-22T09:20:51.000+0000 | Tore Littleover | power on | August Doorbell Cam |
| 2018-08-13T13:23:27.000+0000 | Randa Ainsley | power off | August Doorbell Cam |
| 2018-06-05T20:40:33.000+0000 | Mayor Bratty | power on | Amazon Echo |

A wild empty row appears! It seems as though our attempts to emulate a real-world scenario are going well: we already have our first dumb problem! No worries:

# Remove empty rows
inputDF = inputDF.dropna()

display(inputDF)

That'll do it. Let's investigate our data further by taking a look at the distribution of actions amongst our IoT devices. We'll do this by creating a new DataFrame with an aggregate function, grouping by action:

# Aggregate number of actions
actionsDF = (
  inputDF
    .groupBy(inputDF.action)
    .count()
)
actionsDF.cache()

# Create temp table named 'iot_action_counts'
actionsDF.createOrReplaceTempView("iot_action_counts")

Now we can query the table we just created:

%sql
select action, sum(count) as total_count
from iot_action_counts
group by action

Check it out:

| action | total_count |
|--------|-------------|
| power on | 6631 |
| low battery | 6733 |
| power off | 6616 |

Sweet! Nothing unexpected here. Let's start streaming, shall we?

Streaming Our Data

To load data into a streaming DataFrame, we create a DataFrame just as we did with inputDF, with one key difference: instead of .read, we'll be using .readStream:

# Create streaming equivalent of `inputDF` using .readStream
streamingDF = (
  spark
    .readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

That's right, creating a streaming DataFrame is as simple as flicking this switch.

Our data isn't being created in real-time, so we'll have to use a trick to emulate streaming conditions. Instead of streaming data as it comes in, we can load each of our JSON files one at a time. That's what option() is doing: we're setting the maxFilesPerTrigger option to 1, which means only a single JSON file will be streamed at a time. This will allow us to see the data as it streams in!
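
If you don't have a folder of files handy, Spark also ships with a built-in "rate" source (available since Spark 2.2) that generates rows on a timer, which is another easy way to fake a stream while testing. A minimal sketch; the timestamp and value columns are what the rate source itself produces:

# Built-in "rate" source: emits `rowsPerSecond` rows per second,
# each with a `timestamp` and an incrementing `value` column.
fakeStreamDF = (
  spark
    .readStream
    .format("rate")
    .option("rowsPerSecond", 5)
    .load()
)

fakeStreamDF.isStreaming  # True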

Another cool thing we can do is create a DataFrame from streamingDF with some transformations applied, like the aggregate we had earlier. This DataFrame will stream as it inherits readStream from the parent:

# Create streaming equivalent of `inputDF` using .readStream
streamingDF = (
  spark
    .readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Stream `streamingDF` while aggregating by action
streamingActionCountsDF = (
  streamingDF
    .groupBy(streamingDF.action)
    .count()
)

DataFrames have a built-in check for when we need to quickly test our stream's status. Running .isStreaming on a DataFrame returns a Boolean, telling us whether or not the DataFrame is streaming:

# Is `streamingActionCountsDF` actually streaming?
streamingActionCountsDF.isStreaming

The output:

True

Starting Our Stream

Now we have a streaming DataFrame, but it isn't streaming anywhere. To stream to a destination, we need to call .writeStream on our DataFrame and set all the necessary options:

spark.conf.set("spark.sql.shuffle.partitions", "2")

# View stream in real-time
query = (
  streamingActionCountsDF
    .writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
    .start()
)

Here's a quick breakdown:

Format

We call .format() on the stream writer to specify the type of destination our data will be written to. Each type of output is called an output sink (get it? Because we're streaming into a sink). .format() accepts the following:

  • parquet : Stores the output to a directory. Requires the path option to be set, which sets the destination of the file.
  • kafka : Stores the output to one or more topics in Kafka. Required options are kafka.bootstrap.servers (list of Kafka server addresses) and topic (the Kafka topic or topics to write to).
  • console : Prints the output to the console/stdout every time there is a trigger. Used for debugging.
  • memory : The output is stored in memory as an in-memory table. Requires that .queryName() be set. Used for debugging.

We're just testing this out, so writing our DataFrame to memory works for us. If this were writing somewhere real, we'd want to point to a message broker or what-have-you.
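
For reference, here's roughly what pointing the same stream at a file sink would look like instead. This is a sketch, not code from the original post: the output and checkpoint paths are placeholders, and a checkpointLocation is required for file sinks.

# Hypothetical parquet-sink version (paths are placeholders).
# The parquet sink only supports "append" mode, so it suits the raw events
# in `streamingDF` better than the running aggregate.
fileQuery = (
  streamingDF
    .writeStream
    .format("parquet")
    .option("path", "/FileStore/tables/streaming_output/")
    .option("checkpointLocation", "/FileStore/tables/streaming_checkpoints/")
    .outputMode("append")
    .start()
)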

Output Mode

.outputMode() determines what data gets written to the streaming sink. The "output" specifically refers to whatever new data becomes available in a streaming DataFrame. .outputMode() accepts one of three values:

  • append : Only new rows will be written to the sink.
  • complete : All rows will be written to the sink every time there are updates.
  • update : Only the rows that were updated will be written to the sink, every time there are updates.

Start

Calling .start() on the configured stream writer kicks off the streaming query and begins writing data to the sink.
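
Because we gave the memory sink a queryName of counts, the stream continuously refreshes an in-memory table we can query from the notebook while it runs. A quick sketch; the ten-second sleep is just an arbitrary pause to let a few micro-batches land:

import time

# Give the stream a moment to process a few files, then peek at the in-memory table
time.sleep(10)
spark.sql("select * from counts").show()

# The handle returned by .start() is a StreamingQuery; stop it when you're done
query.stop()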

Watching Data Stream Live in Databricks

How do we preview data being streamed to memory? Databricks has a dope way of seeing how streams are performing in real-time, while still in our notebook. Check out what happens when we run a cell that contains the above:

[Image: Ohhhh snap!]

Things are happening! We're shown useful information about the processing rate, batch duration, and so forth. Really cool stuff. If we switch to theraw data tab, we can see exactly what's happening:

[Image: An overview of our output as it happens.]

Now we're talking! Check out the value for batchId... notice how it ticks up to 20 and then stops? That's one per JSON file! We're also shown things like the timestamp, numInputRows, and other useful stuff.
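
The same metrics are also available programmatically on the StreamingQuery handle, which is handy outside of Databricks' built-in charts. A small sketch using the query variable from earlier:

# lastProgress is a dict describing the most recent micro-batch (None before the first one);
# recentProgress is a list of the last several of them.
if query.lastProgress is not None:
    print(query.lastProgress["batchId"], query.lastProgress["numInputRows"])

for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])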

Well, we did it. When all is said and done, building structured streams with PySpark isn't as daunting as it sounds. Until next time, space cowboy.
