A heartbeat mechanism detects whether a client or server is still alive by having the two sides periodically send requests to each other. There are two common ways to implement heartbeat detection:
- TCP sockets have a built-in heartbeat mechanism, enabled with the SO_KEEPALIVE option. Keepalive probe packets are sent to the peer periodically, and the peer's TCP stack replies to them automatically;
- The application implements its own heartbeat mechanism, which likewise works by sending requests periodically.
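A minimal Java sketch of both approaches follows. `KeepAliveSockets`, `LivenessTracker`, and the explicit `nowMs` clock argument are hypothetical names chosen for illustration. `Socket#setKeepAlive` is the standard JDK switch for SO_KEEPALIVE.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

// Method 1 (hypothetical helper): ask the OS to send TCP keepalive probes.
// When and how often probes are sent is decided by OS settings, not by this code.
final class KeepAliveSockets {
    static Socket connectWithKeepAlive(String host, int port) throws IOException {
        Socket socket = new Socket();
        try {
            socket.setKeepAlive(true); // enables SO_KEEPALIVE on this socket
            socket.connect(new InetSocketAddress(host, port));
            return socket;
        } catch (IOException e) {
            socket.close();
            throw e;
        }
    }
}

// Method 2 (hypothetical class): application-level liveness tracking.
// The receiver records when each peer was last heard from and treats a peer
// as dead once no heartbeat has arrived within the timeout.
class LivenessTracker {
    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    LivenessTracker(long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        this.timeoutMs = timeoutMs;
    }

    // Called whenever a heartbeat from `peer` arrives at time `nowMs`.
    void onHeartbeat(String peer, long nowMs) {
        lastSeen.put(peer, nowMs);
    }

    // Alive = at least one heartbeat seen, and the latest one is within the timeout.
    boolean isAlive(String peer, long nowMs) {
        Long last = lastSeen.get(peer);
        return last != null && nowMs - last <= timeoutMs;
    }
}

class LivenessDemo {
    public static void main(String[] args) {
        LivenessTracker tracker = new LivenessTracker(50_000); // 50 s timeout
        tracker.onHeartbeat("peer-1", 0);
        System.out.println(tracker.isAlive("peer-1", 10_000)); // true
        System.out.println(tracker.isAlive("peer-1", 60_000)); // false
    }
}
```

With method 1, the operating system decides when probes go out. On Linux, the first probe is sent only after two hours of idleness by default. That is one reason an application may prefer method 2, where the interval and timeout are under its own control.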
Flink monitors the status of its components through a heartbeat service. It works like the second approach above: the caller periodically sends heartbeat requests, and the receiver answers each request with a heartbeat response. Internally, the two sides call each other through RPC, and each call resets the scheduled timeout task that the callee keeps for the caller.

Among Flink's components, the ResourceManager, JobMaster and TaskExecutor monitor each other with heartbeats:
- The ResourceManager actively sends heartbeat requests to check whether JobMasters and TaskExecutors are alive.
- The JobMaster also actively sends heartbeat requests to check whether TaskExecutors are alive, so that it can restart tasks or handle failures.

The core of Flink's heartbeat communication works as follows:
- Heartbeat timeout: after the heartbeat service starts, Flink schedules a task that handles the heartbeat timeout event. The task runs only once the configured heartbeat timeout has elapsed. Whenever a heartbeat message from the monitored component arrives, the pending task is cancelled and scheduled again, which resets the timeout. The heartbeat service relies on a HeartbeatListener. If no heartbeat response arrives within the timeout, the timeout task fires and calls the listener's notifyHeartbeatTimeout method, which performs the follow-up handling (generally trying to reconnect).
- Heartbeat request: heartbeat checking is two-way. One side actively initiates heartbeat requests and the other side answers them; the two call each other through RPC to reset each other's timeout task. Take the JobManager (JM) and TaskManager (TM) as an example:
  - When JM starts, it begins periodic scheduling that sends heartbeat requests to every TM registered with it, calling TM's requestHeartbeat method through RPC.
  - On the TM side, this resets TM's timeout task for JM, indicating that JM is healthy.
  - TM then calls JM's receiveHeartbeat through RPC, which resets JM's timeout task for TM, indicating that TM is healthy.
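The two-way request/response cycle can be simulated in a single process. This is a hypothetical sketch, not Flink code. The `Peer` class, direct method calls in place of RPC, and the explicit `nowMs` clock are assumptions made for illustration.

The requester (JM) drives every exchange:
- The request refreshes TM's view of JM.
- The response refreshes JM's view of TM.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: direct method calls stand in for Flink's RPC,
// and "timers" are reduced to the time each peer was last heard from.
class Peer {
    final String id;
    final long timeoutMs;
    private final Map<String, Long> lastHeard = new HashMap<>();

    Peer(String id, long timeoutMs) {
        this.id = id;
        this.timeoutMs = timeoutMs;
    }

    // Receiving side (TM): a heartbeat request from `origin` arrives. It refreshes
    // the timer for the requester, then answers through the requester's receiveHeartbeat.
    void requestHeartbeat(Peer origin, long nowMs) {
        lastHeard.put(origin.id, nowMs);
        origin.receiveHeartbeat(this, nowMs);
    }

    // Requesting side (JM): the heartbeat response from `origin` refreshes its timer.
    void receiveHeartbeat(Peer origin, long nowMs) {
        lastHeard.put(origin.id, nowMs);
    }

    boolean considersAlive(String peerId, long nowMs) {
        Long last = lastHeard.get(peerId);
        return last != null && nowMs - last <= timeoutMs;
    }
}

class TwoWayDemo {
    public static void main(String[] args) {
        Peer jm = new Peer("JM", 50_000);
        Peer tm = new Peer("TM", 50_000);
        // JM sends a heartbeat request to TM every 10 s, from t = 0 to t = 30 s.
        for (long t = 0; t <= 30_000; t += 10_000) {
            tm.requestHeartbeat(jm, t);
        }
        System.out.println(jm.considersAlive("TM", 40_000)); // true
        System.out.println(tm.considersAlive("JM", 40_000)); // true
        // JM stops sending; once the timeout passes, each side gives up on the other.
        System.out.println(jm.considersAlive("TM", 90_000)); // false
        System.out.println(tm.considersAlive("JM", 90_000)); // false
    }
}
```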
The main interfaces and classes of Flink's heartbeat service are shown in the figure below:
HeartbeatTarget interface:
This is the core heartbeat target interface, used to send heartbeat requests and receive heartbeat responses. Both the heartbeat sender and the receiver implement it, and both kinds of message can carry a payload.
```java
public interface HeartbeatTarget<I> {
    /**
     * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which
     * contains additional information for the heartbeat target.
     *
     * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
     * @param heartbeatPayload Payload of the heartbeat. Null indicates an empty payload.
     */
    void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload); // Deliver a heartbeat response to the monitored target

    /**
     * Requests a heartbeat from the target. Each heartbeat request can carry a payload which
     * contains additional information for the heartbeat target.
     *
     * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
     * @param heartbeatPayload Payload of the heartbeat request. Null indicates an empty payload.
     */
    void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload); // Send a heartbeat request to the monitored target
}
```
HeartbeatManager:
The heartbeat manager starts or stops monitoring HeartbeatTargets and reports their heartbeat timeouts. Targets are handed to it through monitorTarget. This method can be regarded as the input of the whole service: it tells the heartbeat service which targets to manage.
```java
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
    // Start monitoring a heartbeat target. When the target's heartbeat times out,
    // this is reported to the HeartbeatListener associated with this HeartbeatManager
    void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);

    // Stop monitoring a heartbeat target; resourceID identifies the target
    void unmonitorTarget(ResourceID resourceID);

    // Stop this heartbeat manager
    void stop();

    // Return the time of the last heartbeat from the target; -1 if the target is no longer monitored
    long getLastHeartbeatFrom(ResourceID resourceId);
}
```
HeartbeatListener:
HeartbeatListener is closely tied to HeartbeatManager and can be regarded as the output of the service. Its main responsibilities are:
- Heartbeat timeout notification
- Receiving the payload carried in heartbeat messages
- Supplying the payload to send with the next heartbeat message
```java
public interface HeartbeatListener<I, O> {
    // Called when the heartbeat times out
    void notifyHeartbeatTimeout(ResourceID resourceID);

    // Called when a heartbeat payload is received
    void reportPayload(ResourceID resourceID, I payload);

    // Retrieve the payload for the next heartbeat message
    // (a future, as consumed by HeartbeatManagerSenderImpl#run and HeartbeatManagerImpl#requestHeartbeat below)
    CompletableFuture<O> retrievePayload(ResourceID resourceID);
}
```
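Here is a sketch of a listener implementation. It assumes a simplified copy of the interface (`SimpleHeartbeatListener`, with `String` in place of `ResourceID`). Its `retrievePayload` returns a `CompletableFuture`, matching how the sender and receiver code later in this article consume it.

`RecordingListener` is hypothetical. It records timeouts, keeps the most recent payload per peer, and answers with an increasing sequence number. It is not thread-safe; it assumes a single calling thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for Flink's HeartbeatListener: String replaces ResourceID.
interface SimpleHeartbeatListener<I, O> {
    void notifyHeartbeatTimeout(String resourceId);

    void reportPayload(String resourceId, I payload);

    CompletableFuture<O> retrievePayload(String resourceId);
}

// Hypothetical listener that records what the heartbeat service tells it.
class RecordingListener implements SimpleHeartbeatListener<String, Integer> {
    final List<String> timedOut = new ArrayList<>();
    final Map<String, String> lastPayload = new HashMap<>();
    private int sequence = 0;

    @Override
    public void notifyHeartbeatTimeout(String resourceId) {
        // The owning component would react to the lost peer here; we only record it.
        timedOut.add(resourceId);
    }

    @Override
    public void reportPayload(String resourceId, String payload) {
        lastPayload.put(resourceId, payload); // keep only the latest payload per peer
    }

    @Override
    public CompletableFuture<Integer> retrievePayload(String resourceId) {
        // Payload for the next outgoing heartbeat: already available, so completed at once.
        return CompletableFuture.completedFuture(++sequence);
    }
}

class ListenerDemo {
    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.reportPayload("tm-1", "slots=4");
        System.out.println(listener.lastPayload.get("tm-1"));        // slots=4
        System.out.println(listener.retrievePayload("tm-1").join()); // 1
        listener.notifyHeartbeatTimeout("tm-1");
        System.out.println(listener.timedOut);                       // [tm-1]
    }
}
```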
Creating the heartbeat service:
When the cluster starts, ClusterEntrypoint#initializeServices initializes a number of services, including the heartbeat service. It reads the heartbeat interval and heartbeat timeout from the configuration and creates a HeartbeatServices instance:
```java
// Called from ClusterEntrypoint#initializeServices
heartbeatServices = createHeartbeatServices(configuration);

protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
    return HeartbeatServices.fromConfiguration(configuration);
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
    // Heartbeat interval, 10 s by default
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    // Heartbeat timeout, 50 s by default
    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
```
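The configuration lookup can be mimicked without a Flink dependency. This sketch uses a plain `Map<String, String>` in place of Flink's `Configuration`. The keys `heartbeat.interval` and `heartbeat.timeout` and the 10 s / 50 s defaults (in milliseconds) are the ones behind `HeartbeatManagerOptions`. `HeartbeatSettings` and its validation rules are hypothetical additions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HeartbeatServices.fromConfiguration: reads both
// settings (milliseconds) from a plain map and falls back to Flink's defaults.
final class HeartbeatSettings {
    static final String INTERVAL_KEY = "heartbeat.interval";
    static final String TIMEOUT_KEY = "heartbeat.timeout";
    static final long DEFAULT_INTERVAL_MS = 10_000L;
    static final long DEFAULT_TIMEOUT_MS = 50_000L;

    final long intervalMs;
    final long timeoutMs;

    private HeartbeatSettings(long intervalMs, long timeoutMs) {
        // Illustrative sanity checks: both values positive, and the timeout no
        // shorter than the interval, otherwise a healthy peer could be declared
        // dead before the next request is even sent.
        if (intervalMs <= 0 || timeoutMs <= 0) {
            throw new IllegalArgumentException("interval and timeout must be positive");
        }
        if (timeoutMs < intervalMs) {
            throw new IllegalArgumentException("timeout must not be shorter than the interval");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    static HeartbeatSettings fromMap(Map<String, String> conf) {
        long interval = Long.parseLong(
                conf.getOrDefault(INTERVAL_KEY, String.valueOf(DEFAULT_INTERVAL_MS)));
        long timeout = Long.parseLong(
                conf.getOrDefault(TIMEOUT_KEY, String.valueOf(DEFAULT_TIMEOUT_MS)));
        return new HeartbeatSettings(interval, timeout);
    }

    // How many heartbeat intervals fit into one timeout window.
    long intervalsPerTimeout() {
        return timeoutMs / intervalMs;
    }
}

class SettingsDemo {
    public static void main(String[] args) {
        HeartbeatSettings defaults = HeartbeatSettings.fromMap(new HashMap<>());
        System.out.println(defaults.intervalMs + " ms / " + defaults.timeoutMs + " ms"); // 10000 ms / 50000 ms
        System.out.println(defaults.intervalsPerTimeout());                              // 5
    }
}
```

With the defaults, roughly five request rounds fit into one timeout window. A TM therefore has to miss several consecutive heartbeats before it is declared lost.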
HeartbeatServices creates heartbeat managers through two core methods, createHeartbeatManager and createHeartbeatManagerSender:
They instantiate HeartbeatManagerImpl and HeartbeatManagerSenderImpl respectively. These two classes are the heart of the whole heartbeat service.

HeartbeatManagerImpl is created by the heartbeat receiver/responder (such as TM) and handles heartbeat requests from the heartbeat initiator/requester (such as JM). Its two most important fields are heartbeatListener and heartbeatTargets:
- heartbeatTargets is a Map. Each key is the resource ID of a monitored heartbeat component (for example a TM, in JM's manager). Each value is the HeartbeatMonitor created for that component, which triggers its heartbeat timeout. Keys and monitors correspond one to one.
- A heartbeat timeout calls the notifyHeartbeatTimeout method of heartbeatListener.

Note: on the receiving side, the monitor is driven passively. Its timeout task is reset only when a heartbeat request arrives (when requestHeartbeat is called), because the receiver never sends heartbeat requests on its own.
```java
// The caller passes in a heartbeat target, and a heartbeat monitor is created for it
public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
    if (!stopped) {
        if (heartbeatTargets.containsKey(resourceID)) {
            log.debug("The target with resource ID {} is already been monitored.", resourceID);
        } else {
            // Wrap the heartbeat target in a HeartbeatMonitor and associate it with
            // heartbeatListener, which handles its timeout
            HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
                resourceID,
                heartbeatTarget,
                mainThreadExecutor,
                heartbeatListener,
                heartbeatTimeoutIntervalMs);

            heartbeatTargets.put(
                resourceID,
                heartbeatMonitor);

            // check if we have stopped in the meantime (concurrent stop operation)
            if (stopped) {
                heartbeatMonitor.cancel();

                heartbeatTargets.remove(resourceID);
            }
        }
    }
}
```
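The bookkeeping in monitorTarget can be sketched with plain JDK collections. `TargetRegistry` and `MonitorStub` are hypothetical names, `String` stands in for `ResourceID`, and the monitor is reduced to a cancellation flag.

One deliberate difference from the original: `ConcurrentHashMap#putIfAbsent` replaces the `containsKey`/`put` pair, so the check and the insert stay atomic even without Flink's single main thread. The re-check of `stopped` after inserting mirrors the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical placeholder for HeartbeatMonitor: only tracks cancellation.
class MonitorStub {
    final String resourceId;
    private volatile boolean cancelled;

    MonitorStub(String resourceId) {
        this.resourceId = resourceId;
    }

    void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }
}

// Hypothetical sketch of the monitorTarget / unmonitorTarget bookkeeping.
class TargetRegistry {
    private final Map<String, MonitorStub> heartbeatTargets = new ConcurrentHashMap<>();
    private volatile boolean stopped;

    // Returns true if a new monitor was created; false if the ID is already
    // monitored or the registry has been stopped.
    boolean monitorTarget(String resourceId) {
        if (stopped) {
            return false;
        }
        MonitorStub monitor = new MonitorStub(resourceId);
        // Atomic "already monitored?" check plus insert.
        if (heartbeatTargets.putIfAbsent(resourceId, monitor) != null) {
            return false;
        }
        // As in the original: stop() may have run concurrently, so check again.
        if (stopped) {
            monitor.cancel();
            heartbeatTargets.remove(resourceId, monitor);
            return false;
        }
        return true;
    }

    void unmonitorTarget(String resourceId) {
        MonitorStub monitor = heartbeatTargets.remove(resourceId);
        if (monitor != null) {
            monitor.cancel();
        }
    }

    void stop() {
        stopped = true;
        for (MonitorStub monitor : heartbeatTargets.values()) {
            monitor.cancel();
        }
        heartbeatTargets.clear();
    }

    int size() {
        return heartbeatTargets.size();
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        TargetRegistry registry = new TargetRegistry();
        System.out.println(registry.monitorTarget("tm-1")); // true
        System.out.println(registry.monitorTarget("tm-1")); // false (already monitored)
        registry.unmonitorTarget("tm-1");
        System.out.println(registry.size());                 // 0
    }
}
```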
HeartbeatMonitor manages a single heartbeat target. If no heartbeat arrives within the timeout, it considers the target timed out and notifies the HeartbeatListener. Each time a heartbeat arrives, it resets the timer.
```java
static class HeartbeatMonitor<O> implements Runnable {
    /** Resource ID of the monitored heartbeat target. */
    private final ResourceID resourceID;
    /** Associated heartbeat target. */
    private final HeartbeatTarget<O> heartbeatTarget;
    private final ScheduledExecutor scheduledExecutor;
    /** Listener which is notified about heartbeat timeouts. */
    private final HeartbeatListener<?, ?> heartbeatListener;
    /** Maximum heartbeat timeout interval. */
    private final long heartbeatTimeoutIntervalMs;
    private volatile ScheduledFuture<?> futureTimeout;
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile long lastHeartbeat; // Time of the last heartbeat received

    HeartbeatMonitor(
            ResourceID resourceID,
            HeartbeatTarget<O> heartbeatTarget,
            ScheduledExecutor scheduledExecutor,
            HeartbeatListener<?, O> heartbeatListener,
            long heartbeatTimeoutIntervalMs) {
        this.resourceID = Preconditions.checkNotNull(resourceID); // ID of the monitored machine
        this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget); // Heartbeat target (core processing class)
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); // Heartbeat listener
        Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        lastHeartbeat = 0L;
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }

    // ...

    // Report a heartbeat
    void reportHeartbeat() {
        lastHeartbeat = System.currentTimeMillis(); // Record the time of the last heartbeat received
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); // A heartbeat arrived: reset the timeout task
    }

    // Reset the timeout
    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (state.get() == State.RUNNING) {
            cancelTimeout(); // First cancel the pending timeout task, then schedule a new one
            futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS); // Start the timeout task

            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    // ...

    // Heartbeat timed out: trigger the listener's notifyHeartbeatTimeout
    @Override
    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }
}
```
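The cancel-and-reschedule timer can be reproduced with a JDK `ScheduledExecutorService` in place of Flink's `ScheduledExecutor`. `SimpleHeartbeatMonitor` is a hypothetical class, and a `Consumer<String>` stands in for `HeartbeatListener#notifyHeartbeatTimeout`. The `synchronized` on the reset method is an addition that guards against concurrent callers; Flink avoids that situation by running this logic on its main thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical re-creation of the HeartbeatMonitor idea: every heartbeat cancels
// the pending timeout task and schedules a new one; if the task ever runs, the
// target has timed out.
class SimpleHeartbeatMonitor implements Runnable {
    enum State { RUNNING, TIMEOUT, CANCELED }

    private final String resourceId;
    private final ScheduledExecutorService executor;
    private final Consumer<String> onTimeout; // plays HeartbeatListener#notifyHeartbeatTimeout
    private final long timeoutMs;
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile ScheduledFuture<?> futureTimeout;
    private volatile long lastHeartbeat;

    SimpleHeartbeatMonitor(String resourceId, ScheduledExecutorService executor,
                           Consumer<String> onTimeout, long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeout must be > 0");
        }
        this.resourceId = resourceId;
        this.executor = executor;
        this.onTimeout = onTimeout;
        this.timeoutMs = timeoutMs;
        resetHeartbeatTimeout(); // as in the original, the timer starts in the constructor
    }

    void reportHeartbeat() {
        lastHeartbeat = System.currentTimeMillis();
        resetHeartbeatTimeout();
    }

    long getLastHeartbeat() {
        return lastHeartbeat;
    }

    State getState() {
        return state.get();
    }

    void cancel() {
        if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
            cancelTimeout();
        }
    }

    private synchronized void resetHeartbeatTimeout() {
        if (state.get() == State.RUNNING) {
            cancelTimeout();
            futureTimeout = executor.schedule(this, timeoutMs, TimeUnit.MILLISECONDS);
            // Double check: the state may have changed while we were rescheduling.
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    private void cancelTimeout() {
        ScheduledFuture<?> f = futureTimeout;
        if (f != null) {
            f.cancel(true);
        }
    }

    @Override
    public void run() {
        // Only the first transition out of RUNNING reports a timeout.
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            onTimeout.accept(resourceId);
        }
    }
}

class MonitorDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        SimpleHeartbeatMonitor monitor = new SimpleHeartbeatMonitor(
                "tm-1", executor, id -> System.out.println("heartbeat timeout: " + id), 200);
        for (int i = 0; i < 5; i++) { // a heartbeat every 50 ms keeps pushing the timeout back
            Thread.sleep(50);
            monitor.reportHeartbeat();
        }
        System.out.println("after heartbeats: " + monitor.getState());
        Thread.sleep(400); // silence longer than the timeout
        System.out.println("after silence: " + monitor.getState());
        executor.shutdownNow();
    }
}
```

The `compareAndSet` in `run()` is what makes the timeout fire at most once. It also ensures that a `cancel()` racing with the timer cannot produce a spurious notification.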
HeartbeatManagerSenderImpl is a subclass of HeartbeatManagerImpl created by the heartbeat requester (such as JM). As soon as it is created, it starts a periodically scheduled task. In each round, it iterates over all the heartbeat targets it manages and calls requestHeartbeat() on each one; this is the active heartbeat request.
```java
// In the HeartbeatManagerSenderImpl constructor: schedule the first round immediately
this.heartbeatPeriod = heartbeatPeriod;
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);

public void run() {
    if (!stopped) {
        log.debug("Trigger heartbeat request.");
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
            // Retrieve the current payload for this target
            CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
            // Heartbeat target (core processing class)
            final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

            if (futurePayload != null) {
                CompletableFuture<Void> requestHeartbeatFuture = FutureUtils.thenAcceptAsyncIfNotDone(
                    futurePayload,
                    getMainThreadExecutor(),
                    // Send the heartbeat request through the heartbeat target
                    payload -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload));

                requestHeartbeatFuture.exceptionally(
                    (Throwable failure) -> {
                        log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure);
                        return null;
                    });
            } else {
                heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
            }
        }

        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS); // Periodic scheduling
    }
}
```
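Below is a JDK-only sketch of the sender's periodic loop. `SimpleHeartbeatSender` and `RequestTarget` are hypothetical. `String` IDs and payloads replace `ResourceID` and the generic payload type, and a single-threaded `ScheduledExecutorService` plays the main-thread executor.

The `isDone()` branch approximates what the name `FutureUtils.thenAcceptAsyncIfNotDone` describes: run inline when the payload is already available, otherwise continue asynchronously on the main thread. Treat it as an approximation, not Flink's exact code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified stand-in for HeartbeatTarget#requestHeartbeat (String IDs and payloads).
interface RequestTarget {
    void requestHeartbeat(String requestOrigin, String payload);
}

// Hypothetical sketch of the periodic loop in HeartbeatManagerSenderImpl#run.
class SimpleHeartbeatSender implements Runnable {
    private final String ownId;
    private final long periodMs;
    private final ScheduledExecutorService mainThread; // plays the main-thread executor
    private final Function<String, CompletableFuture<String>> retrievePayload; // listener#retrievePayload
    private final Map<String, RequestTarget> targets = new ConcurrentHashMap<>();
    private volatile boolean stopped;

    SimpleHeartbeatSender(String ownId, long periodMs, ScheduledExecutorService mainThread,
                          Function<String, CompletableFuture<String>> retrievePayload) {
        this.ownId = ownId;
        this.periodMs = periodMs;
        this.mainThread = mainThread;
        this.retrievePayload = retrievePayload;
    }

    // Like the original constructor: the first round runs immediately.
    void start() {
        mainThread.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    void monitorTarget(String id, RequestTarget target) {
        targets.putIfAbsent(id, target);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        for (Map.Entry<String, RequestTarget> entry : targets.entrySet()) {
            RequestTarget target = entry.getValue();
            CompletableFuture<String> futurePayload = retrievePayload.apply(entry.getKey());
            if (futurePayload != null) {
                Consumer<String> send = payload -> target.requestHeartbeat(ownId, payload);
                // Inline if the payload is already there, otherwise continue on the main thread.
                CompletableFuture<Void> sent = futurePayload.isDone()
                        ? futurePayload.thenAccept(send)
                        : futurePayload.thenAcceptAsync(send, mainThread);
                sent.exceptionally(failure -> {
                    System.err.println("Could not request heartbeat from " + entry.getKey() + ": " + failure);
                    return null;
                });
            } else {
                target.requestHeartbeat(ownId, null);
            }
        }
        // Reschedule after this round (fixed delay); stop() simply ends the chain.
        mainThread.schedule(this, periodMs, TimeUnit.MILLISECONDS);
    }
}

class SenderDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger requests = new AtomicInteger();
        SimpleHeartbeatSender jm = new SimpleHeartbeatSender(
                "jm-1", 50, mainThread, id -> CompletableFuture.completedFuture("slot-report"));
        jm.monitorTarget("tm-1", (origin, payload) -> requests.incrementAndGet());
        jm.start();
        Thread.sleep(300);
        jm.stop();
        mainThread.shutdown();
        System.out.println("heartbeat requests sent: " + requests.get());
    }
}
```

Rescheduling at the end of `run()` gives fixed-delay behaviour: a slow round pushes the next one back. With `scheduleAtFixedRate`, late rounds would instead be run back-to-back to catch up.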
Heartbeat service example:
1. Heartbeat initiator/requester: HeartbeatManagerSenderImpl, used in JM
- When JM receives a TM registration, it adds the TM to its set of heartbeat targets. JM then uses its sending heartbeat manager, HeartbeatManagerSenderImpl, to periodically iterate over the targets it manages and call requestHeartbeat() on each. For a TM, this invokes requestHeartbeat() on the HeartbeatTarget that JM registered for that TM.
- That requestHeartbeat implementation calls TaskExecutor#heartbeatFromJobManager through RPC, delivering JM's heartbeat request to TM. When TM receives it, it calls requestHeartbeat on its HeartbeatManagerImpl. This starts or resets the timeout task for JM, indicating that JM is healthy, and then calls JM's receiveHeartbeat through RPC.
```java
// JobMaster#registerTaskManager
public CompletableFuture<RegistrationResponse> registerTaskManager(
        final String taskManagerRpcAddress,
        final TaskManagerLocation taskManagerLocation,
        final Time timeout) {

    final ResourceID taskManagerId = taskManagerLocation.getResourceID();

    if (registeredTaskManagers.containsKey(taskManagerId)) {
        final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
        return CompletableFuture.completedFuture(response);
    } else {
        return getRpcService()
            .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
            .handleAsync(
                (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    if (throwable != null) {
                        return new RegistrationResponse.Decline(throwable.getMessage());
                    }

                    slotPoolGateway.registerTaskManager(taskManagerId); // TaskManager registration
                    registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));

                    // monitor the task manager as heartbeat target (add it to the heartbeat targets)
                    taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<AllocatedSlotReport>() {
                        @Override
                        public void receiveHeartbeat(ResourceID resourceID, AllocatedSlotReport payload) {
                            // the task manager will not request heartbeat, so this method will never be called currently
                        }

                        @Override
                        public void requestHeartbeat(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
                            // JM sends its heartbeat request to TM through RPC
                            taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);
                        }
                    });

                    return new JMTMRegistrationSuccess(resourceId);
                },
                getMainThreadExecutor());
    }
}
```
After TM receives JM's heartbeat request over RPC, it finally calls requestHeartbeat on its receiving-side heartbeat manager, HeartbeatManagerImpl, to process the request:
```java
// HeartbeatManagerImpl#requestHeartbeat()
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat request from {}.", requestOrigin);

        // Start/reset the timeout task and look up the heartbeat target; here the target is JM
        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

        if (heartbeatTarget != null) {
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); // The listener reports the received payload
            }

            // The listener produces the current payload
            CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin);

            if (futurePayload != null) {
                CompletableFuture<Void> sendHeartbeatFuture = FutureUtils.thenAcceptAsyncIfNotDone(
                    futurePayload,
                    mainThreadExecutor,
                    // The heartbeat target calls receiveHeartbeat of the requester (JM) through RPC;
                    // this target instance is the one registered via monitorTarget below
                    retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload));

                sendHeartbeatFuture.exceptionally((Throwable failure) -> {
                    log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure);
                    return null;
                });
            } else {
                heartbeatTarget.receiveHeartbeat(ownResourceID, null);
            }
        }
    }
}

// TaskExecutor#establishJobManagerConnection: the TaskExecutor registers JM as its heartbeat target
// monitor the job manager as heartbeat target
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
    @Override
    public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
        jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
    }

    @Override
    public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
        // request heartbeat will never be called on the task manager side
    }
});
```
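The receiving side can be sketched the same way. `SimpleHeartbeatReceiver` and `ResponseTarget` are hypothetical simplifications:
- The listener is reduced to two callbacks.
- The timeout task is reduced to a timestamp.
- Plain `thenAccept` replaces `FutureUtils.thenAcceptAsyncIfNotDone`, because this sketch has no separate main thread.

The order of steps follows the method above: refresh the requester's timer, report the incoming payload, then answer with the local payload.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Simplified stand-in for HeartbeatTarget#receiveHeartbeat (String IDs and payloads).
interface ResponseTarget {
    void receiveHeartbeat(String heartbeatOrigin, String payload);
}

// Hypothetical sketch of the flow in HeartbeatManagerImpl#requestHeartbeat.
class SimpleHeartbeatReceiver {
    private final String ownId;
    private final BiConsumer<String, String> reportPayload;                    // listener#reportPayload
    private final Function<String, CompletableFuture<String>> retrievePayload; // listener#retrievePayload
    private final Map<String, ResponseTarget> targets = new HashMap<>();
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    private boolean stopped;

    SimpleHeartbeatReceiver(String ownId,
                            BiConsumer<String, String> reportPayload,
                            Function<String, CompletableFuture<String>> retrievePayload) {
        this.ownId = ownId;
        this.reportPayload = reportPayload;
        this.retrievePayload = retrievePayload;
    }

    void monitorTarget(String id, ResponseTarget target) {
        targets.putIfAbsent(id, target);
        lastHeartbeat.putIfAbsent(id, 0L); // monitored, but nothing received yet
    }

    // Mirrors getLastHeartbeatFrom: -1 for a target that is not monitored.
    long getLastHeartbeatFrom(String id) {
        return lastHeartbeat.getOrDefault(id, -1L);
    }

    void stop() {
        stopped = true;
    }

    void requestHeartbeat(String requestOrigin, String payload) {
        if (stopped) {
            return;
        }
        ResponseTarget target = targets.get(requestOrigin);
        if (target == null) {
            return; // not monitored: nothing to reset and nobody to answer
        }
        // "reportHeartbeat": record the request (in Flink this resets the timeout task).
        lastHeartbeat.put(requestOrigin, System.currentTimeMillis());
        if (payload != null) {
            reportPayload.accept(requestOrigin, payload);
        }
        CompletableFuture<String> futurePayload = retrievePayload.apply(requestOrigin);
        if (futurePayload != null) {
            futurePayload
                    .thenAccept(p -> target.receiveHeartbeat(ownId, p))
                    .exceptionally(failure -> {
                        System.err.println("Could not send heartbeat to " + requestOrigin + ": " + failure);
                        return null;
                    });
        } else {
            target.receiveHeartbeat(ownId, null);
        }
    }
}

class ReceiverDemo {
    public static void main(String[] args) {
        SimpleHeartbeatReceiver tm = new SimpleHeartbeatReceiver(
                "tm-1",
                (origin, payload) -> System.out.println("payload from " + origin + ": " + payload),
                origin -> CompletableFuture.completedFuture("accumulator-report"));
        tm.monitorTarget("jm-1", (origin, payload) ->
                System.out.println("response from " + origin + ": " + payload));
        tm.requestHeartbeat("jm-1", "slot-report");
        // payload from jm-1: slot-report
        // response from tm-1: accumulator-report
    }
}
```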
2. Heartbeat receiver: HeartbeatManagerImpl, used in TM
- After TM starts, it establishes a connection with JM. Once the connection succeeds, TM creates a HeartbeatTarget for JM that implements receiveHeartbeat and registers it via monitorTarget. At this point the corresponding HeartbeatMonitor already exists in HeartbeatManagerImpl. From then on, its timeout task is reset only when JM calls requestHeartbeat.
- The receiveHeartbeat implementation calls JM's heartbeatFromTaskManager method directly through RPC. The call ends up in receiveHeartbeat of JM's HeartbeatManagerSenderImpl (inherited from HeartbeatManagerImpl), where reportHeartbeat resets JM's timeout task for this TM, meaning TM is running normally.
```java
// TaskExecutor#establishJobManagerConnection
private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
    // ...
    ResourceID jobManagerResourceID = registrationSuccess.getResourceID();

    // monitor the job manager as heartbeat target
    jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
        // TM only answers heartbeat requests: after receiving a request from JM,
        // it calls back JobMasterGateway#heartbeatFromTaskManager() through RPC
        @Override
        public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            // request heartbeat will never be called on the task manager side
        }
    });
    // ...
}
```
```java
// JobMaster: the heartbeat requester instance.
// taskManagerHeartbeatManager is a HeartbeatManagerSenderImpl, which extends HeartbeatManagerImpl
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(),
    getMainThreadExecutor(),
    log);

// Heartbeat response received from TM via RPC
public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
    taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
}

// JM processes the heartbeat response from TM
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat from {}.", heartbeatOrigin);
        reportHeartbeat(heartbeatOrigin); // Heartbeat received: reset the timeout task for this TM

        if (heartbeatPayload != null) {
            heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }
}
```
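The wiring in `registerTaskManager` and `establishJobManagerConnection` is asymmetric. Each side implements only one of the two `HeartbeatTarget` methods, and that method forwards to the remote gateway. This can be shown with in-process fakes. `SimpleTarget`, `FakeJobMaster`, `FakeTaskExecutor`, and `HeartbeatWiring` are hypothetical stand-ins; the real calls cross process boundaries through Flink's RPC gateways.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's HeartbeatTarget (String IDs and payloads).
interface SimpleTarget {
    void receiveHeartbeat(String heartbeatOrigin, String payload);

    void requestHeartbeat(String requestOrigin, String payload);
}

// Hypothetical in-process stand-in for JobMasterGateway.
class FakeJobMaster {
    final List<String> log = new ArrayList<>();

    // Plays JobMasterGateway#heartbeatFromTaskManager.
    void heartbeatFromTaskManager(String taskManagerId, String accumulatorReport) {
        log.add("JM got response from " + taskManagerId + " with " + accumulatorReport);
    }
}

// Hypothetical in-process stand-in for TaskExecutorGateway.
class FakeTaskExecutor {
    final String ownId;
    final List<String> log = new ArrayList<>();
    SimpleTarget jobManagerTarget; // set when the "JM connection" is established

    FakeTaskExecutor(String ownId) {
        this.ownId = ownId;
    }

    // Plays TaskExecutorGateway#heartbeatFromJobManager. In Flink this goes through
    // HeartbeatManagerImpl#requestHeartbeat, which resets the JM timeout first.
    void heartbeatFromJobManager(String jobManagerId, String slotReport) {
        log.add("TM got request from " + jobManagerId + " with " + slotReport);
        jobManagerTarget.receiveHeartbeat(ownId, "accumulators");
    }
}

final class HeartbeatWiring {
    // JM side: only requestHeartbeat does anything (cf. JobMaster#registerTaskManager).
    static SimpleTarget taskManagerTarget(FakeTaskExecutor taskExecutorGateway) {
        return new SimpleTarget() {
            @Override
            public void receiveHeartbeat(String heartbeatOrigin, String payload) {
                // never called: the TM does not request heartbeats from the JM
            }

            @Override
            public void requestHeartbeat(String requestOrigin, String payload) {
                taskExecutorGateway.heartbeatFromJobManager(requestOrigin, payload);
            }
        };
    }

    // TM side: only receiveHeartbeat does anything (cf. TaskExecutor#establishJobManagerConnection).
    static SimpleTarget jobManagerTarget(FakeJobMaster jobMasterGateway) {
        return new SimpleTarget() {
            @Override
            public void receiveHeartbeat(String heartbeatOrigin, String payload) {
                jobMasterGateway.heartbeatFromTaskManager(heartbeatOrigin, payload);
            }

            @Override
            public void requestHeartbeat(String requestOrigin, String payload) {
                // never called on the task manager side
            }
        };
    }

    public static void main(String[] args) {
        FakeJobMaster jm = new FakeJobMaster();
        FakeTaskExecutor tm = new FakeTaskExecutor("tm-1");
        tm.jobManagerTarget = jobManagerTarget(jm);
        SimpleTarget tmAsSeenByJm = taskManagerTarget(tm);

        // One round of what the JM's sender loop does for this TM.
        tmAsSeenByJm.requestHeartbeat("jm-1", "slot-report");
        System.out.println(tm.log.get(0)); // TM got request from jm-1 with slot-report
        System.out.println(jm.log.get(0)); // JM got response from tm-1 with accumulators
    }
}
```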