Part V: Spark Streaming Programming Guide

Part IV, Spark Streaming Programming Guide (1), covered the execution mechanism of Spark Streaming, transformations and output operations, and Spark Streaming data sources and sinks. This article continues from there and mainly covers the following topics:

  • Stateful computation
  • Time-based window operations
  • Persistence
  • Checkpoint
  • Processing stream data with DataFrames & SQL

Stateful computation

updateStateByKey

The previous article introduced common stateless transformations. In the WordCount example, for instance, the output depends only on the data of the current batch interval and never on the results of previous batch intervals. Spark Streaming also provides a stateful operation, updateStateByKey, which maintains state and updates it as new data arrives: it reads the result of the previous batch interval and applies it to the statistics of the current batch interval. Its source code is as follows:

def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    updateStateByKey(updateFunc, defaultPartitioner())
  }

This operator can only be used on DStreams of key-value pairs. It takes a state update function, updateFunc, as a parameter. A usage example follows:

object StateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(StateWordCount.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))
    // checkpoint must be enabled, otherwise an error will be reported
    ssc.checkpoint("file:///e:/checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999)

    // State update function
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldValue = stateValue.getOrElse(0) // Get the current state value
      // Traverse the new values and add them to the state
      for (newValue <- newValues) {
        oldValue += newValue
      }
      // Return the latest state
      Option(oldValue)
    }

    val count = lines.flatMap(_.split(" "))
      .map(w => (w, 1))
      .updateStateByKey(updateFunc)
    count.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

Note: the code above must enable checkpointing, otherwise the following error is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()

Disadvantages of updateStateByKey

Taking the code above as an example: even when a key receives no new input data, its accumulated result is still returned and updated for every batch interval.

updateStateByKey returns the full historical state for every batch interval, including keys that are new, changed, and unchanged. Because updateStateByKey requires checkpointing, the checkpoint data grows very large when the amount of state is large, which hurts performance and efficiency.

mapWithState

mapWithState is another stateful operator provided by Spark Streaming. It overcomes the disadvantage of updateStateByKey and was introduced in Spark 1.6. The source code is as follows:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }

mapWithState returns only the keys whose values changed in the current batch; unchanged keys are not returned, so you only see the keys you care about. If there is no new data for a key, its state is not emitted. As a result, even with a large amount of state, the checkpoint does not take up nearly as much storage as it does with updateStateByKey, which makes it more efficient (recommended in production environments).

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("StatefulNetworkWordCount")
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("file:///e:/checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    /**
      * word:  the current key
      * one:   the value corresponding to the current key
      * state: the state value of the current key
      */
    val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      println(s">>> batchTime = $batchTime")
      println(s">>> word      = $word")
      println(s">>> one     = $one")
      println(s">>> state     = $state")
      val output = (word, sum)
      state.update(sum) //Update the status value of the current key
      Some(output) //Return results
    }
    // Build the StateSpec via StateSpec.function
    val spec = StateSpec.function(mappingFunc)
    val stateDstream = wordDstream.mapWithState(spec)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Time-based window operations

Spark Streaming provides two types of window operations: rolling windows and sliding windows. They are analyzed below.

Rolling windows

A rolling window only takes a fixed window duration as a parameter, and rolling windows do not overlap.

The source code is as follows:

/**
 * @param windowDuration length of the window; must be an integer multiple of the batch interval
 */
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)

Sliding windows

A sliding window takes two parameters: the window length and the sliding interval. Sliding windows can overlap.

The source code is as follows:

/**
 * @param windowDuration window length; must be an integer multiple of the batch interval
 * @param slideDuration  sliding interval; must be an integer multiple of the batch interval
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
  new WindowedDStream(this, windowDuration, slideDuration)
}

Window operations

  • window(windowLength, slideInterval)

    • Explanation

      Returns a new DStream computed from windowed batches of the source DStream.

    • Source code

        def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
        def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
          new WindowedDStream(this, windowDuration, slideDuration)
        }
  • countByWindow(windowLength, slideInterval)

    • Explanation

      Returns a sliding-window count of the elements in the stream.

    • Source code

        /**
         * @param windowDuration window length; must be an integer multiple of the batch interval
         * @param slideDuration  sliding interval; must be an integer multiple of the batch interval
         */
        def countByWindow(
            windowDuration: Duration,
            slideDuration: Duration): DStream[Long] = ssc.withScope {
          this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
        }


  • reduceByWindow(func, windowLength, slideInterval)

    • Explanation

      Returns a new single-element stream, created by aggregating the elements of the stream over the sliding interval using func. The function func must be associative and commutative so that it can be computed correctly in parallel.

    • Source code

        def reduceByWindow(
            reduceFunc: (T, T) => T,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[T] = ssc.withScope {
          this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
        }
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

    • Explanation

      When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the value of each key is aggregated over batches in a sliding window using the given reduce function func. Note: by default, this operator uses Spark's default number of parallel tasks for the grouping; the number of tasks can be set with the optional numTasks argument.

    • Source code

        def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration
          ): DStream[(K, V)] = ssc.withScope {
          reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
        }
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

    • Explanation

      A more efficient version of reduceByKeyAndWindow, in which the reduce value of each window is computed incrementally from the reduce value of the previous window: new data entering the sliding window is reduced, and old data leaving the window is "inverse reduced". It only applies to reversible reduce functions, i.e. reduce functions that have a corresponding inverse reduce function (passed as the invFunc parameter). Note: checkpointing must be enabled (a usage sketch follows the use case below).

    • Source code

      def reduceByKeyAndWindow(
            reduceFunc: (V, V) => V,
            invReduceFunc: (V, V) => V,
            windowDuration: Duration,
            slideDuration: Duration,
            partitioner: Partitioner,
            filterFunc: ((K, V)) => Boolean
          ): DStream[(K, V)] = ssc.withScope {
      
          val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
          val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
          val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
          new ReducedWindowedDStream[K, V](
            self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
            windowDuration, slideDuration, partitioner
          )
        }
  • countByValueAndWindow(windowLength, slideInterval, [numTasks])

    • Explanation

      When applied to a DStream, returns a new DStream of (element, count) pairs, where the count of each element is its frequency within the sliding window.

    • Source code

      def countByValueAndWindow(
            windowDuration: Duration,
            slideDuration: Duration,
            numPartitions: Int = ssc.sc.defaultParallelism)
            (implicit ord: Ordering[T] = null)
            : DStream[(T, Long)] = ssc.withScope {
          this.map((_, 1L)).reduceByKeyAndWindow(
            (x: Long, y: Long) => x + y,
            (x: Long, y: Long) => x - y,
            windowDuration,
            slideDuration,
            numPartitions,
            (x: (T, Long)) => x._2 != 0L
          )
        }

Use case

val lines = ssc.socketTextStream("localhost", 9999)

val count = lines.flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
  .print()

// rolling window
/*  lines.window(Seconds(20))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print() */
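The inverse-function variant of reduceByKeyAndWindow described above can be sketched as follows. This is a minimal, illustrative sketch that reuses the ssc, lines and checkpoint directory from the earlier examples; the durations are placeholders:

// Incremental window aggregation: reduce the new data entering the window
// and "inverse reduce" the old data leaving it. Checkpointing must be enabled.
ssc.checkpoint("file:///e:/checkpoint")
val invWindowCounts = lines.flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,   // reduce function for data entering the window
    (a: Int, b: Int) => a - b,   // inverse reduce function for data leaving the window
    Seconds(30),                 // window length
    Seconds(10)                  // sliding interval
  )
invWindowCounts.print()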

Persistence

Persistence is one way to improve the performance of a Spark application; Part II, Spark Core Programming Guide, explained the use of RDD persistence. DStreams also support persistence, using the same persist() and cache() methods. Persistence is commonly used with stateful operators such as window operations. Even if the persistence method is not called explicitly, the underlying implementation persists the data for you by default, as the following source code shows:

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {
  // Omit code
  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER)
}

Note: unlike RDD persistence, the default persistence level of a DStream serializes the data in memory, as can be seen from the following source code:

/** Persist the RDDs of this DStream with the given storage level */
  def persist(level: StorageLevel): DStream[T] = {
    if (this.isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of a DStream after streaming context has started")
    }
    this.storageLevel = level
    this
  }

  /** The default persistence level is (MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
  def cache(): DStream[T] = persist()

From the source above, the main points about persist() and cache() are as follows (a short usage sketch follows this list):

  • cache() simply calls persist() under the hood
  • persist() has two overloads

    • The no-argument persist() defaults to in-memory serialized storage (MEMORY_ONLY_SER)
    • persist(level: StorageLevel) lets you choose any of the storage levels available for RDD persistence
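For explicit persistence, a minimal sketch (assuming the lines DStream from the earlier examples; the storage level and durations are just illustrative choices) could look like this:

import org.apache.spark.storage.StorageLevel

val words = lines.flatMap(_.split(" ")).map((_, 1))
// Persist explicitly; calling persist() with no arguments would use MEMORY_ONLY_SER
words.persist(StorageLevel.MEMORY_AND_DISK_SER)

// The persisted DStream can now back several window computations
words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10)).print()
words.window(Seconds(60), Seconds(10)).count().print()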

Checkpoint

Brief introduction

Streaming applications usually run 24/7, so they must be resilient to failures unrelated to application logic (such as system failures, JVM crashes, etc.). Spark Streaming therefore needs to checkpoint enough information to a fault-tolerant storage system (such as HDFS) so that it can recover from failures. There are two types of checkpoints:

  • Metadata checkpoint

    Metadata checkpoints are used to recover from Driver failures: if the node running the Driver fails, the latest checkpoint data can be used to restore the most recent state. Typical checkpointed metadata includes:

    • Configuration: the configuration used to create the streaming application.
    • DStream operations: the DStream operations that define the streaming application.
    • Incomplete batches: batches whose jobs are queued but whose data has not yet been processed.
  • Data checkpoint

    The generated RDDs are saved to reliable storage. Some stateful transformations combine data across multiple batches, so checkpointing must be enabled for them. In such transformations, each generated RDD depends on the RDDs of previous batches, which makes the dependency chain grow over time. To avoid an unbounded increase in recovery time (proportional to the length of the chain), the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (such as HDFS) to cut the dependency chain. The effect is similar to persistence: recovery can start from the checkpointed state instead of recomputing the whole lineage.

In short, metadata checkpoints are mainly needed to recover from Driver failures, while data (RDD) checkpoints are required when stateful transformations are used.

When to enable checkpoints

Checkpointing must be enabled for applications that:

  • Use stateful transformations

    If updateStateByKey or reduceByKeyAndWindow (with its inverse function) is used in the application, a checkpoint directory must be provided to allow periodic RDD checkpointing.

  • Need to recover from failures of the Driver running the application

    Metadata checkpoints are used to recover the progress information.

Note that simple streaming applications that do not use the stateful transformations above can run without checkpointing enabled. In that case, recovery from a Driver failure will also be partial (some received but unprocessed data may be lost). This is often acceptable, and many Spark Streaming applications are run this way. Support for non-Hadoop environments is expected to improve in the future.

How to configure checkpoints

Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (such as HDFS or S3) in which checkpoint information is saved. Two settings are involved:

  • streamingContext.checkpoint(<dir>): the checkpoint directory, e.g. an HDFS path
  • dstream.checkpoint(<duration>): the checkpoint interval of the DStream

Setting the checkpoint interval is optional. If it is not set, a default is chosen based on the type of DStream: for a MapWithStateDStream the default checkpoint interval is 10 times the batch interval; for other DStreams it is the smallest multiple of the batch interval that is at least 10 seconds. Note that the checkpoint interval must be an integer multiple of the batch interval, otherwise an error is thrown.
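Putting the two settings together, a minimal sketch (assuming the 5-second batch interval, lines DStream and updateFunc from the earlier examples; the HDFS path and the 50-second interval are hypothetical placeholders) could look like this:

// Checkpoint directory in a fault-tolerant file system (hypothetical HDFS path)
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")

// Stateful DStream, reusing updateFunc from the updateStateByKey example above
val stateCounts = lines.flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey(updateFunc)

// Checkpoint this DStream every 50 seconds, i.e. 10 x the 5-second batch interval
stateCounts.checkpoint(Seconds(50))
stateCounts.print()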

In addition, if you want to recover an application from a Driver failure, you need to create a StreamingContext in the following way:

def createStreamingContext(conf: SparkConf, checkpointPath: String): StreamingContext = {
  // Batch interval chosen to match the earlier examples
  val ssc = new StreamingContext(conf, Seconds(5))
  // .... other code ...
  ssc.checkpoint(checkpointPath)
  ssc
}

// Create a new StreamingContext, or recover it from the latest checkpoint
val context = StreamingContext.getOrCreate(checkpointPath,
  () => createStreamingContext(conf, checkpointPath))
// Start
context.start()
context.awaitTermination()
  • When the program starts for the first time, a new StreamingContext is created and start() is called.
  • When the program is restarted after a failure, the StreamingContext is recreated from the checkpoint data in the checkpoint directory.

Note:

RDD checkpointing saves data to reliable storage, which has a cost: it may increase the processing time of the batches in which RDDs are checkpointed. The checkpoint interval therefore needs to be chosen carefully. With a small batch interval (say, 1 second), checkpointing every batch may significantly reduce throughput. Conversely, an overly long checkpoint interval causes the lineage and task sizes to grow, which can also be harmful. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval and at least 10 seconds; it can be configured with dstream.checkpoint(checkpointInterval). In general, a checkpoint interval of 5 to 10 batch intervals of the DStream is a good choice.

The difference between checkpointing and persistence

  • Persistence

    • When an RDD is persisted at the DISK_ONLY storage level, the RDD is stored in a single location, and subsequent uses of that RDD do not need to recompute its lineage.
    • Even after persist() is called, Spark remembers the RDD's lineage, so lost partitions can still be recomputed.
    • After the application finishes, the cache is cleared and the persisted files are removed.
  • Checkpoint

    • Checkpointing stores the RDD in HDFS and truncates (deletes) its lineage.
    • Unlike persisted data, checkpoint files are not deleted after the job finishes.
    • Checkpointing an RDD causes it to be computed twice: the regular job runs first, and a second job then writes the RDD to the checkpoint directory. This is why it is recommended to persist the RDD before checkpointing it (a short RDD-level sketch follows this list).
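To make the contrast concrete, here is a minimal RDD-level sketch; the checkpoint directory and the sample data are hypothetical placeholders:

import org.apache.spark.storage.StorageLevel

val sc = ssc.sparkContext
// Hypothetical directory for RDD checkpoint files
sc.setCheckpointDir("hdfs://namenode:8020/spark/rdd-checkpoint")

val rdd = sc.parallelize(1 to 1000).map(_ * 2)

// Persist: data is cached, but the lineage is kept so lost partitions can be recomputed
rdd.persist(StorageLevel.DISK_ONLY)

// Checkpoint: data is written to reliable storage and the lineage is truncated.
// Persisting first avoids recomputing the RDD when the separate checkpoint job runs.
rdd.checkpoint()
rdd.count() // the first action triggers both the regular job and the checkpoint write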

Processing stream data with DataFrames & SQL

In a Spark Streaming application, you can easily apply DataFrame and SQL operations to streaming data. A usage example follows:

object SqlStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(SqlStreaming.getClass.getSimpleName)
      .setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      // Get the singleton SparkSession; if it has already been created, it is returned directly
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.show()

      wordsDataFrame.createOrReplaceTempView("words")

      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()

    }


    ssc.start()
    ssc.awaitTermination()
  }
}
/** SparkSession singleton */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

Summary

This article is the second installment of the Spark Streaming programming guide, covering stateful computation, time-based window operations, persistence, checkpointing, and processing stream data with DataFrames & SQL. The next article will cover Spark MLlib machine learning.

