Spark Streaming + Kafka programming practice in Python

Overview

There are already many articles explaining how Spark Streaming works internally, so that is not covered here. This article focuses on the programming model, coding practice, and some tuning notes for using Kafka as the data source.

Spark Streaming programming guide: http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

Streaming + Kafka integration guide: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html

Demo environment

  1. Spark:1.6
  2. Kafka:kafka_2.11-0.9.0.1
  3. Implementation language: Python

Programming model

Spark Streaming currently offers two programming models for consuming Kafka:
1. Based on Receiver
2. Direct (without Receiver)

Based on Receiver

This approach uses a Receiver to consume data from Kafka, essentially through the Kafka high-level consumer API. For all receivers, the data received from Kafka is stored in Spark executors, and the jobs submitted by Spark Streaming then process that data.

Schematic diagram (image not reproduced here)

Notes

  1. To guarantee zero data loss, Write Ahead Logs must be enabled. When the received data is additionally copied to a fault-tolerant file system such as HDFS via the Write Ahead Log, the storage level should be set to StorageLevel.MEMORY_AND_DISK_SER, i.e. KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER). (A sketch follows this list.)
  2. In Receiver mode, Spark's partitions are unrelated to Kafka's partitions. Increasing the number of partitions per topic only adds threads inside the single Receiver consuming those topics; it does not increase the parallelism with which Spark processes the data.
  3. For different groups and topics, multiple Receivers can be used to create several DStreams that receive data in parallel, and union can then merge them into a single DStream.
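
A minimal sketch of point 1, assuming the ZooKeeper quorum, topic and group used in the examples later in this article; the write-ahead log is switched on with the spark.streaming.receiver.writeAheadLog.enable setting, and the storage level is passed to KafkaUtils.createStream:

# Hedged sketch: Receiver-based stream with the write-ahead log enabled.
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sconf = SparkConf()
# Copy every received block to the checkpoint directory before processing it
sconf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sc = SparkContext(appName="ReceiverWithWAL", conf=sconf)
ssc = StreamingContext(sc, 2)
ssc.checkpoint("hdfs:///tmp/receiver_wal_checkpoint")  # assumed HDFS path; required for the WAL

# With the WAL enabled the data no longer needs a second in-memory replica,
# so a single serialized copy that can spill to disk is enough.
stream = KafkaUtils.createStream(
    ssc,
    "server1-2-5-24-138:2181,server1-3-5-24-139:2181,server1-4-5-24-140:2181",
    "streaming_test_group",
    {"spark_streaming_test_topic": 1},
    storageLevel=StorageLevel.MEMORY_AND_DISK_SER)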

Direct (without Receiver)

The Direct approach was introduced in Spark 1.3. Unlike the Receiver approach, it has no receiver layer: it periodically queries Kafka for the latest offsets in every partition of every topic and then processes each batch according to the configured maxRatePerPartition.

It also handles offsets differently from the Receiver approach (which reads offsets from ZooKeeper, so the current consumption position is saved automatically and a restarted consumer continues from the last recorded offset). In Direct mode the data is read straight from Kafka, and the offsets have to be recorded by the application itself, for example with checkpoints, a database, files, or by writing them back to ZooKeeper, as sketched below.
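
A minimal sketch of resuming from self-managed offsets, assuming the broker list and topic used later in this article and a hypothetical load_offsets() helper that reads the last stored offsets from your own store (database, file or ZooKeeper):

# Hedged sketch: resume a direct stream from offsets recorded by the application.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="DirectFromStoredOffsets")
ssc = StreamingContext(sc, 2)

topic = "spark_streaming_test_topic"
brokers = "server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1-4-5-24-140:9092"

def load_offsets():
    # Hypothetical helper: return {partition: offset} from your own storage.
    # Hard-coded here only so the sketch is self-contained.
    return {0: 0}

# Tell createDirectStream exactly where to start in each partition
from_offsets = dict((TopicAndPartition(topic, p), o) for p, o in load_offsets().items())

stream = KafkaUtils.createDirectStream(
    ssc, [topic], {"metadata.broker.list": brokers}, fromOffsets=from_offsets)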

Schematic diagram (image not reproduced here)

Notes

  1. Simplified parallelism: with the Receiver approach we had to create multiple Receivers and union them into one DStream to raise ingestion parallelism. In Direct mode the Kafka partitions map one-to-one to RDD partitions and are read in parallel, a mapping that is also easier to understand and to tune.
  2. Efficiency: to achieve zero data loss, the Receiver approach has to copy the data into a Write Ahead Log, so two copies are kept, one in Kafka and one in the log, which is wasteful. The Direct approach has no such problem: as long as Kafka's retention period is long enough, the data can be recovered from Kafka itself.
  3. Exactly-once semantics: the Receiver approach uses Kafka's high-level API and keeps offsets in ZooKeeper, the traditional way of reading from Kafka. Because the data consumed by Spark Streaming and the offsets recorded in ZooKeeper are not updated atomically, this mode can occasionally consume records twice. The Direct approach uses the simple low-level Kafka API and records offsets with Spark Streaming checkpoints, which removes this inconsistency (a checkpoint-based sketch follows this list).
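
A minimal sketch of the checkpoint-based recording mentioned in point 3, assuming a writable checkpoint directory; StreamingContext.getOrCreate rebuilds the context, including the consumed offsets, from the checkpoint after a restart:

# Hedged sketch: direct stream whose offsets are tracked via Spark Streaming checkpoints.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

CHECKPOINT_DIR = "hdfs:///tmp/direct_checkpoint"  # assumed path

def create_context():
    sc = SparkContext(appName="DirectWithCheckpoint")
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(CHECKPOINT_DIR)
    stream = KafkaUtils.createDirectStream(
        ssc, ["spark_streaming_test_topic"],
        {"metadata.broker.list": "server1-2-5-24-138:9092"})
    stream.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b).pprint()
    return ssc

# On a clean start this calls create_context(); after a failure it restores the
# context, Kafka offsets included, from the checkpoint directory.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()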

Code practice

Kafka producer

package com.eric.kafka.producer;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Hello world!
 */
public class ProcuderSample {
    private final Producer<String, String> producer;
    public final static String TOPIC = "spark_streaming_test_topic";
    public final static Integer BATCH_SIZE = 2000;

    private ProcuderSample() {
        Properties props = new Properties();
        // Configure the list of Kafka brokers (host:port)
        props.put("metadata.broker.list", "server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1:9092");
        // Configure the serialization class of value
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Configure serialization class of key
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void deadLoopSendMessage() {
        Random rand = new Random();
        List<KeyedMessage<String, String>> tmpList = new ArrayList<KeyedMessage<String, String>>();
        while (true) {
            // Use a random digit as both key and value
            String randResult = rand.nextInt(10) + "";
            tmpList.add(new KeyedMessage<String, String>(TOPIC, randResult, randResult));
            // Send the messages in batches of BATCH_SIZE
            if (tmpList.size() == BATCH_SIZE) {
                producer.send(tmpList);
                tmpList.clear();
            }
        }
    }



    public static void main(String[] args) {
        new ProcuderSample().deadLoopSendMessage();
    }
}

Receiving data in Receiver mode

# encoding:utf-8
__author__ = 'eric.sun'

"""Demonstrate how to use Spark Streaming adopt Kafka Streaming realization WordCount
   Execute command:./spark-submit --master spark://server1-1-5-24-137:7077 --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ../examples/kafka_streaming.py > log
   Kafka Data source program: https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.java
"""

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    # sconf.set('spark.streaming.blockInterval','100')
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    numStreams = 3
    kafkaStreams = [KafkaUtils.createStream(ssc,"server1-2-5-24-138:2181,server1-3-5-24-139:2181,server1-4-5-24-140:2181","streaming_test_group",{"spark_streaming_test_topic":1}) for _ in range (numStreams)]
    unifiedStream = ssc.union(*kafkaStreams)
    print unifiedStream
    # Count the distribution of the generated random numbers
    result=unifiedStream.map(lambda x:(x[0],1)).reduceByKey(lambda x, y: x + y)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

if __name__ == '__main__':
    start()

Receiving data in Direct mode

# encoding:utf-8
__author__ = 'eric.sun'

"""Demonstrate how to use Spark Streaming adopt Kafka Direct Streaming realization WordCount
   Execute command:./spark-submit --master spark://server1-1-5-24-137:7077 --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ../examples/kafka_streaming.py > log
   Kafka Data source program: https://github.com/Eric-aihua/practise.git/java_cookbook/cookbook/src/main/java/com/eric/kafka/producer/ProcuderSample.java

   use Direct Benefits of
   1: according to topic The default number of partitions is to create the corresponding number of partitions rdd Number of partitions
   2: Receiver The way needs to be passed Write AHead Log To ensure that data is not lost, Direct In a way that doesn't require
   3: Primary treatment: use Kafka Simple API Read the data and use checkpoint yes offset Record

   Question:
   be based on Zookeeper of Kafka Monitoring tool cannot obtain offset The value of is needed every time Batch After processing, you can Zookeeper Set the value of

"""

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def start():
    sconf=SparkConf()
    sconf.set('spark.cores.max' , 8)
    sc=SparkContext(appName='KafkaDirectWordCount',conf=sconf)
    ssc=StreamingContext(sc,2)

    brokers="server1-2-5-24-138:9092,server1-3-5-24-139:9092,server1-4-5-24-140:9092"
    topic='spark_streaming_test_topic'
    kafkaStreams = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list": brokers})
    # Count the distribution of the generated random numbers
    result=kafkaStreams.map(lambda x:(x[0],1)).reduceByKey(lambda x, y: x + y)
    # Print the offsets; they could also be written back to ZooKeeper here.
    # transform() (rather than foreachRDD()) must be the first method called on the
    # stream so that the offset ranges can be captured; further Spark methods can follow.
    kafkaStreams.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    result.pprint()
    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print "%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset,o.untilOffset-o.fromOffset)

if __name__ == '__main__':
    start()
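
The docstring above notes that ZooKeeper-based monitoring tools cannot see the offsets consumed in Direct mode. A minimal sketch of writing them back, assuming the kazoo client library and the conventional /consumers/<group>/offsets/<topic>/<partition> path used by the old high-level consumer (adjust the path to whatever your monitoring tool expects):

# Hedged sketch: push the offsets captured by storeOffsetRanges back to ZooKeeper
# so that existing Kafka monitoring tools can display consumer progress.
from kazoo.client import KazooClient

ZK_HOSTS = "server1-2-5-24-138:2181,server1-3-5-24-139:2181,server1-4-5-24-140:2181"
GROUP = "streaming_test_group"  # assumed group name, used only to build the monitoring path

def write_offsets_to_zk(rdd):
    zk = KazooClient(hosts=ZK_HOSTS)
    zk.start()
    for o in offsetRanges:  # filled in by storeOffsetRanges() above
        path = "/consumers/%s/offsets/%s/%d" % (GROUP, o.topic, o.partition)
        zk.ensure_path(path)
        zk.set(path, str(o.untilOffset))
    zk.stop()

# Drop-in replacement for printOffsetRanges in the example above:
# kafkaStreams.transform(storeOffsetRanges).foreachRDD(write_offsets_to_zk)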

Tuning summary

When the data volume is small, Spark Streaming + Kafka usually works fine with the default configuration. Once the volume grows, tuning becomes necessary, and the right settings differ from scenario to scenario.

  1. Choose a reasonable batch duration: almost every Spark Streaming tuning guide mentions the batch interval, which is a parameter of the StreamingContext constructor. If it is set too short, i.e. the job generated for one batchDuration cannot finish within that interval, data keeps piling up and Spark Streaming eventually blocks. The batchDuration should normally not be below 500 ms, because a very small value makes Spark Streaming submit jobs too frequently and burdens the whole pipeline. In practice I set it between 1 and 10 seconds depending on the application and hardware, and adjust it by watching Total Delay in the Spark Streaming monitoring UI (see the configuration sketch at the end of this section).
  2. Choose a reasonable Kafka pull rate (maxRatePerPartition matters): for applications where Spark Streaming consumes Kafka, this setting, spark.streaming.kafka.maxRatePerPartition, is critical. It has no upper limit by default, i.e. Spark pulls all the data available in Kafka. It must be chosen together with batchDuration, based on the rate at which producers write to Kafka and the rate at which the consumer can process, so that the data pulled from each partition during every batchDuration can be processed smoothly while keeping throughput as high as possible. Use Input Rate and Processing Time in the monitoring UI to tune it.
  3. Cache DStreams (RDDs) that are reused: if an RDD or DStream is used more than once, cache it to avoid recomputation and extra scheduling overhead; observe the Scheduling Delay metric.
  4. Set up GC sensibly: anyone who has worked with Java for a while knows the JVM's garbage collector lets us think less about allocating and freeing memory and more about business logic, but every GC pause costs time, especially old-generation collections, which have to deal with memory fragmentation and typically use a mark-and-sweep approach. (The JVM heap is divided into the young generation, the old generation and the permanent generation.) In a Spark program, GC frequency and duration are likewise key factors for overall efficiency. A common recommendation is: --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
  5. Allocate a reasonable number of CPU cores: each executor can occupy one or more cores, and CPU utilization shows how well the compute resources are used. A common waste is an executor holding several cores while overall CPU utilization stays low, because a single executor cannot always keep several cores busy. In that case consider giving each executor fewer cores and running more executors per worker, or more workers per host, to raise the number of executors running in parallel and with it the CPU utilization. Adding executors costs memory, though: the more executors share a machine's memory, the less each one gets, which can lead to excessive data spill or even out-of-memory errors.
  6. Set parallelism sensibly: a partition is a slice of the data, and each task processes exactly one partition. If the count is too small, each slice is large, which creates memory pressure or leaves many executors underused; if it is too large, the many tiny slices reduce execution efficiency. For action-type operations (such as the various reduce operations) the partition count of the largest parent RDD is used. Parallelism is the default number of partitions of the data returned by reduce-style operations (for map operations the partition count is usually taken from the larger parent RDD and no shuffle is involved, so the parallelism setting has no effect there). The two concepts are closely related: both concern data partitioning and act in a unified way. spark.default.parallelism sets the default partition count, and many RDD operations accept a partition argument to control it explicitly. With Spark Streaming + Kafka in Direct mode, as explained earlier, Spark partitions map one-to-one to Kafka partitions, so we normally just use the number of Kafka partitions.
  7. Use high-performance operators: (1) reduceByKey/aggregateByKey instead of groupByKey; (2) mapPartitions instead of plain map; (3) foreachPartitions instead of foreach; (4) coalesce after filter; (5) repartitionAndSortWithinPartitions instead of repartition followed by sort (a short sketch follows at the end of this section).
  8. Optimize performance using Kryo serialization
    There are three main areas related to serialization
    • When an external variable is used in the operator function, the variable will be serialized for network transmission (see the explanation in "principle 7: Broadcast large variables").
    • When a custom type is used as the generic type of an RDD (for example JavaRDD<Student>, where Student is a custom class), every object of that type will be serialized, so the custom class must also implement the Serializable interface.
    • When a serialized persistence level is used (such as MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.

In all three of these places the Kryo serialization library can be used to speed up serialization and deserialization. By default Spark uses Java's serialization mechanism, i.e. the ObjectOutputStream/ObjectInputStream API. However, Spark also supports the Kryo serialization library, whose performance is much higher than Java serialization; according to the official documentation it is roughly 10 times faster. Spark does not use Kryo by default because Kryo works best when every custom type that will be serialized is registered, which is extra work for developers.
The following is a code example using Kryo: set the serializer and register the custom types that need to be serialized (such as external variable types used in operator functions or custom types used as the RDD generic type):
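
Since the original listing is missing here, this is a minimal sketch of the idea, written in Python to match the rest of the article. Note that in PySpark, Kryo only affects serialization inside the JVM (shuffle data, cached blocks of JVM objects, and so on); Python objects themselves are still pickled. The class name in the commented-out registration line is purely illustrative:

# Hedged sketch: enable Kryo serialization for the JVM side of the job.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("KryoExample")
# Use Kryo instead of the default Java serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# In a Scala/Java job you would also register your own classes, e.g.:
# conf.set("spark.kryo.classesToRegister", "com.example.Student")  # illustrative class name
sc = SparkContext(conf=conf)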
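
Pulling items 1, 2, 4, 5, 6 and 8 of the tuning list together, a hedged configuration sketch; every value below is illustrative and should be tuned against the monitoring UI:

# Hedged sketch: typical knobs for a Spark Streaming + Kafka (Direct) job.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("TunedKafkaStreaming")
conf.set("spark.cores.max", "8")                                        # item 5: total cores
conf.set("spark.default.parallelism", "24")                             # item 6: default partition count
conf.set("spark.streaming.kafka.maxRatePerPartition", "10000")          # item 2: records/sec pulled per Kafka partition
conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")  # item 4: CMS garbage collector
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # item 8: Kryo

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # item 1: 5-second batchDuration; adjust while watching Total Delay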
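
And for item 7, a short sketch contrasting reduceByKey with groupByKey and showing foreachPartition; open_connection() is a stand-in sink so the example runs on its own:

# Hedged sketch: high-performance operator patterns.
from pyspark import SparkContext

sc = SparkContext(appName="OperatorSketch")
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# reduceByKey combines values map-side before the shuffle; groupByKey would
# ship every single record across the network first.
counts = pairs.reduceByKey(lambda a, b: a + b)

class _FakeSink(object):
    """Stand-in for a real connection so the sketch is self-contained."""
    def send(self, line):
        print line
    def close(self):
        pass

def open_connection():
    return _FakeSink()

# foreachPartition pays the connection cost once per partition instead of
# once per record, as foreach would.
def write_partition(records):
    conn = open_connection()
    for key, value in records:
        conn.send("%s=%d" % (key, value))
    conn.close()

counts.foreachPartition(write_partition)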
