Flink checkpoint source code analysis II

Overview: the previous article covered the overall concept and process of Flink checkpointing and, with reference to the code, introduced how a checkpoint is initiated and how Tasks execute it.
For details, see: https://blog.csdn.net/weixin_40809627/article/details/108537480

This article follows on from the previous one and covers the remaining parts of the Flink checkpoint process: taking the checkpoint snapshot, local state storage, checkpoint acknowledgement, and state recovery.

1, Storing the checkpoint state snapshot

After a Task triggers a checkpoint, its most important job is to save the state snapshots of all operators in the Task to an external storage system. That system may be a distributed file system or the memory of the JobManager.

The StreamTask.performCheckpoint method starts the checkpoint operation, which is divided into three parts: 1) preparing the checkpoint, which usually involves little or no work; 2) sending the CheckpointBarrier downstream; 3) storing the checkpoint snapshot:

class StreamTask {
	private boolean performCheckpoint(
			CheckpointMetaData checkpointMetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics,
			boolean advanceToEndOfTime) throws Exception {
		final long checkpointId = checkpointMetaData.getCheckpointId();
		final boolean result;
		synchronized (lock) {
			if (isRunning) {
				if (checkpointOptions.getCheckpointType().isSynchronous()) {
					if (advanceToEndOfTime) {
						// ... (omitted in this excerpt)
					}
				}

				// All of the following steps happen as an atomic step from the perspective of barriers and
				// records/watermarks/timers/callbacks.
				// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
				// checkpoint alignments

				// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
				//           The pre-barrier work should be nothing or minimal in the common case.
				// ... (call omitted in this excerpt)

				// Step (2): Send the checkpoint barrier downstream
				// ... (call omitted in this excerpt)

				// Step (3): Take the state snapshot. This should be largely asynchronous, to not
				//           impact progress of the streaming topology
				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
				result = true;
			} else {
				// we cannot perform our checkpoint - let the downstream operators know that they
				// should not wait for any input from this operator
				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
				// yet be created
				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
				Exception exception = null;
				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
					try {
						// ... (call omitted in this excerpt)
					} catch (Exception e) {
						exception = ExceptionUtils.firstOrSuppressed(
							new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
							exception);
					}
				}
				if (exception != null) {
					throw exception;
				}
				result = false;
			}
		}
		if (isRunning && syncSavepointLatch.isSet()) {
			// Synchronous savepoint: wait until the checkpoint has been acknowledged
			final boolean checkpointWasAcked =
				/* ... (omitted in this excerpt) */;
			if (checkpointWasAcked) {
				// ... (omitted in this excerpt)
			}
		}
		return result;
	}
}

Before looking at how checkpoint snapshots are stored, it helps to know a few classes related to checkpoint storage. In short, CheckpointStorage is an abstraction of the state storage system. It has two implementations: MemoryBackendCheckpointStorage and FsCheckpointStorage. MemoryBackendCheckpointStorage stores the checkpoint state of all operators in the memory of the JobManager, which is usually not suitable for production use; FsCheckpointStorage persists the checkpoint state of all operators to a file system. CheckpointStorageLocation is an abstraction of the location where checkpoint state is stored. Through it, an output stream can be created, and state and metadata are written to the storage system through that stream. When the output stream is closed, a state handle is obtained, and the written state can later be re-read using that handle.
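The write, close, and re-read cycle described above can be sketched in a few lines. This is only an illustration under stated assumptions: SketchCheckpointStorage, SketchOutputStream, and SketchStateHandle are hypothetical names invented for this example and are not Flink classes, and an in-memory map stands in for the real storage system.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a state handle: a pointer to state that was written. */
final class SketchStateHandle {
    final String location;

    SketchStateHandle(String location) {
        this.location = location;
    }
}

/** Hypothetical in-memory storage; a file-based variant would write files instead. */
final class SketchCheckpointStorage {
    private final Map<String, byte[]> blobs = new HashMap<>();

    /** Output stream whose close step publishes the bytes and yields a handle. */
    final class SketchOutputStream {
        private final String location;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        SketchOutputStream(String location) {
            this.location = location;
        }

        void write(byte[] data) {
            buffer.write(data, 0, data.length);
        }

        SketchStateHandle closeAndGetHandle() {
            blobs.put(location, buffer.toByteArray());
            return new SketchStateHandle(location);
        }
    }

    SketchOutputStream createOutputStream(long checkpointId, String name) {
        return new SketchOutputStream("chk-" + checkpointId + "/" + name);
    }

    /** Re-reads previously written state through its handle. */
    byte[] read(SketchStateHandle handle) {
        return blobs.get(handle.location);
    }

    /** Writes a value, closes the stream, then reads the value back via the handle. */
    static String roundTrip(String value) {
        SketchCheckpointStorage storage = new SketchCheckpointStorage();
        SketchOutputStream out = storage.createOutputStream(1L, "operator-state");
        out.write(value.getBytes(StandardCharsets.UTF_8));
        SketchStateHandle handle = out.closeAndGetHandle();
        return new String(storage.read(handle), StandardCharsets.UTF_8);
    }
}
```

The handle is the only thing the writer needs to keep: whoever holds it can read the state back later without knowing how the storage is implemented.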

The main logic for executing state snapshots is as follows.

The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results of the operator state and the keyed state.
The checkpoint snapshot process as a whole is encapsulated in CheckpointingOperation. Since each StreamTask may contain multiple operators, a Map is used internally to maintain the OperatorID -> OperatorSnapshotFutures mapping. CheckpointingOperation divides the snapshot into two phases: the first is executed synchronously and the second asynchronously.

class StreamTask {
	private static final class CheckpointingOperation {
		// OperatorID -> OperatorSnapshotFutures
		private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

		// Execute the checkpoint snapshot
		public void executeCheckpointing() throws Exception {
			startSyncPartNano = System.nanoTime();

			try {
				// 1. Synchronous phase
				for (StreamOperator<?> op : allOperators) {
					checkpointStreamOperator(op);
				}

				// 2. Asynchronous phase
				// Checkpoints can be configured to execute synchronously or asynchronously
				// In synchronous mode, all RunnableFutures are in fact already completed at this point
				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
					/* ... (arguments omitted in this excerpt) */);
				// ... (omitted in this excerpt)
			} catch (Exception ex) {
				// ... (omitted in this excerpt)
			}
		}

		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
			if (null != op) {
				// Call StreamOperator.snapshotState
				// The result holds RunnableFutures that may or may not have been executed yet
				OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
					/* ... (arguments omitted in this excerpt) */);
				operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
			}
		}
	}
}

In the synchronous phase, StreamOperator.snapshotState is called for each operator in turn. The result is a set of RunnableFutures. Depending on whether the snapshot runs in synchronous or asynchronous mode, these futures may already be completed or may still be pending; see the code of snapshotState for details.
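The two phases can be illustrated with plain java.util.concurrent classes. The following is a minimal sketch, not Flink code: TwoPhaseSnapshotSketch and SketchOperator are hypothetical names, a list of strings stands in for operator state, and the second phase is run on the calling thread for simplicity, whereas a real implementation would hand it to another thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

final class TwoPhaseSnapshotSketch {

    /** Hypothetical operator holding a mutable list as its state. */
    static final class SketchOperator {
        final String id;
        final boolean asynchronous;
        final List<String> state = new ArrayList<>();

        SketchOperator(String id, boolean asynchronous) {
            this.id = id;
            this.asynchronous = asynchronous;
        }

        /** Synchronous part: copy the state and wrap the remaining work in a future. */
        RunnableFuture<List<String>> snapshotState() {
            List<String> copy = new ArrayList<>(state);
            FutureTask<List<String>> future = new FutureTask<>(() -> copy);
            if (!asynchronous) {
                future.run(); // synchronous mode: completed before it is returned
            }
            return future;
        }
    }

    /** Phase 1: call snapshotState on every operator in turn and collect the futures. */
    static Map<String, RunnableFuture<List<String>>> syncPhase(List<SketchOperator> operators) {
        Map<String, RunnableFuture<List<String>>> inProgress = new LinkedHashMap<>();
        for (SketchOperator op : operators) {
            inProgress.put(op.id, op.snapshotState());
        }
        return inProgress;
    }

    /** Phase 2: run whatever is still pending and collect the results. */
    static Map<String, List<String>> asyncPhase(Map<String, RunnableFuture<List<String>>> inProgress) {
        Map<String, List<String>> results = new LinkedHashMap<>();
        for (Map.Entry<String, RunnableFuture<List<String>>> entry : inProgress.entrySet()) {
            RunnableFuture<List<String>> future = entry.getValue();
            if (!future.isDone()) {
                future.run();
            }
            try {
                results.put(entry.getKey(), future.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("Snapshot failed for " + entry.getKey(), e);
            }
        }
        return results;
    }
}
```

Because the copy is taken in the first phase, records processed between the two phases do not leak into the snapshot, which is why the second phase can safely run in the background.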

Now that we have seen how the checkpoint operation is associated with user-defined functions, let's take a look at how the state managed by Flink is written to the storage system, that is:

operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //Write operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //Write keyed state

First, let's look at the operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. All currently registered operator states (both list state and broadcast state) are deep-copied first, and the actual write is then wrapped in an asynchronous FutureTask. The main steps of this FutureTask are: 1) opening the output stream; 2) writing the state metadata; 3) writing the state; 4) closing the output stream and obtaining the state handle. If asynchronous checkpointing is not enabled, the FutureTask is executed immediately in the synchronous phase.
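The copy-then-write pattern and the four steps of the write task can be sketched as follows. This is an illustration under stated assumptions rather than the Flink implementation: OperatorStateSnapshotSketch is a hypothetical class, lists of strings stand in for the registered states, the byte layout is invented for the example, and the returned byte array plays the role of the state handle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;

final class OperatorStateSnapshotSketch {
    // Registered states by name; a hypothetical stand-in for list and broadcast state.
    private final Map<String, List<String>> registeredStates = new LinkedHashMap<>();

    void add(String stateName, String value) {
        registeredStates.computeIfAbsent(stateName, k -> new ArrayList<>()).add(value);
    }

    /** Copies the state synchronously and returns a task that performs the write. */
    FutureTask<byte[]> snapshot(boolean asynchronous) {
        Map<String, List<String>> copy = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : registeredStates.entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue())); // strings are immutable
        }
        FutureTask<byte[]> task = new FutureTask<>(() -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) { // 1) open the stream
                out.writeInt(copy.size());                              // 2) write the metadata
                for (Map.Entry<String, List<String>> e : copy.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeInt(e.getValue().size());
                }
                for (List<String> values : copy.values()) {             // 3) write the state
                    for (String value : values) {
                        out.writeUTF(value);
                    }
                }
            }                                                           // 4) close the stream
            return bytes.toByteArray(); // stands in for the state handle
        });
        if (!asynchronous) {
            task.run(); // synchronous mode: the write happens right away
        }
        return task;
    }

    /** Runs the task if it has not run yet and returns the bytes it wrote. */
    static byte[] finish(FutureTask<byte[]> task) {
        task.run(); // has no effect if the task already ran
        try {
            return task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the metadata first, then the state values it describes. */
    static Map<String, List<String>> read(byte[] written) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(written))) {
            int stateCount = in.readInt();
            List<String> names = new ArrayList<>();
            List<Integer> sizes = new ArrayList<>();
            for (int i = 0; i < stateCount; i++) {
                names.add(in.readUTF());
                sizes.add(in.readInt());
            }
            Map<String, List<String>> states = new LinkedHashMap<>();
            for (int i = 0; i < stateCount; i++) {
                List<String> values = new ArrayList<>();
                for (int j = 0; j < sizes.get(i); j++) {
                    values.add(in.readUTF());
                }
                states.put(names.get(i), values);
            }
            return states;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing the metadata ahead of the values lets a reader learn which states exist and how large they are before it touches the values themselves.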

Writing keyed state follows the same basic process, but it is more complex than operator state: keyed state has several storage implementations, including ones based on heap memory and on RocksDB, and the RocksDB implementation also supports incremental checkpoints. In addition, since version 1.5.0 Flink has supported local state storage, an optimization that keeps a copy of the keyed state locally on the TaskManager in order to speed up state recovery and reduce network overhead.
This concludes the first phase of the snapshot operation, the synchronous phase. The asynchronous phase is encapsulated in AsyncCheckpointRunnable. Its main operations are: 1) executing the FutureTasks created in the synchronous phase; 2) sending an Ack response to the CheckpointCoordinator once they have completed.

class StreamTask {
	protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
		public void run() {
			try {
				TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
					new TaskStateSnapshot(operatorSnapshotsInProgress.size());
				TaskStateSnapshot localTaskOperatorSubtaskStates =
					new TaskStateSnapshot(operatorSnapshotsInProgress.size());

				// Write the state of each operator
				// For a synchronous checkpoint, the state has already been written before this point
				// For an asynchronous checkpoint, the state is written here
				for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
					OperatorID operatorID = entry.getKey();
					OperatorSnapshotFutures snapshotInProgress = entry.getValue();
					// finalize the async part of all by executing all snapshot runnables
					OperatorSnapshotFinalizer finalizedSnapshots =
						new OperatorSnapshotFinalizer(snapshotInProgress);
					// ... (omitted in this excerpt)
				}

				final long asyncEndNanos = System.nanoTime();
				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
					CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
					// Report that the snapshot is complete
					// ... (call omitted in this excerpt)
				} else {
					LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
						/* ... (arguments omitted in this excerpt) */);
				}
			} catch (Exception e) {
				// ... (omitted in this excerpt)
			} finally {
				// ... (omitted in this excerpt)
			}
		}

		private void reportCompletedSnapshotStates(
				TaskStateSnapshot acknowledgedTaskStateSnapshot,
				TaskStateSnapshot localTaskStateSnapshot,
				long asyncDurationMillis) {
			TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
			boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
			boolean hasLocalState = localTaskStateSnapshot.hasState();
			// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
			// to stateless tasks on restore. This enables simple job modifications that only concern
			// stateless without the need to assign them uids to match their (always empty) states.
			taskStateManager.reportTaskStateSnapshots(
				checkpointMetaData,
				checkpointMetrics,
				hasAckState ? acknowledgedTaskStateSnapshot : null,
				hasLocalState ? localTaskStateSnapshot : null);
		}
	}
}

public class TaskStateManagerImpl implements TaskStateManager {
	public void reportTaskStateSnapshots(
		@Nonnull CheckpointMetaData checkpointMetaData,
		@Nonnull CheckpointMetrics checkpointMetrics,
		@Nullable TaskStateSnapshot acknowledgedState,
		@Nullable TaskStateSnapshot localState) {

		long checkpointId = checkpointMetaData.getCheckpointId();

		localStateStore.storeLocalState(checkpointId, localState);

		// Send the ACK response to the CheckpointCoordinator
		// ... (call omitted in this excerpt)
	}
}
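
The compare-and-set from RUNNING to COMPLETED is what decides whether the snapshot is reported at all. A minimal sketch of that guard, using a hypothetical AsyncPhaseSketch class: the RUNNING and COMPLETED states appear in the excerpt above, while the DISCARDED state, the close method, and the list standing in for acknowledgements are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncPhaseSketch implements Runnable {
    /** RUNNING and COMPLETED appear in the excerpt; DISCARDED is assumed for this sketch. */
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final long checkpointId;
    private final List<Runnable> pendingSnapshots;

    /** Hypothetical stand-in for the acknowledgements sent to the coordinator. */
    final List<Long> acknowledged = new ArrayList<>();

    AsyncPhaseSketch(long checkpointId, List<Runnable> pendingSnapshots) {
        this.checkpointId = checkpointId;
        this.pendingSnapshots = pendingSnapshots;
    }

    @Override
    public void run() {
        // Finish the work that the synchronous phase left pending.
        for (Runnable snapshot : pendingSnapshots) {
            snapshot.run();
        }
        // Report only if nobody closed the operation in the meantime.
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acknowledged.add(checkpointId);
        }
    }

    /** Cancels the operation; a later run() then reports nothing. */
    void close() {
        state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State currentState() {
        return state.get();
    }
}
```

Since only one of the two transitions out of RUNNING can succeed, a checkpoint that is closed while its asynchronous part is still running is never acknowledged afterwards.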

2, Local state storage

Local state storage means that, when a checkpoint snapshot is stored, an additional copy is written to the local file system of the TaskManager on which the Task runs. During state recovery the local copy can then be tried first, which reduces the overhead of transferring data over the network. Local state storage applies only to keyed state. Let's take the relatively simple HeapKeyedStateBackend as an example to see how local state storage is implemented:

class HeapSnapshotStrategy<K>
	extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
		long checkpointId,
		long timestamp,
		@Nonnull CheckpointStreamFactory primaryStreamFactory,
		@Nonnull CheckpointOptions checkpointOptions) throws IOException {

		// ... (omitted in this excerpt)

		// Create the CheckpointStreamWithResultProvider
		final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =

			localRecoveryConfig.isLocalRecoveryEnabled() ?

				() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
					/* ... (leading arguments omitted in this excerpt) */
					localRecoveryConfig.getLocalStateDirectoryProvider()) :

				() -> CheckpointStreamWithResultProvider.createSimpleStream(
					/* ... (arguments omitted in this excerpt) */);

		// ... (omitted in this excerpt)
	}
}

The key point is that a different CheckpointStreamWithResultProvider is created depending on whether local state recovery is enabled:

public interface CheckpointStreamWithResultProvider extends Closeable {
	static CheckpointStreamWithResultProvider createSimpleStream(
		@Nonnull CheckpointedStateScope checkpointedStateScope,
		@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {

		CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
			/* ... (omitted in this excerpt) */;
		return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
	}

	static CheckpointStreamWithResultProvider createDuplicatingStream(
		@Nonnegative long checkpointId,
		@Nonnull CheckpointedStateScope checkpointedStateScope,
		@Nonnull CheckpointStreamFactory primaryStreamFactory,
		@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {

		CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
			/* ... (omitted in this excerpt) */;

		try {
			File outFile = new File(
				/* ... (arguments omitted in this excerpt) */);
			Path outPath = new Path(outFile.toURI());

			CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
				new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
			// There are two output streams, primary and secondary. Secondary corresponds to local storage
			return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
		} catch (IOException secondaryEx) {
			LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
				"Continue only with the primary stream.", secondaryEx);
		}
		// The local stream could not be opened: continue with the primary stream only
		return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
	}
}

Therefore, when local state storage is enabled, two output streams are created: primaryOut corresponds to external storage and secondaryOut corresponds to local storage. The state is written twice, once to each stream. The local state handle is stored in the TaskLocalStateStore.
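
The idea of writing to two streams while treating the local one as optional can be sketched with plain java.io classes. DuplicatingStreamSketch and FailingStream are hypothetical names for this example. The excerpt above only shows the fallback when the local stream cannot be opened; tolerating a failure during a later write, as done here, is an assumption that extends the same idea.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class DuplicatingStreamSketch extends OutputStream {
    private final OutputStream primary;   // external storage
    private final OutputStream secondary; // local copy, best effort only
    private IOException secondaryFailure; // set once the local copy has been given up

    DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b); // a failure here propagates to the caller
        if (secondaryFailure == null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                secondaryFailure = e; // remember the failure and continue with primary only
            }
        }
    }

    boolean hasLocalCopy() {
        return secondaryFailure == null;
    }

    /** Hypothetical local stream that always fails, for example because the disk is full. */
    static final class FailingStream extends OutputStream {
        @Override
        public void write(int b) throws IOException {
            throw new IOException("local disk unavailable");
        }
    }

    /** Writes data through a duplicating stream and reports whether the local copy survived. */
    static boolean writeAndCheckLocalCopy(OutputStream primary, OutputStream secondary, byte[] data) {
        DuplicatingStreamSketch out = new DuplicatingStreamSketch(primary, secondary);
        try {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.hasLocalCopy();
    }
}
```

The asymmetry is deliberate: the primary copy is what makes the checkpoint valid, while the local copy only speeds up recovery, so losing it should not fail the checkpoint.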

3, Checkpoint acknowledgement

TODO: to be completed with reference to the source code analysis at https://blog.jrwang.me/2019/flink-source-code-checkpoint/

Tags: Big Data flink

Posted by freddyw on Sun, 15 May 2022 15:26:01 +0300