[Delta][SQL] Delta open-sources a formerly paid feature: a complete walkthrough of the Z-Order source code implementation

Usually, to improve data-processing efficiency, the compute engine implements predicate pushdown, and the storage layer skips irrelevant data or files as much as possible based on the pushed-down filter conditions. Hudi, Iceberg and Delta all implement data-skipping technology based on a min-max index: the minimum and maximum values of each column in a data file are recorded in the metadata, and the query predicates on those columns are used to decide whether the current data file may contain any matching records, and therefore whether reading it can be skipped.

But when the data is evenly distributed across all files, the range between the lower_bounds and upper_bounds of each column in every file will be very wide, and the ability to skip data becomes useless. To improve the clustering, or correlation, between the partition columns and the other columns that queries filter on, it is therefore generally recommended to sort the data in advance.

With a traditional linear (lexicographic) sort, however, the skipping efficiency is high only for the first sort column and drops off rapidly for the subsequent columns. So if two or more columns are equally likely to appear in a highly selective predicate, data skipping brings little benefit overall.

[Figure: layout of 3-tuple integers sorted in lexicographic order]

As the figure above shows, for 3-tuples of integers in lexicographic order, only the first column is aggregated by the sort into contiguous, filterable ranges; if you look for the value "4" in the other columns, you will find it scattered everywhere with no locality at all, so the data-skipping performance can be imagined.

Z-order, also known as the Z-order curve, has one very important property: dimensionality reduction. It can reduce a multi-dimensional space problem to a low-dimensional or one-dimensional one: multiple columns are converted into a single Z-index column, and the data is sorted by it. Because rows with similar Z-order values are written to the same file, the value distribution of each dimension, viewed over the dataset as a whole, is approximately monotonic. As a result, the overlap between the upper_bounds and lower_bounds of different files is effectively reduced, and the data-skipping technique becomes effective again. The core of Z-order is to improve file skipping rather than row skipping, which cuts down unnecessary IO. Besides the Z-order curve, the Hilbert curve is another commonly used space-filling curve.

A brief description of Z-order

Map multidimensional data to one dimension and sort by this dimension

The key to Z-order is the mapping rule that produces the z-value, which is based on bit interleaving: the bits of each dimension's value are interleaved into the final z-value. For example, suppose we want to compute the z-value of the two-dimensional coordinate (x=97, y=214); we can do it as follows.

Step 1: Represent each dimension of data in bits

x value: 01100001  (97)
y value: 11010110  (214)

Step 2: Starting from the leftmost bit of y, interleave the bits of y and x to get the z-value, as shown below

z-value: 1011011000101001  (46633)

For multi-dimensional data we can interleave the bits of every dimension in the same way to form the z-value. Once z-values are generated, we can sort by them; sorting by z-value naturally follows a Z-order curve and gives good clustering on all the dimensions involved in generating the z-value.
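As a small sanity check of the interleaving rule, here is a standalone Scala sketch (not Delta code; the names are made up for illustration) that interleaves the 8 bits of two values, y first; for (x=97, y=214) it reproduces the z-value 46633:

// Interleave the 8 bits of y and x (y first, starting from the most
// significant bit) into a 16-bit z-value.
object InterleaveSketch {
  def zValue(x: Int, y: Int): Int =
    (0 until 8).foldLeft(0) { (acc, i) =>
      // bit i of y goes to position 2*i + 1, bit i of x to position 2*i
      acc | (((y >> i) & 1) << (2 * i + 1)) | (((x >> i) & 1) << (2 * i))
    }

  def main(args: Array[String]): Unit = {
    val z = zValue(97, 214)
    println(z.toBinaryString) // 1011011000101001
    println(z)                // 46633
  }
}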

What is the performance effect of zorder? Let's take an example:

[Figure: linear sort (left) vs. Z-order sort (right) of (x, y) points, 4 rows per file]

In the figure above, each box represents a file, and each file evenly stores 4 rows of data. The left side shows the data distribution after a linear sort, the right side after a Z-order sort. For the query condition x = 2 or y = 2, the linear layout has to scan 9 files, while the Z-order layout only has to scan 7.
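The 9-vs-7 count can be reproduced with a small standalone Scala sketch, assuming the figure uses an 8x8 grid of (x, y) values (0..7 each) packed 4 points per file, which is the usual form of this example:

// Compare how many files the query x == 2 OR y == 2 touches under a linear
// (x, y) sort versus a Z-order sort, with 4 points per file.
object FileSkippingSketch {
  // Interleave the low 3 bits of x and y into a 6-bit z-value.
  def zValue(x: Int, y: Int): Int =
    (0 until 3).foldLeft(0) { (acc, i) =>
      acc | (((x >> i) & 1) << (2 * i + 1)) | (((y >> i) & 1) << (2 * i))
    }

  def filesHit(points: Seq[(Int, Int)], sortKey: ((Int, Int)) => Int): Int = {
    val sorted = points.sortBy(sortKey)
    // a file (group of 4 points) is scanned if any of its points matches the predicate
    sorted.grouped(4).count(_.exists { case (x, y) => x == 2 || y == 2 })
  }

  def main(args: Array[String]): Unit = {
    val points = for { x <- 0 until 8; y <- 0 until 8 } yield (x, y)
    println(filesHit(points, { case (x, y) => x * 8 + y }))    // linear sort: 9 files
    println(filesHit(points, { case (x, y) => zValue(x, y) })) // z-order sort: 7 files
  }
}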

Several details of Delta's Z-order

It is fair to say that implementing a Z-order is not hard; implementing an efficient Z-order is where it gets complicated. To implement Z-order, the first thing to consider is how to convert the values of multiple query columns into z-values.

As the introduction above shows, the most intuitive way to generate z-values is to convert the multi-dimensional data into binary and then interleave the bits. But converting data of different types directly to binary raises several problems:

  1. How do we ensure that dimension values of different types (String, Long, Double, ...) have the same length once converted to bits? Shorter values may need to be left-padded with zeros, while longer values such as Strings may need to be truncated.
  2. How do we handle null values for the different data types? The interleaving that produces the z-value cannot work with nulls, so a min or max value can be chosen as the filler for null.

So if multi-column values are converted to binary directly, not only does every column value need a newly allocated byte buffer, but each data type also needs its own extra handling; and because Strings may be truncated, the result can be inaccurate for what is one of the most commonly used types.
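To make that per-type work concrete, here is a small sketch (not Delta code; the helper names are hypothetical) of order-preserving, fixed-width byte encodings for a Long and a String:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DirectBinarySketch {
  // Longs: flip the sign bit so that unsigned byte order matches signed numeric order.
  def longToBytes(v: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(v ^ Long.MinValue).array()

  // Strings: zero-pad or truncate the UTF-8 bytes to a fixed width.
  def stringToBytes(s: String, width: Int): Array[Byte] =
    java.util.Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), width)

  def main(args: Array[String]): Unit = {
    println(longToBytes(97L).map("%02x".format(_)).mkString)             // 8000000000000061
    println(stringToBytes("New York", 4).map("%02x".format(_)).mkString) // "New " -- truncated
  }
}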

To solve these problems, a common approach is to sort on the query columns and map each row to a sequential id, similar to the window functions row_number(), dense_rank() or rank().

But in that case the query columns have to be sorted one after another, and performance obviously suffers badly, as the sketch below illustrates.
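A minimal sketch of this rank-based approach (not Delta's code; the column names c1/c2 are made up) shows why it is expensive: every dense_rank uses a global Window.orderBy, so Spark pulls all rows into a single partition once per query column.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

object DenseRankZValueSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rank-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((3, "b"), (1, "c"), (2, "a")).toDF("c1", "c2")
    // One global sort per query column -- this is the performance problem.
    val ranked = df
      .withColumn("c1_rank", dense_rank().over(Window.orderBy(col("c1"))))
      .withColumn("c2_rank", dense_rank().over(Window.orderBy(col("c2"))))
    ranked.show()

    spark.stop()
  }
}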

So how does Delta implement it, and how does it solve the problems above?

Delta takes a reduced-precision approach: consecutive values are treated as one unit, and every query column is converted through range_partition_id(). The number of ranges is configurable (it appears as DeltaSQLConf.MDC_NUM_RANGE_IDS in the code below).

How is this achieved? The implementation is actually very simple: to obtain the range partition id, Delta reuses Spark's RangePartitioner and builds an expression range_partition_id(col, N) -> int on top of it. This expression replaces the step of converting the query columns to binary, which avoids the extra per-type operations and the multiple sorts, because the RangePartitioner only samples the keys to compute the partition boundaries.
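The underlying idea can be illustrated with a standalone sketch that uses Spark's public RangePartitioner directly (this is not Delta's expression, which is built inside a Catalyst rule as shown later; the names here are made up):

import org.apache.spark.RangePartitioner
import org.apache.spark.sql.SparkSession

object RangePartitionIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rpi-sketch").getOrCreate()
    val sc = spark.sparkContext

    val values = sc.parallelize(Seq("a", "m", "z", "delta", "hudi", "iceberg"))
    // RangePartitioner samples the keys to compute range boundaries;
    // the value side of the pair is unused here.
    val keyed = values.map(v => (v, null))
    val partitioner = new RangePartitioner(4, keyed)

    // Each value is mapped to the small integer id of the range it falls into.
    values.map(v => (v, partitioner.getPartition(v))).collect().foreach(println)

    spark.stop()
  }
}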

The z-values are then generated by converting the range ids of the query columns to binary and interleaving them with the interleave_bits(...) method.

Next, how are the sorting and the write-out implemented, and in what way?

There are two problems with sorting the whole dataset globally by z-value:

  1. Sorting the entire dataset is very inefficient.
  2. The Z-order curve has "seams", where a linear traversal has to jump to a different region before continuing its path, which is bad for small range queries.

So Delta's implementation mainly splits the data by z-value ranges, which in practice means calling Spark's repartitionByRange.

How to deal with data skew?

If the columns to be clustered are skewed overall, the z-values will be skewed as well even after the conversion, and sorting and writing them out may become slow. The solution is actually very simple: append a random byte to the end of the z-value byte array, and then sort within each partition range.

Delta's Z-order source code analysis

Let's analyze the source code from the perspective of user invocation:

OPTIMIZE delta.table WHERE day = 25 AND city = 'New York' ZORDER BY (col1, col2)
  1. The SQL is parsed into an OptimizeTableCommand and validated before execution

When the user executes the SQL above, it first goes through the SQL parsing phase. Spark uses the open-source component antlr4 to parse the input SQL into an AST; the relevant grammar is in the DeltaSqlBase.g4 file.

| OPTIMIZE (path=STRING | table=qualifiedName)
    (WHERE partitionPredicate = predicateToken)?
    (zorderSpec)?                                                   #optimizeTable

zorderSpec
    : ZORDER BY LEFT_PAREN interleave+=qualifiedName (COMMA interleave+=qualifiedName)* RIGHT_PAREN
    | ZORDER BY interleave+=qualifiedName (COMMA interleave+=qualifiedName)*
    ;

As the grammar above shows, OPTIMIZE supports not only a table name but also a directly specified file path to optimize. Note, however, that when optimizing the data layout, the columns in the WHERE condition must be a subset of the partition columns; that is, the columns day and city in the example must be partition columns.
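For reference, recent Delta releases (2.0+) also expose the same operation through the Scala DeltaTable builder API; here is a sketch assuming that API is available and a partitioned table named events exists:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object OptimizeZOrderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("zorder-example")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Equivalent of: OPTIMIZE events WHERE day = 25 AND city = 'New York' ZORDER BY (col1, col2)
    DeltaTable.forName(spark, "events")
      .optimize()
      .where("day = 25 AND city = 'New York'") // must reference partition columns only
      .executeZOrderBy("col1", "col2")
  }
}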

The g4 grammar is handled by the DeltaSqlParser class: the visitZorderSpec method extracts the z-order columns entered by the user, and the visitOptimizeTable method extracts the directory or table name to optimize and the filter condition.

override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
  if (ctx.path == null && ctx.table == null) {
    throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
  }
  // seq of columns in z-order
  val interleaveBy = Option(ctx.zorderSpec).map(visitZorderSpec).getOrElse(Seq.empty)
  OptimizeTableCommand(
    Option(ctx.path).map(string),
    Option(ctx.table).map(visitTableIdentifier),
    Option(ctx.partitionPredicate).map(extractRawText(_)))(interleaveBy)
}

Here the contents of the OptimizeTableContext are extracted via the visitor design pattern: the array of z-order columns is first obtained from visitZorderSpec and then wrapped into an OptimizeTableCommand. OptimizeTableCommand is a command expression whose run method is executed when the command runs.

override def run(sparkSession: SparkSession): Seq[Row] = {
  val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE")

  // [1] Get the partition column of the table from metadata
  val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
  // [2] Parse query predicates
  val partitionPredicates = partitionPredicate.map(predicate => {
    val predicates = parsePredicates(sparkSession, predicate)
    verifyPartitionPredicates(
      sparkSession,
      partitionColumns,
      predicates)
    predicates
  }).getOrElse(Seq.empty)
  // [3] Check the zorder column
  validateZorderByColumns(sparkSession, deltaLog, zOrderBy)
  val zOrderByColumns = zOrderBy.map(_.name).toSeq
  // [4] Call the optimize command
  new OptimizeExecutor(sparkSession, deltaLog, partitionPredicates, zOrderByColumns)
      .optimize()
}

So validation is performed before the optimization runs. First, Z-ordering is applied to the data inside each partition directory, so the z-order columns must not include partition columns. Second, the z-order columns must be columns for which min-max statistics exist in the metadata, i.e. columns that can actually be used to skip data. Finally, the optimize method of OptimizeExecutor is called; let's take a look at it:

  2. Filter candidate files and group them by partition for compaction
def optimize(): Seq[Row] = {
  val txn = deltaLog.startTransaction()
  // [1] Filter out candidate files according to where conditions
  val candidateFiles = txn.filterFiles(partitionPredicate)
  val partitionSchema = txn.metadata.partitionSchema
  // [2] For multi-dimensional clustering, select all candidate files; otherwise only files smaller than minFileSize
  // select all files in case of multi-dimensional clustering
  val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
  val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

  val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)

Here the candidate files are selected and grouped before optimization. Note that for multi-dimensional clustering all files are selected, without filtering on file size. The grouping uses a bin-packing algorithm, which keeps the total size of each group roughly uniform.
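The bin-packing idea can be sketched as follows (a simplified stand-in for Delta's groupFilesIntoBins, with hypothetical names): walk the files of a partition in order and start a new bin whenever adding the next file would push the current bin past maxFileSize.

case class FileInfo(path: String, size: Long)

def groupIntoBins(files: Seq[FileInfo], maxBinSize: Long): Seq[Seq[FileInfo]] = {
  val bins = scala.collection.mutable.ArrayBuffer.empty[Seq[FileInfo]]
  val current = scala.collection.mutable.ArrayBuffer.empty[FileInfo]
  var currentSize = 0L
  files.foreach { f =>
    // close the current bin if this file would overflow it
    if (current.nonEmpty && currentSize + f.size > maxBinSize) {
      bins += current.toSeq
      current.clear()
      currentSize = 0L
    }
    current += f
    currentSize += f.size
  }
  if (current.nonEmpty) bins += current.toSeq
  bins.toSeq
}

Back in optimize, the resulting bins are then submitted to Spark in parallel: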

val parallelJobCollection = new ParVector(jobs.toVector)

// Create a task pool to parallelize the submission of optimization jobs to Spark.
val threadPool = ThreadUtils.newForkJoinPool(
  "OptimizeJob",
  sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS))

val updates = try {
  val forkJoinPoolTaskSupport = new ForkJoinTaskSupport(threadPool)
  parallelJobCollection.tasksupport = forkJoinPoolTaskSupport
  // Parallel execution of file merging, compression and Zorder optimization
  parallelJobCollection.flatMap(partitionBinGroup =>
    runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)).seq
} finally {
  threadPool.shutdownNow()
}

File merging, compaction and Z-order optimization are then executed concurrently. Merging and compaction as such have nothing to do with Z-order; this reuses the Optimizer's original functionality, and the parallelism also helps the performance of the Z-order sort. Next, let's step into the runOptimizeBinJob method and focus on the Z-order part.

  3. Generate z-values from the multi-dimensional column values
// [1] Read grouped files
val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize"))
val repartitionDF = if (isMultiDimClustering) {
  val totalSize = bin.map(_.size).sum
  // The target number of output files is the bin's total size divided by the configured max file size
  val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt
  // [2] Call MultiDimClustering.cluster
  MultiDimClustering.cluster(
    input,
    approxNumFiles,
    zOrderByColumns)
} else {
  input.coalesce(numPartitions = 1)
}

The target number of files is the total size of the grouped files divided by the configured maximum file size. So here the grouped files are read and the cluster method is called.

override def cluster(
    df: DataFrame,
    colNames: Seq[String],
    approxNumPartitions: Int): DataFrame = {
  val conf = df.sparkSession.sessionState.conf
  // Controls the number of range ids; larger values give better precision at the cost of performance
  val numRanges = conf.getConf(DeltaSQLConf.MDC_NUM_RANGE_IDS)
  // Whether to append random noise to avoid data skew
  val addNoise = conf.getConf(DeltaSQLConf.MDC_ADD_NOISE)

  val cols = colNames.map(df(_))
  // Execute the cluster expression to generate the z-value
  val mdcCol = getClusteringExpression(cols, numRanges)
  val repartitionKeyColName = s"${UUID.randomUUID().toString}-rpKey1"
  ...
}

In cluster, the numRanges and addNoise configurations are read, and then getClusteringExpression is called to build the z-value column.

object ZOrderClustering extends SpaceFillingCurveClustering {
  override protected[skipping] def getClusteringExpression(
      cols: Seq[Column], numRanges: Int): Column = {
    assert(cols.size >= 1, "Cannot do Z-Order clustering by zero columns!")
    // [1] Call the range_partition_id expression to generate rangeIdCols
    val rangeIdCols = cols.map(range_partition_id(_, numRanges))
    // [2] Execute interleave_bits and convert to String
    interleave_bits(rangeIdCols: _*).cast(StringType)
  }
}

[1] Call the range_partition_id expression to generate rangeIdCols

[2] Execute interleave_bits and convert it to String, which is the final generated z-value

The range_partition_id function is the implementation of range_partition_id(col, N) -> int, and the partitioning it relies on reuses Spark's RangePartitioner. Let's expand it and see how it is invoked.

def range_partition_id(col: Column, numPartitions: Int): Column = withExpr {
  RangePartitionId(col.expr, numPartitions)
}

The implementation of range_partition_id is very simple: it just wraps the expression in a RangePartitionId node and returns it. RangePartitionId itself is an empty placeholder expression, so where does the RangePartitioner actually get called?

This is where SparkSQL's execution and optimization pipeline comes in. Before SQL is executed, it usually goes through rule-based (RBO), cost-based (CBO) and other optimizations; these optimizations are registered and packaged as Rules, and after optimization the plan is converted into RDDs and executed as Spark tasks.

extensions.injectOptimizerRule { session =>
  new RangePartitionIdRewrite(session)
}

The above code injects a RangePartitionIdRewrite rule into the optimizer to rewrite the placeholder of range_partition_id to call RangePartitioner.

case RangePartitionId(expr, n) =>
  val aliasedExpr = Alias(expr, "__RPI_child_col__")()
  val exprAttr = aliasedExpr.toAttribute
  // [1] Filter null rows for query columns
  val planForSampling = Filter(IsNotNull(exprAttr), Project(Seq(aliasedExpr), node.child))
  ...
  withCallSite(session.sparkContext, desc) {
    SQLExecution.withNewExecutionId(qeForSampling) {
      withJobGroup(session.sparkContext, jobGroupId, desc) {
        // [2] Wrap each row in an (InternalRow, null) pair to build the sampling RDD for the RangePartitioner
        val rddForSampling = qeForSampling.toRdd.mapPartitionsInternal { iter =>
             val mutablePair = new MutablePair[InternalRow, Null]()
             iter.map(row => mutablePair.update(row.copy(), null))
        }
        // [3] Create a RangePartitioner and pass in the sorted sortOrder
        val sortOrder = SortOrder(exprAttr, Ascending)
        implicit val ordering = new LazilyGeneratedOrdering(Seq(sortOrder), Seq(exprAttr))
        val partitioner = new RangePartitioner(n, rddForSampling, true, sampleSizeHint)
        // [4] Wrap the expression in PartitionerExpr, which evaluates each row to its range id
        PartitionerExpr(expr, partitioner)
PartitionerExpr then evaluates each row by asking the partitioner for the range id of its key:
override def eval(input: InternalRow): Any = {
  val value: Any = child.eval(input)
  row.update(0, value)
  partitioner.getPartition(row)
}

From the above code, we can see that there are several things done here:

[1] Filter null rows for query columns

[2] Create (InternalRow, null) pairs to build the sampling RDD for the RangePartitioner

[3] Create a RangePartitioner and pass in the sorted sortOrder

[4] Wrap the expression in PartitionerExpr, which evaluates each row to its range id

  4. Range repartitioning based on z-value

Let's go back to the cluster method and look at the rest of the code:

var repartitionedDf = if (addNoise) {
  val randByteColName = s"${UUID.randomUUID().toString}-rpKey2"
  val randByteCol = (rand() * 255 - 128).cast(ByteType)
  df.withColumn(repartitionKeyColName, mdcCol).withColumn(randByteColName, randByteCol)
    .repartitionByRange(approxNumPartitions, col(repartitionKeyColName), col(randByteColName))
    .drop(randByteColName)
} else {
  df.withColumn(repartitionKeyColName, mdcCol)
    .repartitionByRange(approxNumPartitions, col(repartitionKeyColName))
}

repartitionedDf.drop(repartitionKeyColName)

The code here is very intuitive: it calls repartitionByRange on the z-value column (plus the random byte column when noise is enabled), and finally drops the appended sort-key column. txn.writeFiles(repartitionDF) is then called to write the result out.

  5. Update statistics
if (isMultiDimClustering) {
  val inputFileStats =
    ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum)
  optimizeStats.zOrderStats = Some(ZOrderStats(
    strategyName = "all", // means process all files in a partition
    inputCubeFiles = ZOrderFileStats(0, 0),
    inputOtherFiles = inputFileStats,
    inputNumCubes = 0,
    mergedFiles = inputFileStats,
    // There will one z-cube for each partition
    numOutputCubes = optimizeStats.numPartitionsOptimized))
}

Finally update the stats of the file after merging, compressing and zorder sorting.

Let's summarize the whole process and compare the implementation differences with Iceberg and Hudi:

  1. The files to be optimized are filtered out first. The WHERE condition of the OPTIMIZE statement only supports partition columns, i.e. OPTIMIZE can be applied to a subset of the table's partitions.
  2. The Z address is computed from the multi-dimensional column values. Query columns of different types are first converted into coarse-grained rangeIds, and the rangeIds are then converted to binary and interleaved to generate the z-values. The number of rangeIds has to be configured based on experience, and when the data is skewed, random noise bytes are appended to the z-value array.
  3. Range repartitioning is performed based on the z-values: the data is shuffled into multiple partitions, which is equivalent to repartitionByRange(z-value).
  4. The repartitioned partitions are written back to the storage system using copy-on-write, and the statistics are then updated.

So the Z-order implemented by Delta, like the Hudi and Iceberg implementations, essentially uses Z-order for sorting between files, while the idea of linear sorting is still used within each file; this preserves the better performance of linear sorting for small range queries (queries that fall entirely within a single file). The difference lies in how the z-values are generated: Delta maps values to rangeIds instead of converting them to binary directly, which avoids the extra operations and the multiple sorts, but requires more tuning experience. In addition, Delta's Z-order has to be triggered manually by the user running OPTIMIZE.

Below we leave a few questions to think about:

  1. Which columns should generally be chosen for Z-order sorting, and is more columns always better?
  2. After Z-order sorting, are all query SQLs faster, and in which scenarios could queries become slower?

Posted by LiamBailey on Wed, 07 Sep 2022 21:05:34 +0300