Hudi Spark SQL summary

preface

This post summarizes the use of Hudi Spark SQL, taking Hudi version 0.9.0 as an example; some changes in the latest versions are also mentioned briefly. Hudi has supported Spark SQL since version 0.9.0. The support was contributed by pengzhiwei of Alibaba. pengzhiwei is no longer responsible for Hudi; his colleague YannByron has taken over, and ForwardXu has since contributed many functions and features and currently seems to be mainly responsible for it.
All three are Apache Hudi Committers and well worth learning from.

Of course, there are many other contributors, such as Apache member / Hudi PMC Raymond Xu (Xu Shiyan, https://github.com/xushiyan ), who is responsible for the whole Spark module.

configuration parameters

Core parameters:

  • --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
  • --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

use

Three ways to use Hudi Spark SQL

Spark Thrift Server

Start Hudi spark thrift server

spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --name hudi-spark-thrift-server  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab --hiveconf hive.server2.thrift.http.port=20003

Connect Hudi spark thrift server

/usr/hdp/3.1.0.0-78/spark2/bin/beeline -u "jdbc:hive2://192.168.44.128:20003/default;principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"

Spark SQL script

spark-sql --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab

Spark Program

After configuring these parameters, you can use Spark SQL directly in a Spark program.
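
For example, a minimal Spark program sketch (Scala) with the two core parameters set programmatically; the object name and app name are just illustrative:

import org.apache.spark.sql.SparkSession

object HudiSparkSqlExample {
  def main(args: Array[String]): Unit = {
    // The two core parameters from the configuration section, set in the SparkSession builder
    val spark = SparkSession.builder()
      .appName("hudi-spark-sql-example")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport() // optional, useful when Hudi tables are synchronized to the Hive metastore
      .getOrCreate()

    // Hudi SQL statements can then be executed directly through spark.sql
    spark.sql("show tables").show()

    spark.stop()
  }
}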

Build table

create table test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
 partitioned by (dt)
 options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
 )
location '/tmp/test_hudi_table'

  • using hudi means that the table we are creating is a Hudi table
  • primaryKey: the primary key. If it is not set, the table has no primary key; in versions after 0.9.0 it must be set
  • preCombineField: the pre-combine field, used to decide which record is kept when two records share the same primary key
  • type: the table type (cow or mor)

Other Hudi parameters are also supported: configuration parameters starting with hoodie have higher priority, and table parameters can override the SQL defaults. Use them with caution, because some of these parameters may have bugs, for example hoodie.table.name and hoodie.datasource.write.operation; see this PR for details: https://github.com/apache/hudi/pull/5495

  • If location specifies an external path, the table defaults to an external table. If it is not specified, the database path is used and the table is an internal (managed) table

After version 0.9.0, tblproperties is recommended instead of options; options can still be used.
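
For example, the create table statement above could be written with tblproperties instead of options; a sketch run through spark.sql from a program, with the same table definition and path as above:

spark.sql(
  """
    |create table test_hudi_table (
    |  id int,
    |  name string,
    |  price double,
    |  ts long,
    |  dt string
    |) using hudi
    |partitioned by (dt)
    |tblproperties (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  type = 'cow'
    |)
    |location '/tmp/test_hudi_table'
  """.stripMargin)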

After executing the create table statement, the Hudi table is initialized under the corresponding table path, a .hoodie metadata directory is generated, and the metadata of the Hudi table is synchronized to a Hive table. You can verify the internal/external table logic in Hive yourself. Spark SQL cannot verify it at present: even an external table is not shown as external. I am not sure whether this is a bug.

insert

insert into test_hudi_table values (1,'hudi',10,100,'2021-05-05'),(2,'hudi',10,100,'2021-05-05')

or

insert into test_hudi_table
select 1 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt union
select 2 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt 

After inserting, query the table to verify that the data was written successfully

select * from test_hudi_table

+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
| 20220513110302       | 20220513110302_0_2    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 1   | hudi  | 10.0   | 100  | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+

In addition, note that insert may perform updates by default in some cases. Whether it does is related to how Hudi merges small files; the principle is not explained in detail here, and you can check the source code yourself (a related article on how Hudi writes and merges files may be summarized separately in the future).
To prevent the insert operation from performing updates, you can use the following configuration:

hoodie.merge.allow.duplicate.on.inserts = true

Relevant PR: https://github.com/apache/hudi/pull/3644 . That PR adds support for this parameter in the Java client; the Spark client already supported it before that.
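
A small sketch of setting this configuration for the current session before an insert, run via spark.sql from a program (assuming, as with other hoodie.* options, that the session-level set is picked up by the write):

// Keep insert as a pure insert: new records are not merged with existing records
// (which would update them); duplicate keys are allowed instead
spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")

spark.sql("insert into test_hudi_table values (3, 'hudi', 10, 100, '2021-05-05')")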

update

update test_hudi_table set price = 20.0 where id = 1

The price field has been successfully updated

+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
| 20220513143459       | 20220513143459_0_1    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-57-3422_20220513143459.parquet  | 1   | hudi  | 20.0   | 100  | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+

delete

delete from test_hudi_table where id = 1

The record with id 1 was successfully deleted

+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                            _hoodie_file_name                             | id  | name  | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+
| 20220513110302       | 20220513110302_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet  | 2   | hudi  | 10.0   | 100  | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+--------------------------------------------------------------------------+-----+-------+--------+------+-------------+--+

merge

Hudi supports the MERGE statement, including merge insert, merge update and merge delete; inserts, updates and deletes can be unified as follows:

merge into test_hudi_table as t0
using (
  select 1 as id, 'hudi' as name, 112 as price, 98 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
  select 2 as id, 'hudi_2' as name, 10 as price, 100 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
  select 3 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
 ) as s0
on t0.id = s0.id
when matched and opt_type!='DELETE' then update set *
when matched and opt_type='DELETE' then delete
when not matched and opt_type!='DELETE' then insert *

+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                             _hoodie_file_name                             | id  |  name   | price  |  ts  |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+
| 20220513143914       | 20220513143914_0_1    | id:2                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet  | 2   | hudi_2  | 10.0   | 100  | 2021-05-05  |
| 20220513143914       | 20220513143914_0_2    | id:1                | dt=2021-05-05           | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet  | 1   | hudi    | 112.0  | 98   | 2021-05-05  |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+---------+--------+------+-------------+--+

The benefits are:
1. All operation types are unified into one SQL statement, avoiding too many jobs
2. Exceptions are avoided: with a plain insert, an exception is thrown if the primary key of the new record already exists
3. The program is simpler: for update and delete there is no need to work out the where condition and which fields to modify and then splice the SQL (this information may not be available from the binlog)
4. Better performance: for batch deletes, merge performs better than delete with an in list
However, note that when preCombineField = 'ts' is not set, newly written data directly overwrites the historical data; this is a problem when data that arrives later actually has an earlier ts than the data already in the table.

merge values

Based on the merge implementation, I optimized the source code (the code has not been submitted to the community yet; if you want to use it, see: https://gitee.com/dongkelun/hudi/commits/0.9.0 ) so that Hudi SQL supports the merge ... values form. An example:

merge into test_hudi_table as t0
using 
(1, 'hudi', 112, 98, '2021-05-05','INSERT'),
(2, 'hudi_2', 1, 100, '2021-05-05','UPDATE'),
(3, 'hudi', 10, 100, '2021-05-05','DELETE')
as s0 (id,name,price,ts,dt,opt_type)
on t0.id = s0.id
when matched and opt_type!='DELETE' then update set *
when matched and opt_type='DELETE' then delete
when not matched and opt_type!='DELETE' then insert *

The reason for this modification, in the context of SQL-based data synchronization: merge normally takes the form merge ... using (subquery). When the spliced SQL is very long, for example 7000 records, which is equivalent to 7000 select statements, the recursive SQL parsing is very slow; parsing the subquery alone takes 10 minutes, which cannot meet our minute-level latency requirement. By modifying the source code to support the merge ... values form and passing the data through values, the parsing time drops from 10 minutes to a few seconds. The program later converts the values into a table and upserts it directly, which greatly reduces the time per batch. Testing shows that with tens of millions of rows of historical data and tens of millions of rows of daily increments, i.e. an average of about 7000 records per minute, minute-level latency can be met.
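
The "convert the values into a table and directly upsert" step mentioned above corresponds roughly to writing each batch as a DataFrame through the Hudi datasource instead of going through SQL parsing. A minimal sketch, assuming the batch has already been parsed into the columns of the example table (DELETE handling is omitted here):

import org.apache.spark.sql.SaveMode
import spark.implicits._

// One batch of incoming records, equivalent to the rows in the merge ... values example
val batch = Seq(
  (1, "hudi",   112.0, 98L,  "2021-05-05"),
  (2, "hudi_2", 1.0,   100L, "2021-05-05")
).toDF("id", "name", "price", "ts", "dt")

batch.write.format("hudi")
  .option("hoodie.table.name", "test_hudi_table")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.hive_style_partitioning", "true") // match the dt=... layout created by SQL
  .mode(SaveMode.Append) // Append performs an upsert into the existing table, it does not overwrite it
  .save("/tmp/test_hudi_table")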

Evaluation

Performance results from testing the merge ... values statement

Spark Thrift Server configuration parameters

--executor-memory 4G --num-executors 15 --executor-cores 2 --driver-memory 6G --driver-cores 2 

historical data

The evaluation data is based on the TPC-DS web_sales table, with 10 million rows of historical data and 10 million rows of simulated daily increments. Note that decimal columns in the source table must also be changed to double; they cannot stay decimal, otherwise exceptions occur during subsequent incremental synchronization (Hudi Spark SQL has a bug with the decimal type).

Incremental data

Each batch of at most 10,000 incremental records is spliced into one SQL statement and synchronized.

Evaluation results

Spark Server                              |  Streaming
batch  Batch data volume  Time (s)        |  batch  Batch data volume  Time (s)
1      6000               119             |  1      6898               59
2      6000               79              |  2      3211               27
3      6000               70              |  3      5999               36
4      6000               68              |  4      5999               33
                                          |  5      5902               32
                                          |  6      5995               35

Spark Server refers to a Java program that connects to the Spark Thrift Server through JDBC. The first batch, with no cache, takes about 120 seconds; with the cache warmed up, each batch takes about 70 seconds.
Streaming uses Structured Streaming to splice the merge SQL in each batch and then execute it with spark.sql(). From the results, Streaming is about 30 seconds faster than Spark Server. The main reason is that the delay-scheduling time of Spark Server is longer than that of Streaming; so far no way has been found to bring the Spark Server time down to the Streaming time.
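
A rough sketch of the Streaming approach under the patched merge ... values form, assuming each micro-batch already has the columns id, name, price, ts, dt, opt_type and that string values contain no quotes:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Splice one merge ... values statement per micro-batch and execute it with spark.sql
def runMergeBatch(spark: SparkSession, batch: DataFrame): Unit = {
  val rows = batch.collect()
  if (rows.nonEmpty) {
    val values = rows.map { r =>
      s"(${r.getAs[Int]("id")}, '${r.getAs[String]("name")}', ${r.getAs[Double]("price")}, " +
        s"${r.getAs[Long]("ts")}, '${r.getAs[String]("dt")}', '${r.getAs[String]("opt_type")}')"
    }.mkString(",\n")

    spark.sql(
      s"""
         |merge into test_hudi_table as t0
         |using
         |$values
         |as s0 (id, name, price, ts, dt, opt_type)
         |on t0.id = s0.id
         |when matched and opt_type != 'DELETE' then update set *
         |when matched and opt_type = 'DELETE' then delete
         |when not matched and opt_type != 'DELETE' then insert *
       """.stripMargin)
  }
}

// Wiring into Structured Streaming, where sourceStream is the parsed input stream:
// sourceStream.writeStream
//   .foreachBatch { (batchDf: DataFrame, batchId: Long) => runMergeBatch(batchDf.sparkSession, batchDf) }
//   .start()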

The incremental data simulated in this evaluation touches all partitions every minute, so partition filtering has no effect. In real production, each increment only touches a small number of partitions, so partition filtering does take effect and incremental synchronization performs better than in this evaluation.

summary

This post mainly summarizes the commonly used SQL commands in Hudi 0.9.0 Spark SQL and some precautions. Other SQL semantics are also supported, and newer versions support even more; you can study and test them yourself.

Tags: Spark Hudi
