Two-phase commit in Flink

Two-phase commit of Flink transactions


Scenario description:

Two-phase commit (2PC) is one of the most basic distributed consensus protocols and is widely used. This article introduces how it works and its typical application scenario in Flink.


2PC

In a distributed system, for each node to know how the transaction is going on all the other nodes, we need a central node that coordinates the execution logic and progress of all nodes. This central node is called the coordinator, and the other nodes, which report to it or are scheduled by it, are called participants.

Specific process

Request phase
1. The coordinator sends a prepare request and the transaction content to all participants, asks whether the transaction can be prepared for commit, and waits for their responses.
2. Each participant performs the operations contained in the transaction and records an undo log (for rollback) and a redo log (for replay), but does not actually commit.
3. Each participant returns the result of executing the transaction to the coordinator: yes if execution succeeded, otherwise no.
Commit phase (success or failure)
If all participants return yes, the transaction can be committed.
1. The coordinator sends a commit request to all participants.
2. After a participant receives the commit request, it actually commits the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
3. Once the coordinator has received an ack from all participants, the transaction is successfully completed.
If any participant returns no, or does not respond before the timeout, the transaction is aborted and must be rolled back.

1. The coordinator sends a rollback request to all participants.
2. After the participant receives the rollback request, it rolls back to the state before the transaction execution according to the undo log, releases the occupied transaction resources, and returns ack to the coordinator.
3. The coordinator receives the ack message from all participants, and the transaction rollback is completed.
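
The two phases above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: the Participant interface, RecordingParticipant and SimpleCoordinator are hypothetical names made up for this example, there is no network or timeout handling, and a thrown exception stands in for a "no" vote.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical participant contract; not taken from any library. */
interface Participant {
    boolean prepare(String txId);  // request phase: do the work, log undo/redo, vote yes or no
    void commit(String txId);      // commit phase, success branch
    void rollback(String txId);    // commit phase, failure branch
}

/** In-memory participant that records every request it receives. */
class RecordingParticipant implements Participant {
    private final boolean voteYes;
    final List<String> log = new ArrayList<>();

    RecordingParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    @Override public boolean prepare(String txId) { log.add("prepare"); return voteYes; }
    @Override public void commit(String txId) { log.add("commit"); }
    @Override public void rollback(String txId) { log.add("rollback"); }
}

class SimpleCoordinator {
    /** Returns true if the transaction was committed, false if it was rolled back. */
    static boolean run(String txId, List<? extends Participant> participants) {
        // Request phase: ask every participant and collect the votes.
        boolean allYes = true;
        for (Participant p : participants) {
            boolean vote;
            try {
                vote = p.prepare(txId);
            } catch (RuntimeException e) {
                vote = false; // in this sketch a failure counts as a "no" vote
            }
            allYes = allYes && vote;
        }
        // Commit phase: commit only if every vote was yes, otherwise roll back everywhere.
        for (Participant p : participants) {
            if (allYes) {
                p.commit(txId);
            } else {
                p.rollback(txId);
            }
        }
        return allYes;
    }
}

class TwoPhaseCommitDemo {
    public static void main(String[] args) {
        RecordingParticipant a = new RecordingParticipant(true);
        RecordingParticipant b = new RecordingParticipant(false);
        boolean committed = SimpleCoordinator.run("tx-1", Arrays.asList(a, b));
        System.out.println("committed=" + committed + ", a=" + a.log + ", b=" + b.log);
    }
}
```

Because b votes no, both participants end up rolled back, including a, which had already prepared successfully.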

Advantages and disadvantages of 2PC
The advantage of 2PC is that its principle is very simple, so it is easy to understand and implement. It has three main disadvantages:
(1) The coordinator is a single point of failure. If the coordinator goes down, the entire 2PC logic cannot run at all.
(2) The execution process is completely synchronous. Each participant blocks while waiting for the responses of the other participants, which causes performance problems under high concurrency.
(3) There is still a risk of inconsistency. If only some participants receive the commit request, for example because of a network failure, some participants will commit the transaction while others will not.
However, a great deal of work has since been done in the field of distributed consistency, and there are many distributed coordination frameworks, with ZooKeeper as a representative example. With their support, the reliability of 2PC improves greatly, and it can be used in demanding production environments. Let's take a look at how 2PC and Flink are related.

Application of 2PC in Flink

The most common application scenario of 2PC is actually the relational database, for example the XA transaction support of the MySQL InnoDB storage engine.
As a stream processing engine, Flink naturally provides guarantees for exactly-once semantics. Inside a job, Flink's checkpoint mechanism, built on the lightweight distributed snapshot algorithm ABS (Asynchronous Barrier Snapshotting), guarantees exactly-once. Beyond that, if we want end-to-end exactly-once output, the sink must satisfy one of the following two restrictions: idempotent writes or transactional writes.
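
To make the first of the two restrictions concrete, here is a minimal sketch of an idempotent write. It assumes every record carries a deterministic key, so that replaying records after a failure overwrites rows instead of duplicating them. KeyedStore is a hypothetical stand-in for an external system with upsert semantics, not a Flink or database API.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotentWriteDemo {
    /** Hypothetical external store addressed by a deterministic record key. */
    static final class KeyedStore {
        final Map<String, Long> rows = new HashMap<>();

        /** Writing the same key and value twice leaves the store unchanged. */
        void upsert(String key, long value) {
            rows.put(key, value);
        }
    }

    public static void main(String[] args) {
        KeyedStore store = new KeyedStore();
        store.upsert("order-1", 100L);
        store.upsert("order-2", 250L);

        // Simulated recovery: records after the last checkpoint are replayed.
        store.upsert("order-2", 250L);
        store.upsert("order-3", 75L);

        // The replayed "order-2" did not create a duplicate row.
        System.out.println(store.rows.size()); // prints 3
    }
}
```

When the target system cannot offer this kind of keyed overwrite, the transactional write is the remaining option, which is where 2PC comes in.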

In Spark Streaming, it is entirely up to the user to implement transactional writing, and the framework itself does not provide any implementation. But there is a 2PC-based SinkFunction in Flink, named TwoPhaseCommitSinkFunction, which helps us do some basic work.

Flink officially recommends that every sink that needs to guarantee exactly-once extend this abstract class. It defines the following four abstract methods, which we need to implement in a subclass.

    protected abstract TXN beginTransaction() throws Exception;
    protected abstract void preCommit(TXN transaction) throws Exception;
    protected abstract void commit(TXN transaction);
    protected abstract void abort(TXN transaction);


beginTransaction(): begins a transaction and returns a handle to the transaction information.
preCommit(): the logic of the pre-commit (i.e. commit request) phase.
commit(): the logic of the actual commit phase.
abort(): cancels the transaction.
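
As a rough illustration of how these four hooks fit together, the sketch below uses a stand-in abstract class rather than Flink's TwoPhaseCommitSinkFunction, so that it runs without Flink on the classpath. TwoPhaseSinkSketch, BufferTxn and InMemorySink are hypothetical names, and the invoke method is simplified to two parameters for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in exposing the same four hooks; this is NOT Flink's class. */
abstract class TwoPhaseSinkSketch<IN, TXN> {
    protected abstract TXN beginTransaction() throws Exception;
    protected abstract void invoke(TXN transaction, IN value) throws Exception;
    protected abstract void preCommit(TXN transaction) throws Exception;
    protected abstract void commit(TXN transaction);
    protected abstract void abort(TXN transaction);
}

/** Hypothetical transaction handle: records that are staged but not yet visible. */
class BufferTxn {
    final List<String> staged = new ArrayList<>();
    boolean preCommitted = false;
}

class InMemorySink extends TwoPhaseSinkSketch<String, BufferTxn> {
    /** What downstream readers can see. */
    final List<String> visible = new ArrayList<>();

    @Override protected BufferTxn beginTransaction() { return new BufferTxn(); }

    @Override protected void invoke(BufferTxn txn, String value) { txn.staged.add(value); }

    @Override protected void preCommit(BufferTxn txn) { txn.preCommitted = true; }

    @Override protected void commit(BufferTxn txn) {
        if (!txn.preCommitted) {
            throw new IllegalStateException("commit called before preCommit");
        }
        visible.addAll(txn.staged);
        txn.staged.clear(); // a repeated commit of the same handle becomes a no-op
    }

    @Override protected void abort(BufferTxn txn) { txn.staged.clear(); }
}

class SinkLifecycleDemo {
    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        BufferTxn txn = sink.beginTransaction();
        sink.invoke(txn, "a");
        sink.invoke(txn, "b");
        sink.preCommit(txn);              // request phase: staged, still invisible
        System.out.println(sink.visible); // prints []
        sink.commit(txn);                 // commit phase: now visible
        System.out.println(sink.visible); // prints [a, b]
    }
}
```

Clearing the staged records inside commit() is a deliberate choice in this sketch: a commit may be retried after a failure, so it is safer if committing the same handle twice has no further effect.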

The following describes the specific 2PC process using the integration of Flink and Kafka as an example. Note that the Kafka version must be 0.11 or above, because idempotent producers and transactions are only supported from 0.11 onward, and without them 2PC would have no meaning here. The internal transactional mechanism of Kafka is shown in the following block diagram.
Reference: transactions and idempotency in Kafka.
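
For orientation, the settings below are the standard Kafka client configuration keys involved in idempotent and transactional producing. This is a configuration sketch only: the broker address, the transactional ID and the timeout value are placeholder assumptions, and the Flink Kafka connector manages its own transactional producers, so treat this as background on the plain Kafka client rather than as required Flink setup.

```java
import java.util.Properties;

class KafkaTxnConfigSketch {
    /** Producer-side settings for idempotent, transactional writes. */
    static Properties producerProps(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("enable.idempotence", "true");          // broker de-duplicates producer retries
        props.setProperty("acks", "all");                         // wait for all in-sync replicas
        props.setProperty("transactional.id", transactionalId);   // identifies the producer across restarts
        props.setProperty("transaction.timeout.ms", "60000");     // illustrative value only
        return props;
    }

    /** Consumer-side setting so that uncommitted records stay invisible. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("demo-txn-1"));
        System.out.println(consumerProps());
    }
}
```

The consumer setting matters for the end-to-end guarantee: a reader that does not use read_committed can see records from transactions that are later aborted.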

The specific implementation of Flink's two-phase commit is as follows.
The FlinkKafkaProducer011.commit() method is essentially a proxy for KafkaProducer.commitTransaction(), which formally commits the transaction to Kafka.

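    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                // Delegate to the Kafka producer to commit the transaction.
                transaction.producer.commitTransaction();
            } finally {
                // Hand the producer back for reuse whether or not the commit succeeded.
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }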


This method is called from TwoPhaseCommitSinkFunction.notifyCheckpointComplete(). As the name suggests, that method is invoked once a checkpoint has completed successfully on all operators.

    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            // Transactions from later checkpoints are not covered by this notification.
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                // Remember only the first failure and keep committing the rest.
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

As the code shows, this method takes the transaction handles one at a time from the collection of transactions waiting to be committed, checks each one's checkpoint ID, and calls commit() on it. The flow chart of this stage is as follows:

It can be seen that the write is only committed once the checkpoint has succeeded on all operators. This matches the 2PC flow described above: the JobManager is the coordinator, each operator is a participant, and the sink is the participant that performs the actual commit. If a checkpoint fails, notifyCheckpointComplete() is not executed; if retries are also unsuccessful, the abort() method is eventually called to roll back the transaction.

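    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            // Discard everything written in this transaction, then recycle the producer.
            transaction.producer.abortTransaction();
            recycleTransactionalProducer(transaction.producer);
        }
    }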
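
The commit and abort paths described in this section can be mimicked without Flink or Kafka. The sketch below is a simplification under stated assumptions: CheckpointCommitSketch and its methods are hypothetical, a transaction is just a list of strings, and abortCurrent() only discards the open transaction, which is much less than Flink's real recovery logic does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CheckpointCommitSketch {
    /** Pre-committed transactions keyed by the checkpoint that closed them. */
    private final Map<Long, List<String>> pendingCommitTransactions = new LinkedHashMap<>();
    private List<String> currentTransaction = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** Pre-commit: park the open transaction under the checkpoint ID and start a new one. */
    void snapshotState(long checkpointId) {
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** Commit every pending transaction that belongs to this checkpoint or an earlier one. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint
            }
            committed.addAll(entry.getValue());
            it.remove();
        }
    }

    /** Failure path in this sketch: drop whatever the open transaction has written. */
    void abortCurrent() {
        currentTransaction = new ArrayList<>();
    }

    int pendingCount() {
        return pendingCommitTransactions.size();
    }

    public static void main(String[] args) {
        CheckpointCommitSketch sink = new CheckpointCommitSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        sink.invoke("c");
        sink.snapshotState(2L);
        sink.invoke("d");
        // The notification for checkpoint 2 also commits the transaction of checkpoint 1.
        sink.notifyCheckpointComplete(2L);
        sink.abortCurrent(); // "d" was never pre-committed and is discarded
        System.out.println(sink.committed); // prints [a, b, c]
    }
}
```

Note how a single notification commits every transaction up to and including its checkpoint ID, so a missed notification for an earlier checkpoint does not leave that transaction pending forever.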


Points to be confirmed: the specific code implementation logic (some parts are still not clear to me)
1. How data is submitted to Kafka in two steps (commit request and execution)
2. The specific code implementation

Posted by pacuran on Wed, 04 May 2022 09:46:47 +0300