Configuration and use of Flink checkpoints


The previous article discussed Flink's stateful operators. A stateful operator is one whose intermediate results are stored in Flink-managed data structures such as ValueState or MapState. This can be thought of as a kind of memory for the computation: for example, to accumulate a field across a stream, the intermediate sum must be saved so that the next event knows what to add to.
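As a sketch of the idea in plain Java (not the Flink API; the class and method names here are illustrative): a keyed running sum must keep the previous total somewhere, otherwise the next event has nothing to add to. In Flink the map below would be a ValueState scoped to the current key.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of "stateful" accumulation: the intermediate sum per key
// must be stored somewhere (in Flink it would live in a ValueState), or the
// next event has nothing to add to.
public class RunningSum {
    private final Map<String, Long> state = new HashMap<>(); // stands in for ValueState

    public long add(String key, long value) {
        long updated = state.getOrDefault(key, 0L) + value;
        state.put(key, updated); // save the intermediate result for the next event
        return updated;
    }

    public static void main(String[] args) {
        RunningSum sum = new RunningSum();
        System.out.println(sum.add("sensor-1", 3)); // 3
        System.out.println(sum.add("sensor-1", 4)); // 7 -- uses the stored intermediate result
    }
}
```

If this in-memory map is lost when the process dies, the accumulation cannot continue correctly, which is exactly the problem checkpoints solve.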

But that alone is still a long way from failure recovery. If the job fails, the process is gone, and the state in memory goes with it. This is where checkpoints come in: a checkpoint is a mechanism that periodically saves the state of each task. And where is that state saved? In the state backend discussed earlier.

Checkpoints enable failure recovery, but only under two conditions:

  • On the one hand, the data source must be replayable. There are two main kinds: message queues such as Kafka, and file systems such as HDFS (a non-distributed file system also works, of course).
  • On the other hand, there must be a system that can persistently store the state, typically a distributed file system.

Detailed explanation of checkpoint configuration items

package it.kenn.checkpoint;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * checkpoint test
 */
public class CKDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Arguments: the checkpoint interval (ms) and the CheckpointingMode. The default mode is
        // EXACTLY_ONCE, which is what most jobs use; AT_LEAST_ONCE can be chosen for jobs with
        // ultra-low latency requirements.
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        CheckpointConfig config = env.getCheckpointConfig();
        // The checkpointing mode can also be set afterwards on the CheckpointConfig
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Timeout: a checkpoint that has not completed within 500 ms is considered to have timed
        // out and is aborted, i.e. that checkpoint is discarded
        config.setCheckpointTimeout(500);
        // Minimum pause between checkpoint attempts: the next checkpoint may start no earlier
        // than 5 s after the previous one *completes*. How does this differ from the interval
        // above? The interval is pure wall-clock time between triggers (every 1 s a checkpoint is
        // triggered), while the pause is measured from completion, so it depends on how long a
        // checkpoint takes: if the storage system is slow and a checkpoint takes 10 s on average,
        // a 5 s pause puts two checkpoints about 15 s apart. If this conflicts with the interval,
        // the effective interval grows to at least this value, so set the interval no smaller
        // than the pause.
        config.setMinPauseBetweenCheckpoints(5000);
        // Maximum number of concurrent checkpoints. 1 (the default) means the next checkpoint can
        // only start once the current one has finished. Note that setting a minimum pause above
        // already implies at most one concurrent checkpoint.
        config.setMaxConcurrentCheckpoints(1);
        // Externalized checkpoints: checkpoint metadata is written to an external system, so the
        // checkpoint is not cleared when the job fails and the job can be recovered from it.
        // This is very important and is explained in detail in the next section.
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Both checkpoints and savepoints can be used for recovery; this setting says: even if a
        // more recent savepoint exists, prefer the checkpoint for recovery
        config.setPreferCheckpointForRecovery(true);
        // Unaligned checkpoints can greatly reduce checkpoint time under backpressure, but they
        // require exactly-once mode and a maximum of 1 concurrent checkpoint
        config.enableUnalignedCheckpoints();
    }
}


Checkpoint save configuration

By default, state is kept in the TaskManager's memory, while checkpoints are kept in the JobManager's memory. These checkpoints exist to recover tasks within a running application: if the application is still running but one of its tasks fails, the task can be restored from a checkpoint. However, if the application is cancelled or goes down entirely, the checkpoint data is gone too (it lives in JM memory, and once the JM process is gone, nothing in its memory survives). If you want checkpoints to outlive the application, you must configure a copy of each checkpoint to be saved in an external system.

The externalized checkpoint cleanup mode configures what happens to checkpoints when the application is cancelled.

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION means checkpoints are retained when the job is cancelled. Cancellation here means a normal cancel, not a task failure. When the task is restarted, the old checkpoints are not cleared automatically; if they need to be removed, they must be cleaned up manually.
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION means checkpoints are deleted when the job is cancelled. If the task fails, however, the checkpoint is not deleted, so a failed task can still be recovered from it.
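The two cleanup modes can be set on the CheckpointConfig; a short sketch (class name illustrative, API as in the configuration example above):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCkDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Keep checkpoints when the job is cancelled normally; clean them up by hand later
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Alternative: delete on normal cancellation, but still retain them on failure
        // env.getCheckpointConfig().enableExternalizedCheckpoints(
        //         CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    }
}
```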

Checkpoint directory structure

The checkpoint directory can be configured globally in flink-conf.yaml, or individually for an application in code

# The setting in flink-conf.yaml is global, i.e. every application uses this path
state.checkpoints.dir: hdfs:///checkpoints/

// Configuring it per application in code actually means setting the state backend
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

Within the configured directory, checkpoints are organized into subdirectories by job ID; all checkpoints of an application are stored under the directory for its job ID.
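With the global setting above, a job's checkpoints end up in a layout like this (the job ID and checkpoint number are illustrative):

```
hdfs:///checkpoints/
└── 1b5f21e0c3a04d2b9a7e8f0d12345678/   # job ID
    ├── chk-42/
    │   └── _metadata        # checkpoint metadata used for recovery
    ├── shared/              # state shared across checkpoints (incremental backends)
    └── taskowned/           # state owned by the task managers
```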

Restore application from checkpoint

Checkpoint data is persisted to the distributed file system. To restore state when restarting the application, run the following command

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
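Concretely, the -s argument points at the checkpoint directory that contains the _metadata file (the path, job ID, and jar name below are illustrative):

```
# Resume the job from checkpoint chk-42 of a previous run (illustrative path)
$ bin/flink run -s hdfs:///checkpoints/1b5f21e0c3a04d2b9a7e8f0d12345678/chk-42 my-job.jar
```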


  • Checkpoints store state. By default they are kept in the JM, so when a task is cancelled or fails, the checkpoint is lost.
  • Checkpoints can be saved in an external distributed file system by enabling externalized checkpoints.
  • For disaster recovery, saving checkpoints externally is not enough on its own: the data Flink processes must also come from replayable sources, such as message queues or file systems.
  • To restart a task and recover from a checkpoint, you must specify the checkpoint's storage path.


Tags: flink

Posted by bjblackmore on Wed, 04 May 2022 21:24:45 +0300