preface
This post summarizes the use of Hudi Spark SQL, taking Hudi 0.9.0 as an example; some changes in the latest versions are also mentioned briefly. Hudi has supported Spark SQL since version 0.9.0, a feature contributed by pengzhiwei of Alibaba. pengzhiwei is no longer responsible for this module; it was taken over by his colleague YannByron, and ForwardXu has since contributed many functions and features. At present ForwardXu appears to be the main maintainer.
All three are Apache Hudi committers; great respect to them, and there is a lot to learn from their work! Their GitHub profiles:
- pengzhiwei / Peng Zhiwei (Alibaba): https://github.com/pengzhiwei2018
- YannByron / Bi Yan (Alibaba): https://github.com/YannByron
- ForwardXu / Xu Qianjin (Tencent): https://github.com/XuQianJin-Stars
Of course there are many other notable contributors, such as Apache Member and Hudi PMC member Raymond Xu (Xu Shiyan, https://github.com/xushiyan), who is responsible for the whole Spark module.
Configuration parameters
Core parameters:
- --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
- --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
Usage
There are three ways to use Hudi Spark SQL:
Spark Thrift Server
Start the Hudi Spark Thrift Server:
spark-submit --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --name hudi-spark-thrift-server --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab --hiveconf hive.server2.thrift.http.port=20003
Connect to the Hudi Spark Thrift Server:
/usr/hdp/3.1.0.0-78/spark2/bin/beeline -u "jdbc:hive2://192.168.44.128:20003/default;principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
Spark SQL script
spark-sql --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 2G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal spark/indata-192-168-44-128.indata.com@INDATA.COM --keytab /etc/security/keytabs/spark.service.keytab
Spark Program
After configuring the two core parameters above on the SparkSession, you can run the Hudi SQL statements directly through spark.sql(), as sketched below.
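The following is a minimal sketch, not taken from the original post: it only wires up the two core parameters from the configuration section and then issues SQL with spark.sql(). Cluster-specific settings (master, Kerberos, resources) are omitted, and the table name assumes the table created in the next section.

```scala
import org.apache.spark.sql.SparkSession

object HudiSparkSqlExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-spark-sql-example")
      // the two core parameters from the configuration section
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport()
      .getOrCreate()

    // Any SQL statement from the following sections can be passed to spark.sql() unchanged
    spark.sql("select * from test_hudi_table").show(false)

    spark.stop()
  }
}
```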
Create table
create table test_hudi_table ( id int, name string, price double, ts long, dt string ) using hudi partitioned by (dt) options ( primaryKey = 'id', preCombineField = 'ts', type = 'cow' ) location '/tmp/test_hudi_table'
- using hudi indicates that the table to be created is a Hudi table
- primaryKey: the primary key. If it is not set, the table has no primary key; in versions after 0.9.0 it must be set
- preCombineField: the pre-combine field, used to decide which record is kept when two records share the same primary key (the record with the larger value wins)
- type: the table type, cow (copy on write) or mor (merge on read)
Other Hudi parameters are also supported: configuration parameters starting with hoodie. have higher priority, and table parameters can override the SQL defaults. Use them with caution, because some of them may have bugs, for example hoodie.table.name and hoodie.datasource.write.operation; see this PR for details: https://github.com/apache/hudi/pull/5495
- If location specifies an external path, the table is an external table by default. If it is not specified, the table is created under the database path and is a managed (internal) table
After version 0.9.0, tblproperties is recommended instead of options, although options can still be used; a sketch of the tblproperties form is shown below.
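A hedged sketch of the tblproperties form (my own example, not from the original post; the table name test_hudi_table_tbl is only for illustration). The statement can be run through spark.sql() as in the program sketch above, or pasted into spark-sql/beeline without the wrapper:

```scala
// Hedged sketch: the same table definition as above, but with tblproperties instead of options
spark.sql(
  """create table if not exists test_hudi_table_tbl (
    |  id int, name string, price double, ts long, dt string
    |) using hudi
    |partitioned by (dt)
    |tblproperties (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  type = 'cow'
    |)
    |location '/tmp/test_hudi_table_tbl'""".stripMargin)
```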
After the create-table statement is executed, the Hudi table is initialized under the corresponding table path, the .hoodie metadata directory is generated, and the Hudi table metadata is synchronized to the Hive table. You can verify the internal/external table logic in Hive yourself; Spark SQL cannot verify it at present, because even an external table is not displayed as such, and I am not sure whether that is a bug.
insert
insert into test_hudi_table values (1,'hudi',10,100,'2021-05-05'),(2,'hudi',10,100,'2021-05-05')
or
insert into test_hudi_table select 1 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt union select 2 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt
After the insert, query the table to verify that the data was written successfully:
select * from test_hudi_table

| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
|---|---|---|---|---|---|---|---|---|---|
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
| 20220513110302 | 20220513110302_0_2 | id:1 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 1 | hudi | 10.0 | 100 | 2021-05-05 |
In addition, note that by default an insert may in some cases behave like an update; whether it does is related to Hudi's small-file merging. The principle is not explained in detail here, you can check the source code yourself (a separate article on how Hudi writes and merges files may follow later).
To prevent the insert operation from updating existing records, you can use the following configuration:
hoodie.merge.allow.duplicate.on.inserts = true
Relevant PR: https://github.com/apache/hudi/pull/3644. That PR adds support for this parameter to the Java client; the Spark client already supported it before that. A hedged sketch of applying it is shown below.
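A minimal sketch of applying the configuration from a program, under the assumption that a session-level SET is picked up by the Hudi version in use (alternatively the key can be put into the table's options/tblproperties); verify the behavior on your own environment:

```scala
// Hedged sketch: set the parameter at session level, then insert as usual.
// Assumption: the Hudi version in use reads hoodie.* keys set via SQL SET for SQL writes.
spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
spark.sql("insert into test_hudi_table values (3, 'hudi', 10, 100, '2021-05-05')")
```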
update
update test_hudi_table set price = 20.0 where id = 1
The price field has been successfully updated
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
|---|---|---|---|---|---|---|---|---|---|
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
| 20220513143459 | 20220513143459_0_1 | id:1 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-57-3422_20220513143459.parquet | 1 | hudi | 20.0 | 100 | 2021-05-05 |
delete
delete from test_hudi_table where id = 1
The record with id 1 was successfully deleted
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
|---|---|---|---|---|---|---|---|---|---|
| 20220513110302 | 20220513110302_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-27-2009_20220513110302.parquet | 2 | hudi | 10.0 | 100 | 2021-05-05 |
merge
Hudi supports MERGE statements (MERGE INTO with UPDATE, DELETE and INSERT actions), so inserts, updates and deletes can be unified in a single statement as follows:
merge into test_hudi_table as t0 using ( select 1 as id, 'hudi' as name, 112 as price, 98 as ts, '2021-05-05' as dt,'INSERT' as opt_type union select 2 as id, 'hudi_2' as name, 10 as price, 100 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union select 3 as id, 'hudi' as name, 10 as price, 100 as ts, '2021-05-05' as dt ,'DELETE' as opt_type ) as s0 on t0.id = s0.id when matched and opt_type!='DELETE' then update set * when matched and opt_type='DELETE' then delete when not matched and opt_type!='DELETE' then insert *
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
|---|---|---|---|---|---|---|---|---|---|
| 20220513143914 | 20220513143914_0_1 | id:2 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet | 2 | hudi_2 | 10.0 | 100 | 2021-05-05 |
| 20220513143914 | 20220513143914_0_2 | id:1 | dt=2021-05-05 | 083f7be7-ecbd-4c60-a5c2-51b04bb5a3d5-0_0-137-7255_20220513143914.parquet | 1 | hudi | 112.0 | 98 | 2021-05-05 |
The benefits are:
1. It unifies all operation types in one SQL statement and avoids generating too many jobs.
2. It avoids exceptions: for plain inserts, an exception is thrown if the primary key of a newly inserted record already exists.
3. It reduces the complexity of the program: for updates and deletes there is no need to work out the WHERE conditions and the fields to modify and then splice the SQL (that information may not be available from the binlog).
4. It improves performance: for batch deletes, MERGE performs better than DELETE with an IN list.
However, note that when preCombineField = 'ts' is not set, new data directly overwrites the historical data. This matters when data arrives out of order, that is, when newly arrived records are actually older than the data already written.
merge values
Building on the merge support above, I optimized the source code (the code has not been contributed to the community yet; if you want to use it, see https://gitee.com/dongkelun/hudi/commits/0.9.0) so that Hudi SQL supports a merge-values form. Example:
merge into test_hudi_table as t0 using (1, 'hudi', 112, 98, '2021-05-05','INSERT'), (2, 'hudi_2', 1, 100, '2021-05-05','UPDATE'), (3, 'hudi', 10, 100, '2021-05-05','DELETE') as s0 (id,name,price,ts,dt,opt_type) on t0.id = s0.id when matched and opt_type!='DELETE' then update set * when matched and opt_type='DELETE' then delete when not matched and opt_type!='DELETE' then insert *
The reason for this modification is that, for SQL-based data synchronization, the standard form is merge with a subquery. When the spliced SQL is very long, for example 7000 records, it amounts to 7000 SELECT statements unioned together; the parser handles this recursively and becomes very slow, and parsing the subquery alone takes about 10 minutes, which cannot meet our minute-level latency requirement. By modifying the source code to support the merge-values form and passing the records through VALUES, the parsing time drops from about 10 minutes to a few seconds. Later the program was changed to convert the values into a table and upsert it directly, which further shortens each batch. Testing showed that with tens of millions of historical records and tens of millions of daily incremental records, i.e. about 7000 records per minute on average, minute-level synchronization can be achieved.
Evaluation
This section records the performance test results for the merge values statement.
Spark Thrift Server configuration parameters
--executor-memory 4G --num-executors 15 --executor-cores 2 --driver-memory 6G --driver-cores 2
historical data
The evaluation data is based on the TPC-DS web_sales table, with 10 million historical records and 10 million simulated daily incremental records. Note that the decimal columns of the source table were stored as double; they cannot stay decimal, otherwise exceptions occur during the subsequent incremental synchronization (Hudi Spark SQL has a bug with the decimal type).
Incremental data
The incremental data is spliced into SQL and synchronized in batches of no more than 10,000 records.
Evaluation results
Spark Server

| Batch | Batch data volume | Time (s) |
|---|---|---|
| 1 | 6000 | 119 |
| 2 | 6000 | 79 |
| 3 | 6000 | 70 |
| 4 | 6000 | 68 |

Streaming

| Batch | Batch data volume | Time (s) |
|---|---|---|
| 1 | 6898 | 59 |
| 2 | 3211 | 27 |
| 3 | 5999 | 36 |
| 4 | 5999 | 33 |
| 5 | 5902 | 32 |
| 6 | 5995 | 35 |
Spark Server refers to a Java program that connects to the Spark Thrift Server through JDBC. The first batch has no cache and takes about 120 seconds; once cached, the time drops to about 70 seconds. A hedged sketch of this approach is shown below.
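A minimal sketch of the Spark Server approach, an assumption about the program's shape rather than the author's actual code: it submits the spliced MERGE statement through the Hive JDBC driver to the thrift server started in the earlier section.

```scala
import java.sql.DriverManager

object SparkServerBatch {
  def main(args: Array[String]): Unit = {
    // Requires the Hive JDBC driver on the classpath; the URL is the one from the beeline example above
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val url = "jdbc:hive2://192.168.44.128:20003/default;" +
      "principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM?" +
      "hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
    val conn = DriverManager.getConnection(url)
    val stmt = conn.createStatement()
    try {
      // mergeSql stands for the MERGE statement spliced from one batch of incremental
      // records, in the same form as the merge example above (shown here with one record)
      val mergeSql =
        """merge into test_hudi_table as t0
          |using ( select 1 as id, 'hudi' as name, 112 as price, 98 as ts,
          |        '2021-05-05' as dt, 'INSERT' as opt_type ) as s0
          |on t0.id = s0.id
          |when matched and opt_type != 'DELETE' then update set *
          |when matched and opt_type = 'DELETE' then delete
          |when not matched and opt_type != 'DELETE' then insert *""".stripMargin
      stmt.execute(mergeSql)
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```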
Streaming uses Structured Streaming: the MERGE SQL is spliced in each micro-batch and executed by calling spark.sql(). From the results, Streaming is about 30 seconds faster than Spark Server, mainly because the delay-scheduling time on the Spark Server side is longer than on the Streaming side; so far no way has been found to bring Spark Server down to the same time as Streaming. A sketch of the Streaming approach is shown below.
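A minimal sketch of the Streaming approach, again an assumption about the program's shape rather than the author's code. Here changes stands for a streaming DataFrame with the columns id, name, price, ts, dt, opt_type (for example parsed from Kafka binlog messages); each micro-batch is spliced into one MERGE and executed with spark.sql().

```scala
import org.apache.spark.sql.DataFrame

// Splice one MERGE statement per micro-batch and run it on the batch's SparkSession.
// Naive quoting is used here; real code must escape string values.
val runMerge: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  val rows = batchDf.collect().map { r =>
    s"select ${r.getAs[Int]("id")} as id, '${r.getAs[String]("name")}' as name, " +
      s"${r.getAs[Double]("price")} as price, ${r.getAs[Long]("ts")} as ts, " +
      s"'${r.getAs[String]("dt")}' as dt, '${r.getAs[String]("opt_type")}' as opt_type"
  }
  if (rows.nonEmpty) {
    batchDf.sparkSession.sql(
      s"""merge into test_hudi_table as t0
         |using ( ${rows.mkString(" union ")} ) as s0
         |on t0.id = s0.id
         |when matched and opt_type != 'DELETE' then update set *
         |when matched and opt_type = 'DELETE' then delete
         |when not matched and opt_type != 'DELETE' then insert *""".stripMargin)
  }
}

changes.writeStream
  .foreachBatch(runMerge)
  .start()
  .awaitTermination()
```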
The incremental data simulated in this evaluation touches all partitions every minute, so partition filtering has no effect. Actual production data only touches a small number of partitions, so partition filtering takes effect and the performance of incremental synchronization is better than in this evaluation.
summary
This post mainly summarizes the commonly used SQL commands of Hudi Spark SQL in version 0.9.0 and some precautions. Other SQL semantics are also supported, and newer versions support even more; you can learn and test them yourself.