An analysis of Spark's five JOIN strategies

The JOIN is a very common data processing operation. As a unified big data processing engine, Spark supports a rich set of JOIN scenarios. This article introduces the five JOIN strategies provided by Spark and covers the following topics:

  • Factors that affect a JOIN operation
  • The five JOIN strategies implemented by Spark
  • How Spark chooses a JOIN strategy

Factors that affect a JOIN operation

Size of the datasets

The size of the datasets participating in the join directly affects the execution efficiency of the JOIN operation. It also influences which JOIN mechanism is selected, which in turn affects execution efficiency.

JOIN conditions

The JOIN condition involves logical comparisons between fields. Based on the condition, JOINs fall into two categories: equi-joins and non-equi-joins. An equi-join involves one or more equality conditions that must hold simultaneously, each applied between attributes of the two input datasets. When any other comparison operator is used (i.e., the operator is not =), the JOIN is a non-equi-join.

JOIN type

After the JOIN condition is applied to the records of the input datasets, the JOIN type determines which records appear in the result. The main JOIN types are:

  • Inner join: only records from the input datasets that match the join condition are output.
  • Outer join: further divided into left outer join, right outer join, and full outer join.
  • Semi join: the right table is used only to filter the data of the left table and does not appear in the result set.
  • Cross join: returns every row of the left table combined with every row of the right table. Cross joins are also called Cartesian products.

The five JOIN strategies implemented by Spark

Spark provides the following five JOIN mechanisms to execute JOIN operations:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

Brief introduction

Shuffle Hash Join can be chosen when the tables to be joined contain large amounts of data. Both tables are repartitioned by the join key, which guarantees that rows with the same join key are sent to the same partition.

The basic steps of Shuffle Hash Join consist of the following two points:

  • First, the two tables participating in the join are repartitioned by the join key. This step involves a Shuffle; its purpose is to send rows with the same join key to the same partition so that the join can be performed within each partition.
  • Second, within each partition produced by the Shuffle, the smaller table's partition data is built into a hash table, which is then probed with the larger table's partition records by join key.
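The two steps above can be sketched in pure Python. This is only an illustrative toy, not Spark's actual implementation: partitions are simulated with a hash-modulo on the join key, and the right side is assumed to be the smaller (build) side.

```python
from collections import defaultdict

def shuffle_hash_join(left, right, num_partitions=4):
    """Toy Shuffle Hash Join over (key, value) tuples."""
    # Shuffle phase: repartition both sides by hash of the join key,
    # so rows with the same key land in the same partition.
    left_parts = defaultdict(list)
    right_parts = defaultdict(list)
    for key, value in left:
        left_parts[hash(key) % num_partitions].append((key, value))
    for key, value in right:
        right_parts[hash(key) % num_partitions].append((key, value))

    # Join phase: within each partition, build a hash table on the
    # (assumed smaller) right side and probe it with the left side.
    result = []
    for p in range(num_partitions):
        table = defaultdict(list)
        for key, value in right_parts[p]:
            table[key].append(value)
        for key, value in left_parts[p]:
            for matched in table[key]:
                result.append((key, value, matched))
    return result
```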

Conditions and characteristics

  • Only equi-joins are supported, and the join keys do not need to be sorted
  • All join types except full outer join are supported
  • Building a hash map from the small table is a memory-intensive operation; if the side used to build the hash table is too large, it may cause an OOM
  • The parameter spark.sql.join.preferSortMergeJoin (default: true) must be set to false

Broadcast Hash Join

Brief introduction

Broadcast Hash Join is also called a map-side JOIN. It is usually chosen when one table is small, because it avoids the overhead of a Shuffle and thus improves performance. For example, when joining a fact table with a dimension table, the dimension table is usually small enough to be broadcast; this avoids shuffling the data (Shuffle is a very expensive operation in Spark) and improves JOIN efficiency. Before the broadcast, Spark first collects the data from the Executors to the Driver, and the Driver then broadcasts it to the Executors. If the data to be broadcast is large, this can cause an OOM on the Driver side.

Broadcast Hash Join mainly includes two stages:

  • Broadcast stage: the small table is broadcast and cached on each Executor
  • Hash Join stage: the hash join is executed on each Executor
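As an illustration only (not Spark's implementation), the two stages can be mimicked in plain Python: "broadcasting" amounts to building a single hash table from the whole small table and probing it while streaming the large table, with no repartitioning of the large side.

```python
from collections import defaultdict

def broadcast_hash_join(large, small):
    """Toy Broadcast Hash Join over (key, value) tuples."""
    # "Broadcast" stage: build one hash table from the entire small side.
    table = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    # Hash Join stage: stream the large side once and probe the table;
    # the large table is never shuffled.
    return [(key, lv, sv)
            for key, lv in large
            for sv in table[key]]
```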

Conditions and characteristics

  • Only equi-joins are supported, and the join keys do not need to be sorted
  • All join types except full outer join are supported
  • Broadcast Hash Join is more efficient than the other JOIN mechanisms. However, it is a network-intensive operation (data is transmitted redundantly), and the data must also be cached on the Driver side, so when the "small" table is in fact large, an OOM can occur
  • The data volume of the broadcast small table must be below the spark.sql.autoBroadcastJoinThreshold value, which defaults to 10MB (10485760 bytes)
  • The size of the broadcast table cannot exceed 8GB. From the Spark 2.4 source code (BroadcastExchangeExec.scala):
  longMetric("dataSize") += dataSize
  if (dataSize >= (8L << 30)) {
    throw new SparkException(
      s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
  }
  • The base table cannot be broadcast; for example, in a left outer join, only the right table can be broadcast, as in fact_table.join(broadcast(dimension_table)). Even without the broadcast hint, Spark automatically switches to this JOIN mode when the conditions are met.
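To make the hint concrete, here is a hedged PySpark fragment. It assumes an active SparkSession named spark and two hypothetical DataFrames, fact_table and dimension_table, joined on a column named "id"; it is a usage sketch, not runnable on its own.

```python
from pyspark.sql.functions import broadcast

# Threshold for *automatic* broadcasting by estimated table size
# (10MB is already the default; shown here for illustration).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)

# Explicitly ask Spark to broadcast the (small) dimension table.
joined = fact_table.join(broadcast(dimension_table), "id")
```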

Sort Merge Join

Brief introduction

Sort Merge Join is Spark's default JOIN mechanism, controlled by the parameter spark.sql.join.preferSortMergeJoin, whose default value is true, meaning Sort Merge Join is preferred. This strategy is generally used to join two large tables. Sort Merge Join can reduce data transmission in the cluster; unlike a hash join, it does not load all the data into memory first, but it does require the data to be sorted by the join key before joining.

Sort Merge Join mainly includes three stages:

  • Shuffle phase: the two large tables are repartitioned by the join key
  • Sort phase: the data within each partition is sorted
  • Merge phase: the sorted partition data from the two tables is joined by traversing both sides and connecting rows with equal join key values
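The sort and merge phases can be sketched as a toy, single-partition Python version (illustrative only, not Spark's code): both sides are sorted by key, then merged with two cursors, pairing up runs of equal keys.

```python
def sort_merge_join(left, right):
    """Toy Sort Merge Join over (key, value) tuples."""
    # Sort phase: order both inputs by the join key.
    left = sorted(left)
    right = sorted(right)
    # Merge phase: advance two cursors, joining rows with equal keys.
    result, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Pair the current left row with every right row of this key.
            j0 = j
            while j < len(right) and right[j][0] == lk:
                result.append((lk, left[i][1], right[j][1]))
                j += 1
            i += 1
            if i < len(left) and left[i][0] == lk:
                j = j0  # rewind for the next left row with the same key
    return result
```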

Conditions and characteristics

  • Only equi-joins are supported
  • All join types are supported
  • The join keys must be sortable
  • The parameter spark.sql.join.preferSortMergeJoin (default: true) must be set to true

Cartesian Join

Brief introduction

If the two tables participating in a Spark join do not specify a join key (no ON condition), a Cartesian product join is produced. The number of rows in the result is the product of the row counts of the two tables.

Conditions

  • Only inner joins are supported
  • Both equi-joins and non-equi-joins are supported
  • The parameter spark.sql.crossJoin.enabled must be set to true
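The row-count behavior is easy to verify with a minimal Python sketch (illustrative only): every left row is paired with every right row, so the output size is len(left) * len(right).

```python
from itertools import product

def cartesian_join(left, right):
    """Cartesian (cross) join: pair every left row with every right row."""
    return list(product(left, right))
```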

Broadcast Nested Loop Join

Brief introduction

This strategy is the final fallback, chosen when no other suitable JOIN mechanism applies. The priority order is: Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join > Broadcast Nested Loop Join.

Between Cartesian Join and Broadcast Nested Loop Join: for an inner join, the Cartesian Join strategy is preferred; for a non-equi-join where one table is small enough to be broadcast, Broadcast Nested Loop Join is selected.

Conditions and characteristics

  • Both equi-joins and non-equi-joins are supported
  • All JOIN types are supported. The main optimization points are as follows:

    • For a right outer join, the left table is broadcast
    • For a left outer join, the right table is broadcast
    • For an inner join, either the left or the right table can be broadcast
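The build-side choice above can be illustrated with a toy Python version that supports an arbitrary (non-equi) predicate. The join-type strings and the function itself are illustrative, not Spark API values.

```python
def broadcast_nested_loop_join(left, right, join_type, predicate):
    """Toy Broadcast Nested Loop Join with an arbitrary join predicate."""
    result = []
    if join_type == "left_outer":
        # The right table is broadcast; stream the left side.
        for l in left:
            matches = [(l, r) for r in right if predicate(l, r)]
            result.extend(matches if matches else [(l, None)])
    elif join_type == "right_outer":
        # The left table is broadcast; stream the right side.
        for r in right:
            matches = [(l, r) for l in left if predicate(l, r)]
            result.extend(matches if matches else [(None, r)])
    else:  # inner: either side may be broadcast; nested loop over both
        result = [(l, r) for l in left for r in right if predicate(l, r)]
    return result
```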

How Spark chooses the JOIN strategy

Equi-joins

If there are join hints, they are followed in this order:

  • 1. Broadcast hint: if the join type is supported, select Broadcast Hash Join
  • 2. Sort merge hint: if the join keys are sortable, select Sort Merge Join
  • 3. Shuffle hash hint: if the join type is supported, select Shuffle Hash Join
  • 4. Shuffle replicate NL hint: if it is an inner join, select the Cartesian product method

If there are no join hints, the following rules are checked one by one:

  • 1. If the join type is supported and one of the tables is small enough to be broadcast (smaller than the spark.sql.autoBroadcastJoinThreshold value, 10MB by default), select Broadcast Hash Join
  • 2. If the parameter spark.sql.join.preferSortMergeJoin is set to false and one table is small enough to build a local hash map, select Shuffle Hash Join
  • 3. If the join keys are sortable, select Sort Merge Join
  • 4. If it is an inner join, select Cartesian Join
  • 5. If an OOM may occur or there is no other viable execution strategy, fall back to Broadcast Nested Loop Join
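The hint-free rules above can be restated as a small illustrative Python function. Each boolean parameter stands in for one of Spark's size or type checks; this is a paraphrase of the decision order, not Spark's code.

```python
def choose_equi_join_strategy(can_broadcast, prefer_sort_merge_join,
                              can_build_local_hash_map, keys_sortable,
                              is_inner_like):
    """Mirror of the hint-free equi-join selection rules, top to bottom."""
    if can_broadcast:                                        # rule 1
        return "broadcast hash join"
    if not prefer_sort_merge_join and can_build_local_hash_map:  # rule 2
        return "shuffle hash join"
    if keys_sortable:                                        # rule 3
        return "sort merge join"
    if is_inner_like:                                        # rule 4
        return "cartesian join"
    return "broadcast nested loop join"                      # rule 5
```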

Non-equi-joins

If there are join hints, they are followed in this order:

  • 1. Broadcast hint: select Broadcast Nested Loop Join
  • 2. Shuffle replicate NL hint: if it is an inner join, select Cartesian product join

If there are no join hints, the following rules are checked one by one:

  • 1. If one table is small enough to be broadcast, select Broadcast Nested Loop Join
  • 2. If it is an inner join, select Cartesian product join
  • 3. If an OOM may occur or there is no other viable execution strategy, fall back to Broadcast Nested Loop Join

Source code fragment of join strategy selection

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {
          if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {
            None
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

      // If it is not an equi-join, pick the build side that
      // BroadcastNestedLoopJoinExec should broadcast before checking the hints.
      case logical.Join(left, right, joinType, condition, hint) =>
        val desiredBuildSide = if (joinType.isInstanceOf[InnerLike]) {
          getSmallerSide(left, right)
        } else {
          // BroadcastNestedLoopJoinExec prefers to broadcast the left side
          // for a right join, and the right side for a left join.
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val maybeBuildSide = if (buildLeft && buildRight) {
            Some(desiredBuildSide)
          } else if (buildLeft) {
            Some(BuildLeft)
          } else if (buildRight) {
            Some(BuildRight)
          } else {
            None
          }

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  }

Summary

This article introduced the five JOIN strategies provided by Spark. It first reviewed the factors that affect a JOIN operation, then described each of the five strategies along with its meaning and trigger conditions, and finally presented the source code fragment responsible for JOIN strategy selection. I hope this article helps you.


Tags: Spark

Posted by Helminthophobe on Sat, 07 May 2022 15:14:37 +0300