Category Archives: Spark Streaming

Structured Streaming + Kafka Integration

In this post, I will show you how to create an end-to-end Structured Streaming pipeline. Let’s say we have a requirement like this:
JSON data is being received in Kafka; parse the nested JSON, flatten it, store it in a structured Parquet table, and get end-to-end failure guarantees.


//Step-1 Creating a Kafka Source for Streaming Queries
import org.apache.spark.sql.functions._

val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "topic")
  .load()

//Step-2 Parsing and flattening the JSON
val parsedData = rawData
  .selectExpr("cast (value as string) as json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

//Step-3 Writing Data to Parquet
val query = parsedData.writeStream
  .option("checkpointLocation", "/checkpoint")
  .partitionBy("date")
  .format("parquet")
  .start("/parquetTable")

Step-1: Reading Data from Kafka
Specify the Kafka options to configure:
Which Kafka cluster to connect to?
kafka.bootstrap.servers => broker1:9092,broker2:9092
What to subscribe to?
subscribe => topic1,topic2,topic3 // fixed list of topics
subscribePattern => topic* // dynamic list of topics
assign => {"topicA":[0,1]} // specific partitions
Where to read from?
startingOffsets => latest (default) / earliest / {"topicA":{"0":23,"1":345}}
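
As a quick sketch, here is how these options plug into readStream; the broker addresses and topic names below are placeholders:

// Sketch: subscribing by pattern, reading from the earliest offsets
val byPattern = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribePattern", "topic*")   // dynamic list of topics
  .option("startingOffsets", "earliest")  // or "latest", or a per-partition JSON
  .load()

// Sketch: reading only specific partitions of a topic
val byPartition = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("assign", """{"topicA":[0,1]}""")
  .load()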

Step-2: Transforming Data
Each row in the source (rawData) has the following schema:

Column          Type
key             binary
value           binary
topic           string
partition       int
offset          long
timestamp       long
timestampType   int

Cast the binary value to a string and name the column json
//selectExpr("cast (value as string) as json")
Parse the json string and expand it into nested columns, named data
//select(from_json(col("json"), schema).as("data"))
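
Note that the snippet above assumes a schema value that is never defined in this post. As an illustration only, a hypothetical schema for the nested JSON (including the date field that Step-3 partitions by) could look like this:

import org.apache.spark.sql.types._

// Hypothetical schema for the incoming JSON; adjust the fields to your payload.
// The "date" field is assumed here because Step-3 partitions the output by date.
val schema = new StructType()
  .add("device", StringType)
  .add("temperature", DoubleType)
  .add("date", StringType)
  .add("location", new StructType()   // nested object; data.* exposes it as a struct column
    .add("lat", DoubleType)
    .add("lon", DoubleType))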

Step-3: Writing to Parquet
Save the parsed data as a Parquet table at the given path
Partition the files by date so that future queries on time slices of the data are fast
Checkpointing
Enable checkpointing by setting the checkpoint location, where the offset logs are saved
//.option("checkpointLocation", ...)
start() actually launches a continuously running StreamingQuery in the Spark cluster
//.start("/parquetTable/")
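
Once started, the StreamingQuery handle returned by start() can be used to monitor or stop the job; a small sketch:

// Working with the StreamingQuery handle returned by start()
query.status               // is the query actively processing or waiting for data?
query.lastProgress         // metrics of the most recent trigger (input rate, durations, ...)
query.awaitTermination()   // block the current thread until the query stops
// query.stop()            // gracefully stop the streaming query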

Stay tuned for next post. 🙂

Reference: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

Easy, Scalable and Fault-tolerant Structured Streaming from Kafka to Spark

In this blog post, I will explain Spark Structured Streaming. Let’s first talk about what Structured Streaming is and how it works.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming. The Spark SQL engine takes care of running the query incrementally and continuously, and of updating the final result as streaming data continues to arrive.

Let’s say you want to continuously print the data received from Kafka to the console (just as an example). Below is how to express this with Structured Streaming.

First, create a local SparkSession, the entry point to all functionality in Spark.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
    .master("local[*]")
    .appName("app-name")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()

Next, let’s create a streaming DataFrame that represents data received from a Kafka server.

/**
 * Specify one or more locations to read data from.
 * Built-in support for Files/Kafka/Socket; sources are pluggable.
 * Multiple sources of different types can be combined using union().
 */
val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test-topic")
    .option("startingOffsets", "earliest")
    .load()
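
As the comment above mentions, several streaming DataFrames can be combined with union(); a minimal sketch, assuming a second placeholder topic named other-topic with the same schema:

// Sketch: combining two streaming DataFrames with union().
// Both sides have the same Kafka source schema; "other-topic" is a placeholder.
val otherStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "other-topic")
  .load()

val combined = upstream.union(otherStream)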

This upstream DataFrame represents an unbounded table containing the streaming data. The table has seven columns named key, value, topic, partition, offset, timestamp and timestampType. Each incoming record becomes a row in the table.
The upstream DataFrame has the following columns:

key(binary)   value(binary)   topic(string)   partition(int)   offset(long)   timestamp(long)   timestampType(int)
[binary]      [binary]        "topicA"        0                345            1486087873        0
[binary]      [binary]        "topicB"        3                2890           1486086721        0
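
Besides the payload, these metadata columns can be selected alongside the decoded value; a small sketch:

// Sketch: decoding the payload while keeping the Kafka metadata columns.
val withMetadata = upstream.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset", "timestamp")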

For more information, see the Spark-Kafka structured streaming options.

val data = upstream.selectExpr("CAST(value AS STRING)")
val downstream = data
    .writeStream
    .format("console")
    .start()

downstream.awaitTermination()

So now you have a transformed DataFrame with a single column named “value”, produced by casting the binary value to a string, and wired to a console sink. All data coming from Kafka will be printed on the console.
Here is an example that receives data from multiple Kafka topics and partitions the output data by topic name.

import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

val spark = SparkSession.builder
    .master("local[*]")
    .appName("app-name")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .getOrCreate()

val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val upstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test,Airport,Airports,Carriers,Planedata")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("topic", "CAST(value AS STRING)") // keep the "topic" and "value" columns

val downstream = upstream
    .writeStream
    // Partition by topic: creates one directory per topic name,
    // e.g. topic=Airport, topic=Carriers, topic=Planedata
    .partitionBy("topic")
    .format("csv")
    .option("path", "/tmp/data")
    .outputMode("append")
    .trigger(ProcessingTime(3000))
    .option("checkpointLocation", checkpointLocation)
    .start()

downstream.awaitTermination()

Here is the complete source code.
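
To verify the output, the partitioned CSV directories can be read back with a batch query; a sketch using the same paths as above:

// Sketch: reading back the partitioned output written by the streaming query above.
// Spark discovers the topic=<name> directories as a "topic" partition column.
val written = spark.read
  .option("basePath", "/tmp/data")
  .csv("/tmp/data")

written.groupBy("topic").count().show()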

Basic Concepts:

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
Note that Structured Streaming does not materialize the entire table.
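
To make the Result Table concrete, here is a hypothetical sketch: a running word count over a socket source (reusing the spark session from above), where the whole aggregated Result Table is re-emitted to the console in complete mode on every trigger:

import org.apache.spark.sql.functions._

// Hypothetical sketch of the Result Table model: every trigger appends new lines
// to the input table, and the updated word counts (the Result Table) are written
// to the console in "complete" output mode.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines
  .select(explode(split(col("value"), " ")).as("word"))
  .groupBy("word")
  .count()

wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()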

Input Sources: There are a few built-in sources.
File source – Reads files written in a directory as a stream of data. Supported file formats are text, csv, json and parquet.
Kafka source – Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
Socket source (for testing) – Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees.
Rate source (for testing) – Generates data at the specified number of rows per second; each output row contains a timestamp and a value. A sketch using this source follows the list.
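
For quick experiments without a Kafka cluster, the rate source mentioned above can feed a pipeline; a small sketch:

// Sketch: the rate source is handy for testing sinks and triggers without Kafka.
val testStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)   // generate 10 rows per second
  .load()                        // columns: timestamp, value

testStream.writeStream
  .format("console")
  .outputMode("append")
  .start()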

There is a lot more to explain about Structured Streaming, so I cannot cover everything in a single post, but I hope you now have a basic idea of how Structured Streaming works with Kafka.

References:
Structured Streaming Programming Guide

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)