This post is a guest publication written by Yaroslav Tkachenko, a Software Architect at Activision.
DStreams vs. DataFrames: Two Flavors of Spark Streaming
Apache Spark is one of the most popular and powerful large-scale data processing frameworks. It was created as an alternative to Hadoop’s MapReduce framework for batch workloads, but now it also supports SQL, machine learning, and stream processing. Today I want to focus on Spark Streaming and show a few options available for stream processing.
Stream data processing is used when dynamic data is generated continuously, and it is often found in big data use cases. In most instances, data is processed in near real-time, one record at a time, and the insights derived from the data are also used to provide alerts, render dashboards, and feed machine learning models that can react quickly to new trends within the data.
DStreams vs. DataFrames
Spark Streaming went alpha with Spark 0.7.0. It’s based on the idea of discretized streams, or DStreams. Each DStream is represented as a sequence of RDDs, so it’s easy to pick up if you’re coming from low-level, RDD-backed batch workloads. DStreams have seen a lot of improvements since then, but various challenges remain, primarily because it’s a very low-level API.
As a solution to those challenges, Spark Structured Streaming was introduced in Spark 2.0 (and became stable in 2.2) as an extension built on top of Spark SQL. Because of that, it takes advantage of Spark SQL’s code and memory optimizations. Structured Streaming also provides very powerful abstractions like the Dataset/DataFrame APIs, as well as SQL. No more dealing with RDDs directly!
Both Structured Streaming and Streaming with DStreams use micro-batching. The biggest differences are latency and message delivery guarantees: Structured Streaming offers exactly-once delivery with 100+ millisecond latency, whereas the Streaming with DStreams approach only guarantees at-least-once delivery, but can provide millisecond latencies.
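To make the Structured Streaming side more concrete, here is a minimal sketch (not taken from the jobs below) of a streaming DataFrame query reading from Kafka with a fixed micro-batch trigger. It assumes Spark 2.3 with the spark-sql-kafka-0-10 connector on the classpath; the broker and topic names are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("structured-streaming-sketch").getOrCreate()

// The Kafka source is exposed directly as a streaming DataFrame, no RDDs involved
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "topic-to-consume")
  .load()
  .selectExpr("CAST(value AS STRING) AS msg")

// Micro-batches are emitted on a fixed processing-time trigger
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()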
I personally prefer Spark Structured Streaming for simple use cases, but Spark Streaming with DStreams is really good for more complicated topologies because of its flexibility. That’s why below I want to show how to use Streaming with DStreams and Streaming with DataFrames (which is typically used with Spark Structured Streaming) for consuming and processing data from Apache Kafka. I’m going to use Scala, Apache Spark 2.3, and Apache Kafka 2.0.
Also, for the sake of example, I will run my jobs using Apache Zeppelin notebooks provided by Qubole. Qubole is a data platform that I use daily. It manages Hadoop and Spark clusters, makes it easy to run ad hoc Hive and Presto queries, and also provides managed Zeppelin notebooks that I happily use. With Qubole I don’t need to think much about configuring and tuning Spark and Zeppelin, it’s just handled for me.
The actual use case I have is very straightforward:
- Some sort of telemetry is written to Kafka: small JSON messages with metadata and arbitrary key/value pairs (a sample message is sketched after this list)
- I want to connect to Kafka, consume, and deserialize those messages
- Then apply transformations if needed
- Collect some aggregations
- Finally, I’m interested in anomalies and generally bad data — since I don’t control the producer, I want to catch things like NULLs, empty strings, maybe incorrect dates and other values with specific formats, etc.
- The job should run for some time, then automatically terminate. Typically, Spark Streaming jobs run continuously, but sometimes it might be useful to run it ad hoc for analysis/debugging (or as an example in my case, since it’s so easy to run a Spark job in a notebook).
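For illustration, a single telemetry message might look something like this. The field names match the schema used in the jobs below; the values themselves are made up:

// A hypothetical sample payload, purely for illustration
val sampleMessage = """{"platform": "ios", "uid": "a1b2c3d4e5f6", "key": "screen_resolution", "value": "1920x1080"}"""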
Streaming with DStreams
In this approach, we use DStreams, which are simply sequences of RDDs.
%spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.LongAccumulator
import scala.util.parsing.json.JSON

val r = scala.util.Random

// Generate a new Kafka Consumer group id every run
val groupId = s"stream-checker-v${r.nextInt.toString}"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  // remove this if your Kafka doesn't use SSL
  "security.protocol" -> "SSL",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic-to-consume")

// Accumulating results in batches of 'batchInterval'
val batchInterval = Seconds(5)
// How many batches to run before terminating
val batchesToRun = 10

case class Message(platform: String, uid: String, key: String, value: String)

// Counter for the number of batches. The job will stop after it reaches the 'batchesToRun' value.
// Looks ugly, but this is what the documentation uses as an example ¯\_(ツ)_/¯
object FinishedBatchesCounter {
  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("FinishedBatchesCounter")
        }
      }
    }
    instance
  }
}

// 'sc' is a SparkContext, here it's provided by Zeppelin
val ssc = new StreamingContext(sc, batchInterval)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val messages = stream
  // We get a bunch of metadata from Kafka like partitions, timestamps, etc.
  // Only interested in the message payload
  .map(record => record.value)
  // We use flatMap to handle errors by returning an empty result (None) if we
  // encounter an issue and a single-element result (Some(_)) if everything is ok
  .flatMap(record => {
    // Deserializing JSON using the built-in Scala parser and converting it to a Message case class
    JSON.parseFull(record).map(rawMap => {
      val map = rawMap.asInstanceOf[Map[String, String]]
      Message(map.get("platform").get, map.get("uid").get, map.get("key").get, map.get("value").get)
    })
  })

// Cache the DStream now, it'll speed up most of the operations below
messages.cache()

// Counting batches and terminating after 'batchesToRun'
messages.foreachRDD { rdd =>
  val finishedBatchesCounter = FinishedBatchesCounter.getInstance(sc)
  println(s"--- Batch ${finishedBatchesCounter.count + 1} ---")
  println("Processed messages in this batch: " + rdd.count())

  if (finishedBatchesCounter.count >= batchesToRun - 1) {
    ssc.stop()
  } else {
    finishedBatchesCounter.add(1)
  }
}

// Printing the aggregation for the platforms
messages
  .map(msg => (msg.platform, 1))
  .reduceByKey(_ + _)
  .print()

// Printing messages with 'weird' uids
val weirdUidMessages = messages.filter(msg =>
  msg.uid == "NULL" || msg.uid == "" || msg.uid == " " || msg.uid.length < 10)
weirdUidMessages.print(20)
weirdUidMessages.count().print()

// TODO: catch more violations here using filtering on 'messages'

ssc.start()
ssc.awaitTermination()
Streaming with DataFrames
Now we can try to combine Streaming with the DataFrames API to get the best of both worlds!
%spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.LongAccumulator

val r = scala.util.Random

// Generate a new Kafka Consumer group id every run
val groupId = s"stream-checker-v${r.nextInt.toString}"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  // remove this if your Kafka doesn't use SSL
  "security.protocol" -> "SSL",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic-to-consume")

// Accumulating results in batches of 'batchInterval'
val batchInterval = Seconds(5)
// How many batches to run before terminating
val batchesToRun = 10

// Message schema (for JSON objects)
val schema = StructType(
  StructField("platform", StringType, false) ::
  StructField("uid", StringType, false) ::
  StructField("key", StringType, false) ::
  StructField("value", StringType, false) :: Nil
)

// Counter for the number of batches. The job will stop after it reaches the 'batchesToRun' value.
// Looks ugly, but this is what the documentation uses as an example ¯\_(ツ)_/¯
object FinishedBatchesCounter {
  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("FinishedBatchesCounter")
        }
      }
    }
    instance
  }
}

// 'sc' is a SparkContext, here it's provided by Zeppelin
val ssc = new StreamingContext(sc, batchInterval)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// We get a bunch of metadata from Kafka like partitions, timestamps, etc.
// Only interested in the message payload
val messages = stream.map(record => record.value)

// This turns an accumulated batch of messages into an RDD
messages.foreachRDD { rdd =>
  // Now we want to turn the RDD into a DataFrame
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // The following 2 lines do all the magic for transforming the String RDD
  // into a JSON DataFrame using the schema above
  val rawDF = rdd.toDF("msg")
  val df = rawDF.select(from_json($"msg", schema) as "data").select("data.*")

  // Cache the DataFrame now, it'll speed up most of the operations below
  df.cache()

  // We can create a view and use Spark SQL for queries
  // df.createOrReplaceTempView("msgs")
  // spark.sql("select count(*) from msgs where uid = '0'").show()

  val finishedBatchesCounter = FinishedBatchesCounter.getInstance(sc)
  println(s"--- Batch ${finishedBatchesCounter.count + 1} ---")
  println("Processed messages in this batch: " + df.count())

  println("All platforms:")
  df.groupBy("platform").count().show()

  println("Weird user ids:")
  val weirdUidDF = df.filter("uid is NULL OR uid = '' OR uid = ' ' OR length(uid) < 10")
  weirdUidDF.show(20, false)
  println("Total: " + weirdUidDF.count())

  // TODO: catch more violations here using filtering on 'df'

  // Counting batches and terminating after 'batchesToRun'
  if (finishedBatchesCounter.count >= batchesToRun - 1) {
    ssc.stop()
  } else {
    finishedBatchesCounter.add(1)
  }
}

ssc.start()
ssc.awaitTermination()
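For comparison, here is a rough sketch (not part of the job above) of how the same platform aggregation could be expressed in pure Structured Streaming. It assumes the spark-sql-kafka-0-10 connector, the 'schema' defined above, and a SparkSession named 'spark' (as provided by Zeppelin):

import org.apache.spark.sql.functions._
import spark.implicits._

// Parse the Kafka payload into typed columns using the same schema
val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "topic-to-consume")
  .load()
  .selectExpr("CAST(value AS STRING) AS msg")
  .select(from_json($"msg", schema) as "data")
  .select("data.*")

// Continuously updated count of messages per platform
val platformCounts = parsed.groupBy("platform").count()

// 'complete' output mode keeps the whole aggregate table in the console output;
// the weird-uid filter("uid is NULL OR ...") check could be run as a second query
val query = platformCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()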
Conclusion
Which approach is better? Since a DStream is just a sequence of RDDs, it’s typically used for low-level transformations and processing. Adding the DataFrames API on top of that provides very powerful abstractions like SQL, but requires a bit more configuration. And if you have a simple use case, Spark Structured Streaming might be a better solution in general!