Flink heartbeat service mechanism

A heartbeat mechanism is used to detect whether a client or server is alive by having the two sides send requests to each other at regular intervals. There are two common heartbeat detection methods:

  1. The socket option SO_KEEPALIVE provides a built-in heartbeat mechanism at the TCP level. It sends keepalive probes to the peer periodically, and the peer replies automatically after receiving a probe;
  2. The application implements the heartbeat mechanism itself, which likewise works by sending requests periodically.
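
As a small illustration of method 1, the sketch below enables SO_KEEPALIVE on a plain Java socket. KeepAliveSketch is a hypothetical helper name used only for this example. The probe timing is controlled by the operating system rather than by the application, which is one reason frameworks such as Flink implement their own heartbeat (method 2).

```java
import java.io.IOException;
import java.net.Socket;

/** Hypothetical helper, for illustration only. */
final class KeepAliveSketch {
    /** Creates an unconnected socket with TCP keepalive (SO_KEEPALIVE) enabled. */
    static Socket newKeepAliveSocket() throws IOException {
        Socket socket = new Socket();
        // Ask the OS to send keepalive probes once the connection is established and idle.
        socket.setKeepAlive(true);
        return socket;
    }
}
```

The caller would then connect the socket as usual; the option can be set before the connection is established.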

Flink monitors the status of its service components through the heartbeat service. It follows approach 2 above: the requester sends heartbeat requests periodically, and the receiver replies with a heartbeat response each time it receives a request. Internally, the two sides call each other through RPC, and each call resets the timeout task that the callee keeps for the caller. Among Flink's service components, ResourceManager, JobMaster and TaskExecutor monitor each other with heartbeats: the ResourceManager actively sends heartbeat requests to detect whether the JobMaster and TaskExecutor are alive, and the JobMaster actively sends heartbeat requests to detect whether the TaskExecutor is alive, so that it can restart tasks or handle failures. The core of the heartbeat communication consists of two parts:

  1. Heartbeat timeout: once the heartbeat service starts monitoring a component, Flink schedules a timeout task (the "timeout thread") whose delay is the configured heartbeat timeout, so the task runs only if that time elapses. Whenever a heartbeat message from the component arrives, the pending task is cancelled and scheduled again, which resets the timeout. The heartbeat service depends on the HeartbeatListener: if no heartbeat arrives within the timeout, the timeout task runs and calls the listener's notifyHeartbeatTimeout method, which performs the follow-up handling (generally trying to reconnect).
  2. Heartbeat request: the heartbeat check is two-way. One party actively initiates heartbeat requests and the other responds; they call each other through RPC, and each call resets the timeout task on the receiving side. Take JobManager (JM) and TaskManager (TM) as an example. When JM starts, it begins periodic scheduling and sends a heartbeat request to every TM registered with it by calling TM's requestHeartbeat method through RPC. On TM, this resets the timeout task that monitors JM, indicating that JM is healthy. After TM has received the requestHeartbeat call, it calls JM's receiveHeartbeat through RPC, which resets the timeout task that JM keeps for this TM, indicating that TM is healthy.
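
The cancel-and-reschedule idea from point 1 can be sketched with the JDK's ScheduledExecutorService. This is a minimal illustration, not Flink's code: ResettableTimeout and its method names are hypothetical, and a real monitor also has to handle state transitions and concurrent cancellation.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified timeout timer; not Flink's HeartbeatMonitor. */
final class ResettableTimeout {
    private final ScheduledExecutorService executor;
    private final long timeoutMs;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending;

    ResettableTimeout(ScheduledExecutorService executor, long timeoutMs, Runnable onTimeout) {
        this.executor = executor;
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
        reset(); // arm the timer as soon as monitoring starts
    }

    /** Called whenever a heartbeat arrives: cancel the pending timeout and arm it again. */
    synchronized void reset() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = executor.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Stops monitoring without firing the timeout callback. */
    synchronized void stop() {
        if (pending != null) {
            pending.cancel(false);
        }
    }
}
```

As long as reset() is called more often than every timeoutMs milliseconds, onTimeout never runs; once heartbeats stop, it runs exactly once after the timeout has elapsed.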

The main interfaces and classes used by the Flink heartbeat service are shown in the figure below:

HeartbeatTarget interface:

The core heartbeat target interface: it is used to send heartbeat requests and to receive heartbeat responses. Both the heartbeat sender and the heartbeat receiver implement this interface, and both kinds of message can carry a payload.

public interface HeartbeatTarget<I> {
   /**
    * Sends a heartbeat response to the target. Each heartbeat response can carry a payload which
    * contains additional information for the heartbeat target.
    *
    * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
    * @param heartbeatPayload Payload of the heartbeat. Null indicates an empty payload.
    */
   void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);  // Receives the heartbeat response sent back by the monitored target

   /**
    * Requests a heartbeat from the target. Each heartbeat request can carry a payload which
    * contains additional information for the heartbeat target.
    *
    * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
    * @param heartbeatPayload Payload of the heartbeat request. Null indicates an empty payload.
    */
   void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);   // Sends a heartbeat request to the monitored target
}
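
To make the call direction concrete, here is a minimal in-memory sketch of one request/response round trip. SimpleHeartbeatTarget is a simplified stand-in for the interface above (a String replaces ResourceID, and direct method calls replace RPC); HeartbeatRoundTrip, the IDs "jm" and "tm", and the payload value are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for HeartbeatTarget; String replaces ResourceID. */
interface SimpleHeartbeatTarget<I> {
    void receiveHeartbeat(String heartbeatOrigin, I payload);
    void requestHeartbeat(String requestOrigin, I payload);
}

final class HeartbeatRoundTrip {
    /** Runs one request/response exchange and returns the observed call order. */
    static List<String> runOnce() {
        List<String> trace = new ArrayList<>();

        // The requester ("jm") as seen from the responder: only receiveHeartbeat is used.
        SimpleHeartbeatTarget<String> requester = new SimpleHeartbeatTarget<String>() {
            @Override
            public void receiveHeartbeat(String origin, String payload) {
                trace.add("jm received heartbeat from " + origin + " with " + payload);
            }

            @Override
            public void requestHeartbeat(String origin, String payload) {
                // the responder never requests heartbeats in this sketch
            }
        };

        // The responder ("tm") as seen from the requester: only requestHeartbeat is used.
        SimpleHeartbeatTarget<String> responder = new SimpleHeartbeatTarget<String>() {
            @Override
            public void receiveHeartbeat(String origin, String payload) {
                // the requester never sends responses in this sketch
            }

            @Override
            public void requestHeartbeat(String origin, String payload) {
                trace.add("tm received request from " + origin);
                requester.receiveHeartbeat("tm", "slot-report"); // answer the request
            }
        };

        responder.requestHeartbeat("jm", null); // the requester starts the exchange
        return trace;
    }
}
```

Each side only ever uses one of the two methods on the target it holds, which matches the anonymous HeartbeatTarget implementations shown later in this article.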

HeartbeatManager:

The heartbeat manager starts and stops the monitoring of HeartbeatTargets and reports a target's heartbeat timeout events. A HeartbeatTarget is handed over for monitoring through the monitorTarget method. This method can be regarded as the input of the whole service: it tells the heartbeat service which targets to manage.

public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
    // Start monitoring the heartbeat target. When the target's heartbeat times out, this is reported to the HeartbeatListener associated with the HeartbeatManager
    void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
    // Stop monitoring the heartbeat target identified by the given ResourceID
    void unmonitorTarget(ResourceID resourceID);
    // Stop the current heartbeat manager
    void stop();
    // Returns the time of the last heartbeat from the target, or -1 if the target has been removed
    long getLastHeartbeatFrom(ResourceID resourceId);
}

HeartbeatListener:

HeartbeatListener is the interface most closely tied to HeartbeatManager and can be regarded as the output of the service. It has the following functions:

  • Notify about a heartbeat timeout
  • Receive the payload carried in a heartbeat message
  • Retrieve the payload to send with a heartbeat response

public interface HeartbeatListener<I, O> {
    // This method is called when the heartbeat times out
    void notifyHeartbeatTimeout(ResourceID resourceID);
    // This method is executed when the payload about heartbeat is received
    void reportPayload(ResourceID resourceID, I payload);
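    // Retrieve the payload for the next heartbeat message (the sender and responder code later in this article uses a variant that returns CompletableFuture<O>)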
    O retrievePayload(ResourceID resourceID);
}
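
A listener implementation might look roughly like the following sketch. SimpleHeartbeatListener mirrors the three methods above with a String in place of ResourceID, and RecordingListener, its fields, and the payload types (an Integer in, a String out) are hypothetical choices made only for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for HeartbeatListener; String replaces ResourceID. */
interface SimpleHeartbeatListener<I, O> {
    void notifyHeartbeatTimeout(String resourceId);
    void reportPayload(String resourceId, I payload);
    O retrievePayload(String resourceId);
}

/** Hypothetical listener that records what the heartbeat service tells it. */
final class RecordingListener implements SimpleHeartbeatListener<Integer, String> {
    final Map<String, Integer> lastPayload = new ConcurrentHashMap<>();
    final Set<String> timedOut = ConcurrentHashMap.newKeySet();

    @Override
    public void notifyHeartbeatTimeout(String resourceId) {
        // A real listener would disconnect from or fail over the component here.
        timedOut.add(resourceId);
        lastPayload.remove(resourceId);
    }

    @Override
    public void reportPayload(String resourceId, Integer payload) {
        lastPayload.put(resourceId, payload); // keep the most recent report per component
    }

    @Override
    public String retrievePayload(String resourceId) {
        return "status-for-" + resourceId; // payload attached to the next outgoing heartbeat
    }
}
```

The type parameters follow the same convention as above: I is the payload type this side receives, O is the payload type it sends.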

Creation of heartbeat service:

Some services are initialized when the cluster starts, and the heartbeat service is created in the ClusterEntrypoint#initializeServices method. It reads the heartbeat interval (heartbeat.interval) and the heartbeat timeout (heartbeat.timeout) from the configuration and creates the HeartbeatServices:

heartbeatServices = createHeartbeatServices(configuration);

protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
  return HeartbeatServices.fromConfiguration(configuration);
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
    // Heartbeat interval, 10s by default
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    // Heartbeat timeout, 50s by default
    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
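
The same read-with-defaults pattern can be tried without a Flink dependency. In the sketch below, java.util.Properties stands in for Flink's Configuration and HeartbeatSettings is a hypothetical class; the defaults mirror the 10 s and 50 s values from the comments above, and the validation is an assumption of this sketch, not something taken from the code shown here.

```java
import java.util.Properties;

/** Hypothetical holder for the two heartbeat settings, in milliseconds. */
final class HeartbeatSettings {
    final long intervalMs;
    final long timeoutMs;

    HeartbeatSettings(long intervalMs, long timeoutMs) {
        // Sanity checks chosen for this sketch: a timeout shorter than the
        // interval would expire before the next heartbeat could arrive.
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        if (timeoutMs < intervalMs) {
            throw new IllegalArgumentException("heartbeat timeout must not be smaller than the interval");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    /** Reads both values, falling back to 10 s and 50 s when a key is absent. */
    static HeartbeatSettings fromProperties(Properties props) {
        long interval = Long.parseLong(props.getProperty("heartbeat.interval", "10000"));
        long timeout = Long.parseLong(props.getProperty("heartbeat.timeout", "50000"));
        return new HeartbeatSettings(interval, timeout);
    }
}
```

With the default values, five heartbeat intervals fit into one timeout, so a single delayed or lost heartbeat does not immediately mark a component as dead.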

Core methods of createHeartbeatManager and createHeartbeatManagerSender:

The two classes used by these methods, HeartbeatManagerImpl and HeartbeatManagerSenderImpl, are the key to the whole heartbeat service.

HeartbeatManagerImpl is created by the heartbeat receiver and responder (such as TM). It receives heartbeat requests from the heartbeat initiator and requester (JM). It has two important attributes: heartbeatListener and heartbeatTargets. heartbeatTargets is a Map whose key is the ID of a monitored heartbeat component (e.g. TM) and whose value is the HeartbeatMonitor created for that component to trigger the heartbeat timeout. Keys and values correspond one to one. A heartbeat timeout triggers the notifyHeartbeatTimeout method of the corresponding heartbeatListener. Note: on the responder side the timeout is reset passively, that is, only when a heartbeat request arrives (when requestHeartbeat is called); in the HeartbeatMonitor code shown below, the first timeout is already scheduled in the constructor.

// The external caller passes in the heartbeat target, and a HeartbeatMonitor is created for it
public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
   if (!stopped) {
       if (heartbeatTargets.containsKey(resourceID)) {
           log.debug("The target with resource ID {} is already been monitored.", resourceID);
       } else {
           // Wrap the heartbeatTarget in a HeartbeatMonitor and associate it with the heartbeatListener that handles timeouts
           HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
               resourceID,
               heartbeatTarget,
               mainThreadExecutor,
               heartbeatListener,
               heartbeatTimeoutIntervalMs);

           heartbeatTargets.put(
               resourceID,
               heartbeatMonitor);

           // check if we have stopped in the meantime (concurrent stop operation)
           if (stopped) {
               heartbeatMonitor.cancel();
               heartbeatTargets.remove(resourceID);
           }
       }
   }
}

HeartbeatMonitor manages the heartbeat state of one heartbeat target. If no heartbeat signal is received within the timeout, it considers the heartbeat timed out and notifies the HeartbeatListener. Each time a heartbeat signal is received, it resets the current timer.

static class HeartbeatMonitor<O> implements Runnable {
    private final ResourceID resourceID; /** Resource ID of the monitored heartbeat target. */
    private final HeartbeatTarget<O> heartbeatTarget; /** Associated heartbeat target. */
    private final ScheduledExecutor scheduledExecutor;
    private final HeartbeatListener<?, ?> heartbeatListener; /** Listener which is notified about heartbeat timeouts. */
    private final long heartbeatTimeoutIntervalMs; /** Maximum heartbeat timeout interval. */
    private volatile ScheduledFuture<?> futureTimeout;
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile long lastHeartbeat;  //  Time of last heartbeat received

    HeartbeatMonitor(
        ResourceID resourceID,
        HeartbeatTarget<O> heartbeatTarget,
        ScheduledExecutor scheduledExecutor,
        HeartbeatListener<?, O> heartbeatListener,
        long heartbeatTimeoutIntervalMs) {
        this.resourceID = Preconditions.checkNotNull(resourceID); // ID of the monitored machine
        this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget); // Heartbeat target
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener); // Heartbeat listener

        Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        lastHeartbeat = 0L;
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }
    // ...
    // Report heartbeat
    void reportHeartbeat() {
        lastHeartbeat = System.currentTimeMillis();  // Record the time of the last received heartbeat
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);  // After receiving the heartbeat, reset the timeout task
    }
    // Reset the timeout
    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (state.get() == State.RUNNING) {
            cancelTimeout(); // First cancel the pending timeout task, then schedule it again
            futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS); // Schedule the timeout task

            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }
    // ...
    // On heartbeat timeout, trigger notifyHeartbeatTimeout of the listener
    @Override
    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }
}

HeartbeatManagerSenderImpl is a subclass of HeartbeatManagerImpl. It is created by the heartbeat requester (such as JM). Once created, it immediately starts periodic scheduling: on each run it traverses the heartbeat targets it manages and calls heartbeatTarget.requestHeartbeat() on each of them, which is an active heartbeat request.

// In the HeartbeatManagerSenderImpl constructor: schedule the first run immediately
this.heartbeatPeriod = heartbeatPeriod;
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);

public void run() {
   if (!stopped) {
      log.debug("Trigger heartbeat request.");
      for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
         CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId()); // Retrieve the current payload
         final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget(); // Heartbeat target to send the request to

         if (futurePayload != null) {
            CompletableFuture<Void> requestHeartbeatFuture = FutureUtils.thenAcceptAsyncIfNotDone(
               futurePayload,
               getMainThreadExecutor(),
               payload -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload)); // Send the request through the heartbeat target

            requestHeartbeatFuture.exceptionally(
               (Throwable failure) -> {
                  log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure);

                  return null;
               });
         } else {
            heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
         }
      }
      getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS); // Periodic scheduling
   }
}
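
The self-rescheduling pattern used by run() can be reproduced with plain JDK classes. The following is a minimal sketch under simplifying assumptions: PeriodicSenderSketch is a hypothetical class, a Consumer<String> stands in for HeartbeatTarget#requestHeartbeat, and payloads, futures and error handling are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical periodic heartbeat sender; a simplified stand-in, not Flink's class. */
final class PeriodicSenderSketch implements Runnable {
    private final String ownId;
    private final ScheduledExecutorService executor;
    private final long periodMs;
    // target id -> callback standing in for HeartbeatTarget#requestHeartbeat
    private final Map<String, Consumer<String>> targets = new ConcurrentHashMap<>();
    private volatile boolean stopped;

    PeriodicSenderSketch(String ownId, ScheduledExecutorService executor, long periodMs) {
        this.ownId = ownId;
        this.executor = executor;
        this.periodMs = periodMs;
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS); // first round runs immediately
    }

    void monitorTarget(String targetId, Consumer<String> requestHeartbeat) {
        targets.put(targetId, requestHeartbeat);
    }

    void stop() {
        stopped = true; // the next scheduled run becomes a no-op and does not reschedule
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        for (Consumer<String> requestHeartbeat : targets.values()) {
            requestHeartbeat.accept(ownId); // "request a heartbeat", identifying ourselves
        }
        // Schedule the next round instead of using a fixed-rate timer.
        executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

Because each run schedules the next one only after finishing its loop, rounds never overlap, and stopping the sender simply breaks the chain.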

Heartbeat service example:

1. Heartbeat initiator and requester: HeartbeatManagerSenderImpl as used in JM

  1. After a TM registers, JM adds a heartbeat target for it to its collection of heartbeat targets. As the heartbeat initiator and requester, JM uses its sender, HeartbeatManagerSenderImpl, to traverse the heartbeat targets it manages periodically and call heartbeatTarget.requestHeartbeat() on each one. Here, this calls requestHeartbeat() on the heartbeat target that JM registered for the TM.
  2. In requestHeartbeat, TaskExecutor#heartbeatFromJobManager is called through RPC, which sends the heartbeat request from JM to TM. When TM receives the request, it calls requestHeartbeat of its HeartbeatManagerImpl, which starts or resets the timeout task for JM, indicating that JM is healthy. Within that method, JM's receiveHeartbeat is then called through RPC.

//JobMaster#registerTaskManager
public CompletableFuture<RegistrationResponse> registerTaskManager(
      final String taskManagerRpcAddress,
      final TaskManagerLocation taskManagerLocation,
      final Time timeout) {
   final ResourceID taskManagerId = taskManagerLocation.getResourceID();

   if (registeredTaskManagers.containsKey(taskManagerId)) {
      final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
      return CompletableFuture.completedFuture(response);
   } else {
      return getRpcService()
         .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
         .handleAsync(
            (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
               if (throwable != null) {
                  return new RegistrationResponse.Decline(throwable.getMessage());
               }
               
               slotPoolGateway.registerTaskManager(taskManagerId); // TaskManager registration
               registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));

               // monitor the task manager as heartbeat target (add the heartbeat target)
               taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<AllocatedSlotReport>() {
                  @Override
                  public void receiveHeartbeat(ResourceID resourceID, AllocatedSlotReport payload) {
                     // the task manager will not request heartbeat, so this method will never be called currently
                  }

                  @Override
                  public void requestHeartbeat(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
                     taskExecutorGateway.heartbeatFromJobManager(resourceID, allocatedSlotReport);  // JM sends the heartbeat request to TM through RPC
                  }
               });

               return new JMTMRegistrationSuccess(resourceId);
            },
            getMainThreadExecutor());
   }
}

After TM receives JM's heartbeat request over RPC, the request is eventually handled by HeartbeatManagerImpl#requestHeartbeat, the heartbeat receiver on the TM side:

//HeartbeatManagerImpl#requestHeartbeat()
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
   if (!stopped) {
      log.debug("Received heartbeat request from {}.", requestOrigin);
      final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin); // Reset the timeout task and get the heartbeat target; here the target is JM

      if (heartbeatTarget != null) {
         if (heartbeatPayload != null) {
            heartbeatListener.reportPayload(requestOrigin, heartbeatPayload); // Report the received payload to the listener
         }
         CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin); // The listener produces the current payload

         if (futurePayload != null) {
            // The heartbeat target calls receiveHeartbeat of the heartbeat requester (JM) through RPC.
            // The heartbeatTarget instance is the one registered through monitorTarget, shown further below.
            CompletableFuture<Void> sendHeartbeatFuture = FutureUtils.thenAcceptAsyncIfNotDone(
               futurePayload,
               mainThreadExecutor,
               retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload));

            sendHeartbeatFuture.exceptionally((Throwable failure) -> {
                  log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure);
                  return null;
               });
         } else {
            heartbeatTarget.receiveHeartbeat(ownResourceID, null);
         }
      }
   }
}

// Registration of the heartbeat target on the TaskExecutor side, in TaskExecutor#establishJobManagerConnection
// monitor the job manager as heartbeat target
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
   @Override
   public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
      jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
   }

   @Override
   public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
      // request heartbeat will never be called on the task manager side
   }
});

2. Heartbeat receiver and responder: HeartbeatManagerImpl as used in TM

  • After TM starts, it establishes a connection with JM. Once connected, it creates a HeartbeatTarget for JM and overrides the receiveHeartbeat method. At this point the corresponding HeartbeatMonitor has been created in HeartbeatManagerImpl; its timeout is reset each time JM calls requestHeartbeat.
  • In the receiveHeartbeat method, JM's heartbeatFromTaskManager method is called directly through RPC, which ends up in HeartbeatManagerSenderImpl#receiveHeartbeat on the JM side. There, reportHeartbeat resets the timeout task of the monitor that JM keeps for TM, which indicates that TM is running normally.

//TaskExecutor#establishJobManagerConnection
private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) {
    // ...
    ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
    // monitor the job manager as heartbeat target
    jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
        // TM only receives heartbeat requests. After receiving a request from JM, it calls back jobMasterGateway.heartbeatFromTaskManager() through RPC
        @Override
        public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
            // request heartbeat will never be called on the task manager side
        }
    });
    // ...
}
// JobMaster: the heartbeat requester instance
// taskManagerHeartbeatManager is created as a HeartbeatManagerSenderImpl, which inherits from HeartbeatManagerImpl
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(),
    getMainThreadExecutor(),
    log);
    
// JobMaster#heartbeatFromTaskManager: heartbeat response received from TM
public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
    taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
}

// HeartbeatManagerImpl#receiveHeartbeat: JM receives the heartbeat response from TM
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat from {}.", heartbeatOrigin);
        // Reset the timeout for the sender of this heartbeat
        reportHeartbeat(heartbeatOrigin);
    
        if (heartbeatPayload != null) {
            heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }
}

Tags: flink

Posted by ozone1 on Fri, 13 May 2022 07:01:12 +0300