The unavoidable problem in data development - data skew

Foreword

Data skew is the most common problem in data development, and it is also a question that is almost guaranteed to come up in interviews. So why does data become skewed? When does data skew occur? And how do we solve it?

What is data skew: the essence of data skew is uneven data distribution. A few tasks process a disproportionately large amount of data, which drags out the execution time of the whole job.

When does data skew occur: whether in Spark or MapReduce, most data skew occurs in the shuffle stage, the so-called shuffling of data. Different shuffle strategies divide the data differently, and the most commonly used strategy is the hash algorithm.
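As a rough, engine-agnostic sketch of the idea (the helper method below is made up for illustration): hash partitioning routes every record with the same key to the same reduce task, so a single hot key concentrates all of its data on one task.

// Illustrative only: how hash partitioning assigns a key to a reduce task/partition.
static int partitionFor(Object key, int numPartitions) {
    int p = key.hashCode() % numPartitions;
    return p < 0 ? p + numPartitions : p; // keep the index non-negative
}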

Based on the answers to these two questions, the essence of solving data skew is distributing the data evenly.

The author believes the optimization strategies can be divided into business-level optimization and technical-level optimization.

Business-level optimization works with the actual business scenarios and data characteristics, while technical-level optimization essentially targets the two major components, storage and compute, and then applies different parameters or functions depending on the technology (Hive, Spark).

Both strategies are explained in detail below.

Technical-level optimization

Hive

The discussion here assumes MapReduce is still the underlying engine; the Hive on Tez mode is not covered, though most of the optimization ideas and parameters are the same.

1. Parameter optimization

Here are some parameter configurations related to data skew. Generally they can only mitigate, not completely solve, the skew problem. Some optimization parameters are not covered here for now (the map side, reduce side, JVM, compression, etc. also have optimization points).

parameter (value): description

hive.map.aggr (true): enables map-side aggregation, equivalent to a Combiner; the main idea is to reduce the amount of data shuffled to the reducers.
hive.groupby.skewindata (true): when set to true, two MR jobs are generated. In the first job the map output is distributed randomly to the reducers and each reducer does a partial aggregation, so that the same key can be spread across different reducers for load balancing. The second MR job then distributes the pre-aggregated results to the reducers by key, which guarantees that the same key finally ends up in the same reducer.
hive.auto.convert.join (true): whether to convert a common join (reduce-side join) into a map join.
hive.mapjoin.smalltable.filesize (25000000): input file size threshold below which a table is judged to be a small table; the default is 25MB.
hive.groupby.mapaggr.checkinterval (100000): number of rows processed per batch by the map-side aggregation.
hive.mapjoin.cache.numrows (25000): how many rows of the build table are cached in memory.
hive.optimize.skewjoin (true): when enabled, during the join Hive temporarily writes the rows of any key whose count exceeds the threshold hive.skewjoin.key (default 100000) to files, and then starts another job that map joins those rows to produce the result.
hive.merge.mapfiles (true): merge small files to reduce the number of map tasks.
hive.skewjoin.key (100000): threshold for judging skew; if a key exceeds this count during a join, it is treated as a skewed key.
2. SQL optimization

Data skew arises on the one hand from data characteristics, and on the other hand from human causes, namely careless SQL development (the main part). Here are a few common SQL skew scenarios and their solutions.

2.1 Join optimization

Hive partitions data by the join key, which triggers a shuffle, so the chosen keys should be distributed as evenly as possible. The scenarios are essentially: a large table joined with a small table, a small table joined with a small table (generally no skew), and a large table joined with a large table. Each case is explained below.

2.1.1 Large table join small table - MapJoin

"Large" and "small" here are relative. If table A has 100 million rows and table B has 10 million, then B is the small table. When the keys in table B are concentrated, the amount of data landing on one or a few reducers during the shuffle will be far above average, which easily causes skew. This scenario is generally solved with a map join.

The principle of MapJoin is to load the entire small table into memory (note: if memory cannot hold the 10 million rows, memory needs to be increased) and perform the join on the map side, so there is no shuffle phase at all.

--original sql
select 
  lnc.request_url,
  count(uuid) as pv
from wedw_dwd.log_ng_channel lnc
join 
(
  select 
     request_url,
     visit_time,
     uuid
  from wedw_dwd.track_beacon_log
)t
on lnc.request_url = t.request_url
group by lnc.request_url

--mapjoin 
select /*+ MAPJOIN(lnc) */  
  lnc.request_url,
  count(uuid) as pv
from wedw_dwd.log_ng_channel lnc
join 
(
  select 
     request_url,
     visit_time,
     uuid
  from wedw_dwd.track_beacon_log
)t
on lnc.request_url = t.request_url
group by lnc.request_url
2.1.2 Large table join large table - SkewJoin

When both tables are very large and neither can be loaded into memory, you need to evaluate whether the join keys are evenly distributed.

Case 1: When the key distribution is uniform, the problem is generally not skew, and other tuning methods such as increasing the number of reducers should be considered.

Case 2: When the key distribution is uneven: if only a few keys carry a large amount of data, those keys need to be taken out and computed separately; if most keys carry a large amount of data, then random prefixes need to be added, i.e. the idea of two-stage aggregation.

--Parameter adjustment
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;
set hive.optimize.skewjoin.compiletime=true;

-- a few keys carry a large amount of data and need to be calculated separately
select 
  lnc.request_url,
  count(uuid) as pv
from wedw_dwd.log_ng_channel lnc
join 
(
  select 
     request_url,
     visit_time,
     uuid
  from wedw_dwd.track_beacon_log
  where request_url!='www.baidu.com'
)t
on lnc.request_url = t.request_url
group by lnc.request_url

union all 

select 
  lnc.request_url,
  count(uuid) as pv
from wedw_dwd.log_ng_channel lnc
join 
(
  select 
     request_url,
     visit_time,
     uuid
  from wedw_dwd.track_beacon_log
  where request_url='www.baidu.com'
)t
on lnc.request_url = t.request_url
group by lnc.request_url

-- most keys carry a large amount of data: add a random prefix to the left table, and the right table also needs to be expanded accordingly
select 
  split(request_url,'&')[1] as request_url,
  sum(cnt) as cnt
from 
(
  select 
    t1.request_url,
    count(uuid) as cnt
  from 
  (
    select 
       concat(cast(floor(rand()*10)+1 as int),'&',request_url) as request_url -- random prefix 1-10, matching the 10x expansion of the right table
    from wedw_dwd.log_ng_channel
  )t1
  left join 
  (  -- Expand 10 times
    select 
      concat('1&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('2&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('3&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('4&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('5&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('6&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('7&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('8&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('9&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
    union all 
    select 
      concat('10&',request_url) as request_url,
      uuid
    from wedw_dwd.track_beacon_log
  )t2
  on t1.request_url = t2.request_url
  group by t1.request_url
)t
group by split(request_url,'&')[1]
2.2 distinct optimization

The distinct operation also goes through the shuffle phase. It is usually used together with group by and is also a high-frequency source of data skew. Usually, an operation that requires distinct can be solved in a different way: do the group by first and then perform the subsequent operations. Example:

--original sql
select 
   request_url,
   count(distinct uuid)
from wedw_dwd.log_ng_channel
group by request_url

--above sql can be rewritten as
select 
  request_url,
  sum(1) as cnt
from 
(
  select 
    request_url,
    uuid
  from wedw_dwd.log_ng_channel
  group by request_url, uuid
)t
group by request_url
2.3 Filter/Split

Filtering: usually when doing statistics there is a lot of dirty or empty data in the table. When the actual requirement does not care about this dirty or empty data, we can filter it out first and then perform the subsequent operations, avoiding skew by reducing the data volume.

Splitting: This is very similar to the SkewJoin optimization idea described above.

Case 1: for example, the table contains many NULL values, which account for the largest share of the key distribution, but the requirement does not allow filtering them out. These null values then need to be computed separately, or filled with random numbers.

Case 2: for example, when most keys in the table each account for a relatively high proportion, random prefixes need to be added to these keys so that the data is distributed evenly across the reducers.

--filter
select 
  request_url,
  count(1) as cnt
from wedw_dwd.log_ng_channel
where request_url is not null and length(request_url)>0 and to_date(visit_time)>='2020-10-01'
group by request_url

--split
select 
  request_url,
  count(1) as cnt
from wedw_dwd.log_ng_channel
where request_url is not null and length(request_url)>0
group by request_url
union all 
select 
  request_url,
  count(1) as cnt
from wedw_dwd.log_ng_channel
where request_url is null
group by request_url

Spark

For Spark, when one or a few tasks take a long time and process a large amount of data, it is a skew problem. The solution ideas for Spark data skew are the same as for Hive, but Hive is tuned mainly through SQL, while Spark operates on RDDs, so the optimization details differ somewhat. In fact, whether it is Spark or Hive, data skew comes down to either data characteristics (the distribution itself is uneven / the data volume itself is large) or the logic written during development.

1. Check the data source

In Spark, stages are divided at shuffle operator boundaries. Different partitions of the same stage can be processed in parallel, while different stages can only be processed serially. The overall time of a stage is determined by its slowest task, and for tasks within the same stage the processing time is determined by the amount of data each task handles (ignoring differences in computing power). A stage's data sources fall into two categories:

  1. Shuffle data from the previous stage
  2. A direct connection to a data source, such as Kafka, HDFS, or the local filesystem

If the source is Kafka, use Kafka monitoring to check whether the partition data is evenly distributed. If one partition has far more messages than the others, the partition assignment strategy needs to be adjusted; or if the number of partitions is too small, the number of partitions needs to be increased.

If the source is HDFS with non-splittable files, each file corresponds to one partition. In that case check whether the data volume of each file is roughly uniform (note that you cannot just look at the file size: for compressed files you need to look at the actual amount of data).

For splittable HDFS files, the size of each split is determined by the following algorithm, where goalSize equals the total size of all files divided by minPartitions. For an HDFS file, blockSize is determined by the file's own block size; for a Linux local file in local mode, it is determined by fs.local.block.size.

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

In general, each split is about the size of one block, and there is usually no skew at this step. If there is, the related parameters can be adjusted.

Next, troubleshoot and locate the stages that include a shuffle.

Summary: use splittable files as much as possible and increase parallelism at the source to avoid skew.
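As a small illustration (reusing the path from the word-count example below; the target of 200 partitions is arbitrary), for splittable input the minPartitions argument of textFile can be raised, which lowers goalSize in the formula above and produces more, smaller splits:

// Sketch only: increase source-side parallelism for splittable files.
SparkConf conf = new SparkConf().setAppName("source-parallelism-demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// The second argument is the minimum number of partitions to read the file into.
JavaRDD<String> lines = jsc.textFile("hdfs://project/log/test/word.txt", 200);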

2. Locate the code that causes the skew

Check the data source distribution as in the first step. If it is uniform, the skew is probably caused by the development code. In that case you need to locate the specific code that causes the skew (generally, look for the operators that trigger a shuffle: distinct, groupByKey, join, repartition, reduceByKey, cogroup, etc.).

2.1. View the task running time and the amount of data processed

Here you can view each task's processing time and the amount of data it processed through the Spark web UI.

2.2. Infer the skewed code

Based on the per-task processing status in the Spark web UI, you can see which stage the slow tasks belong to, and then look for shuffle operators in the code to locate the specific code that caused the skew. Here is a word count example as a brief illustration.

val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://project/log/test/word.txt")
val words = lines.flatMap(_.split(","))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))

From the code above you can see that only reduceByKey goes through the shuffle phase, so only there can skew possibly occur.

3. Solve the skew

Looking through various materials, eight solutions are commonly listed online. The author has classified and summarized them here and tried to consolidate the individual solutions.

3.1 Adjusting the degree of parallelism

Spark Shuffle uses HashPartitioner by default to partition data. During shuffle read, the number of read tasks is determined by the spark.sql.shuffle.partitions parameter, which defaults to 200. When a large number of different keys are assigned to the same task, the data processed by that task may be far larger than that of the others.

Therefore, the essence of adjusting parallelism is to spread different keys that were originally assigned to the same task across different tasks, which reduces the amount of data the original task has to process and alleviates the skew. You can adjust it either by setting the parameter above or by passing the number of partitions directly to the shuffle operator.

Note: this optimization method is not suitable for scenarios where a single key carries a large amount of data; it can only alleviate the skew.

For example:

SparkSession sparkSession = SparkSession.builder()
        .appName("wordcount")
        .master("local")
        .getOrCreate();

JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
JavaRDD<String> javaRDD = javaSparkContext.textFile("hdfs://bigdatatest-1:8020/tmp/test.txt");
javaRDD.mapToPair(x -> new Tuple2<>(x.toLowerCase(), 1))
        .reduceByKey((a, b) -> a + b, 1000) /* the 1000 here sets the shuffle parallelism (number of partitions) */
        .collect()
        .forEach(tuple -> System.out.println("The Key:" + tuple._1 + " Value:" + tuple._2));
3.2 Custom partitioner

As mentioned above, Spark uses hash partitioning by default. Sometimes adjusting parallelism still cannot effectively solve the skew, so the partitioner needs to be customized according to the actual data characteristics. The main purpose is to assign different keys to different tasks as much as possible. This method is also not suitable for scenarios where a single key carries a large amount of data.

/**
*custom partitioner: routes URL keys to partitions by host name
*/
public class DefinePartition extends Partitioner {
    public  DefinePartition(){}

    @Override
    public int hashCode() {
        return super.hashCode();
    }

    @Override
    public boolean equals(Object other) {
        // guard against ClassCastException when compared with an unrelated object
        return other instanceof DefinePartition
                && this.numPartitions() == ((DefinePartition) other).numPartitions();
    }

    @Override
    public int numPartitions() {
        return 20;
    }

    @Override
    public int getPartition(Object key) {
        int code = 0;
        try {
            // partition by the host part of the URL key so URLs of the same host stay together
            String host = new URL(key.toString()).getHost();
            code = host.hashCode() % numPartitions();
            if (code < 0) {
                code += numPartitions();
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }

        return code;
    }
}
3.3 Filtering data

There are two ways to filter data here:

  1. Filter useless data: "useless" here means data that is irrelevant to the specific business requirement, such as empty records; since it has no effect on the result, the filter operator can be called before the shuffle to drop it.
  2. Filter out the few keys that carry a large amount of data: "filtering" here does not mean really throwing them away, but sampling the data to find out which keys carry a large amount of data and extracting them in advance for separate computation, the same idea as in Hive (a sampling sketch is given after the note below).

Note: this method is only effective when a small number of keys are skewed.
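A minimal sketch of this sampling idea in Spark, reusing the javaSparkContext and test file from the earlier example; the 10% sample fraction and the 100000 threshold are arbitrary assumptions:

// Sketch only: sample the data to find skew-candidate keys, then split them off.
JavaPairRDD<String, Integer> pairs = javaSparkContext
        .textFile("hdfs://bigdatatest-1:8020/tmp/test.txt")
        .mapToPair(line -> new Tuple2<>(line.split(",")[0], 1));

// Count keys on a ~10% sample instead of the full data set.
Map<String, Long> sampleCounts = pairs.sample(false, 0.1).countByKey();

// Keys whose sampled count exceeds an arbitrary threshold are treated as skewed.
Set<String> skewedKeys = new HashSet<>();
for (Map.Entry<String, Long> e : sampleCounts.entrySet()) {
    if (e.getValue() > 100000) {
        skewedKeys.add(e.getKey());
    }
}

// Handle the skewed keys separately (e.g. with salting) and the rest normally.
JavaPairRDD<String, Integer> skewedPart = pairs.filter(t -> skewedKeys.contains(t._1));
JavaPairRDD<String, Integer> normalPart = pairs.filter(t -> !skewedKeys.contains(t._1));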

3.4 Avoid using shuffle operators

As most readers know, during a shuffle the records with the same key on multiple nodes are pulled to the same node for computation, which involves disk IO and network transfer; this is why shuffle is inefficient.

Therefore, in actual development, try to avoid shuffle operators where possible. For example, instead of using the join operator, implement the join with broadcast + map.

 List<Map<String, String>> collect = javaSparkContext.textFile("hdfs://bigdatatest-1:8020/tmp/test.txt")
                .mapPartitions(new FlatMapFunction<Iterator<String>, Map<String, String>>() {
                    @Override
                    public Iterator<Map<String, String>> call(Iterator<String> stringIterator) throws Exception {
                        List<Map<String, String>> list = new ArrayList<>();
                        HashMap<String, String> hashMap = Maps.newHashMap();
                        while (stringIterator.hasNext()) {
                            String str = stringIterator.next();
                            hashMap.put(str.split(",")[0], str.split(",")[1]);
                        }
                        list.add(hashMap);
                        return list.iterator();
                    }
                }).collect();

Broadcast<List<Map<String, String>>> listBroadcast = javaSparkContext.broadcast(collect);
javaRDD.map(new Function<String, String>() {
  @Override
  public String call(String s) throws Exception {
    Iterator<Map<String, String>> iterator = listBroadcast.getValue().iterator();
    while (iterator.hasNext()) {
      Map<String, String> stringMap = iterator.next();
      if (stringMap.containsKey(s)) {
        return stringMap.get(s);
      }
    }
    return null;
  }
}).collect();
3.5 Salting (two-stage aggregation / random prefix + expansion)

The author believes that two-stage aggregation and random prefix + expansion are essentially both salting operations: salt is added to the key so that records are spread across different tasks, and the results are merged afterwards so that the same key is still aggregated together in the end. Although the idea is the same, the applicable scenarios differ.

Two-stage aggregation:

  1. First do local aggregation: prepend a random number to each key
  2. Then perform the aggregation operation (such as reduceByKey or groupByKey); the result is already much smaller than the original data set
  3. Then remove the random number from the key, so that identical keys from the original data set end up in the same task
  4. Perform the aggregation again to get the final result

Note: two-stage aggregation only applies to aggregation-type shuffle operations. A code sketch follows below.
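A minimal sketch of two-stage aggregation, again in the word-count style of the earlier examples; the salt range of 10 is an arbitrary assumption:

// Sketch only: two-stage (salted) aggregation for a skewed reduceByKey.
Random random = new Random();

// 1. Add a random prefix 0-9 to each key.
JavaPairRDD<String, Integer> salted = javaSparkContext
        .textFile("hdfs://bigdatatest-1:8020/tmp/test.txt")
        .mapToPair(line -> new Tuple2<>(random.nextInt(10) + "&" + line.toLowerCase(), 1));

// 2. Local aggregation on the salted keys; a hot key is now spread over up to 10 tasks.
JavaPairRDD<String, Integer> partial = salted.reduceByKey((a, b) -> a + b);

// 3. Strip the random prefix so identical original keys come back together.
JavaPairRDD<String, Integer> desalted = partial
        .mapToPair(t -> new Tuple2<>(t._1.substring(t._1.indexOf('&') + 1), t._2));

// 4. Final aggregation per original key; each key now has at most 10 partial sums.
JavaPairRDD<String, Integer> counts = desalted.reduceByKey((a, b) -> a + b);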

Random prefix + expansion

This optimization method follows the same idea as the Hive solution shown earlier; a Spark sketch is given after the note below.

Note: unlike the two-stage aggregation scenario, this is an operation for joins. It applies whether a few keys are skewed or most keys are skewed, but one RDD needs to be expanded, so the memory cost has to be weighed.
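A minimal sketch of random prefix + expansion for a Spark join, mirroring the Hive SQL shown earlier; leftRdd (the skewed side) and rightRdd are assumed JavaPairRDD<String, String> inputs, and the 10x expansion factor is arbitrary:

// Sketch only: salt the skewed (left) RDD and expand the other (right) RDD 10 times.
Random rand = new Random();

// Left side: prepend a random prefix 0-9 to each join key.
JavaPairRDD<String, String> saltedLeft = leftRdd
        .mapToPair(t -> new Tuple2<>(rand.nextInt(10) + "&" + t._1, t._2));

// Right side: emit every record 10 times, once per possible prefix.
JavaPairRDD<String, String> expandedRight = rightRdd
        .flatMapToPair(t -> {
            List<Tuple2<String, String>> out = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                out.add(new Tuple2<>(i + "&" + t._1, t._2));
            }
            return out.iterator();
        });

// Join on the salted key, then strip the prefix to recover the original key.
JavaPairRDD<String, Tuple2<String, String>> joined = saltedLeft.join(expandedRight)
        .mapToPair(t -> new Tuple2<>(t._1.substring(t._1.indexOf('&') + 1), t._2));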

3.6 Various join transformations

First, let's briefly go over Spark's several join implementations and the scenarios they apply to.

  1. Broadcast Hash Join: suitable for joining a small table with a large table. The principle is to broadcast the small table to the partitions of the large table and hash join it against the partition records in parallel. Broadcast is suitable when the small table is small enough to be broadcast directly; the broadcast table must be smaller than the value of spark.sql.autoBroadcastJoinThreshold (default about 10M). If you increase this parameter, consider the driver-side memory to avoid OOM, because the broadcast table is generally collected to the driver first and then distributed to the executors.
  2. Shuffle Hash Join: suitable for joining a small table with a large table, or two small tables, where the "small" table is larger than in the broadcast hash join scenario and not suitable for broadcasting. This join relies on the fact that the same key always lands in the same partition: both tables are shuffled so that rows with the same key end up in the same partition. SparkSQL thus divides and conquers the join of the larger table: it is first split into n partitions, and then a hash join is done between the corresponding partitions of the two tables. This both reduces the pressure of broadcasting a table from the driver and reduces the executor memory needed to hold the entire broadcast table.
  3. Sort Merge Join: suitable for joining two large tables. The principle is to first shuffle both tables by the join key so that records with the same key land in corresponding partitions, sort the data within each partition, and then merge the corresponding partitions: since both sequences are ordered, traverse from the beginning and output when the keys are equal; if they differ, advance the smaller side. Because of the ordering, each record can be discarded as soon as it has been consumed.

Most material online presents converting a reduce join into a map join, which is the first join method above, i.e. broadcast + map. The author believes the second join method corresponds to the salting operation described above. Based on the brief description of the three join methods, readers can increase spark.sql.autoBroadcastJoinThreshold according to the actual memory and bandwidth resources, so that more joins are actually executed as broadcast hash joins.
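A hedged example of raising the broadcast threshold; the 100MB value is purely illustrative and must be weighed against driver and executor memory:

// Raise the size limit below which Spark SQL will broadcast a table for a join.
SparkSession spark = SparkSession.builder()
        .appName("broadcast-join-demo")
        // 104857600 bytes = 100MB; the default is about 10MB
        .config("spark.sql.autoBroadcastJoinThreshold", "104857600")
        .getOrCreate();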

Business-level optimization

The explanation of the technical-level solutions above has in fact already touched on the related business-level optimizations.

Filter useless data according to the actual business requirements, and try to avoid shuffles.

Reverse statistics: for example, the requirement is to count the bounce rate of a certain page. The normal logic is: (UV of that page whose next page is empty) / (UV of that page). Instead, we can compute 1 - (UV of that page whose next page is not empty) / (UV of that page) to get the bounce rate, the so-called reverse derivation.

Pushing back on the requirement: of course, if the reader thinks the requirement is unrealistic and its output has little value, it can be pushed back entirely, but the author offers this only as a last resort; as technical staff, we should still face the problem and solve it properly.

Summary

If you have read this far, thank you very much for your patient reading. This article mainly gives solution approaches with few real cases, because in a real scenario no single solution in this article may completely solve your problem; sometimes several optimization methods and skew-handling ideas must be combined. The author believes that what matters is mastering the way of thinking about a problem. Attentive readers may notice that the word that appears most in this article is "even": the antonym of skew is even, and that is the main thread of solving data skew, namely that all solutions revolve around balance.
