Kafka replica management module: metadata cache management on Broker

MetadataCache is the metadata cache kept on each Broker. Its contents are sent to the Broker by the Controller through UpdateMetadataRequest. In other words, the Controller implements an asynchronous update mechanism that broadcasts the latest cluster information to all Brokers, and Kafka relies on this mechanism to keep the metadata caches on all Brokers eventually consistent.

There are two reasons why the same data should be saved on each Broker.

  • With this data saved locally, the Broker can respond promptly to metadata requests sent by clients. The metadata request is one of the few request types that can be processed by any Broker in the cluster: a client can send it to any Broker to obtain the cluster's metadata, which is possible only because every Broker has a MetadataCache.
  • Some important components of Kafka will use this data. For example, the replica manager will use it to obtain the node information of the Broker, and the transaction manager will use it to obtain the information of the partition Leader replica, and so on.

MetadataCache is defined as follows:

class MetadataCache(brokerId: Int) extends Logging {
  // Lock that protects writes to the cache
  private val partitionMetadataLock = new ReentrantReadWriteLock()
  //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock)
  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
  //multiple reads of this value risk getting different snapshots.
  // Holds the actual metadata
  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
    controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
  // For log output only
  this.logIdent = s"[MetadataCache brokerId=$brokerId] "

The metadataSnapshot field above holds the actual metadata. Its type, MetadataSnapshot, is defined as follows:

  case class MetadataSnapshot(
    // Map keyed by topic name. Each value is a map from partition number to UpdateMetadataPartitionState,
    // the per-partition data structure carried by UpdateMetadataRequest.
    partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
    // ID of the Broker where the Controller resides.
    controllerId: Option[Int],
    // All live Broker objects in the current cluster, keyed by Broker ID.
    aliveBrokers: mutable.LongMap[Broker],
    // Map keyed by Broker ID. Each value maps a listener name to the Broker node object for that listener.
    aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])
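
To make the comment on metadataSnapshot concrete, here is a minimal Java sketch of the same idea: an immutable snapshot behind a volatile reference, replaced as a whole under a write lock and read without any lock. SnapshotCacheSketch, Snapshot, setLeader and describe are hypothetical names invented for this illustration, and the data is reduced to a topic-to-leader map, so treat it as a sketch of the pattern rather than Kafka's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified cache; the names are illustrative and not Kafka's.
final class SnapshotCacheSketch {

    // Immutable snapshot: final fields and an unmodifiable copy of the map.
    static final class Snapshot {
        final Map<String, Integer> leaderByTopic; // topic -> leader Broker ID (simplified)
        final int controllerId;                   // -1 stands for "no Controller" in this sketch

        Snapshot(Map<String, Integer> leaderByTopic, int controllerId) {
            this.leaderByTopic = Collections.unmodifiableMap(new HashMap<>(leaderByTopic));
            this.controllerId = controllerId;
        }
    }

    private final Object writeLock = new Object();
    private volatile Snapshot snapshot = new Snapshot(new HashMap<>(), -1);

    // Writers copy the old state, change the copy, and publish it with one volatile write.
    void setLeader(String topic, int leader, int controllerId) {
        synchronized (writeLock) {
            Map<String, Integer> copy = new HashMap<>(snapshot.leaderByTopic);
            copy.put(topic, leader);
            snapshot = new Snapshot(copy, controllerId);
        }
    }

    // Readers take the reference once, so both values come from the same snapshot.
    String describe(String topic) {
        Snapshot s = snapshot;
        return "controller=" + s.controllerId + ", leader=" + s.leaderByTopic.get(topic);
    }

    public static void main(String[] args) {
        SnapshotCacheSketch cache = new SnapshotCacheSketch();
        cache.setLeader("orders", 2, 0);
        System.out.println(cache.describe("orders"));
    }
}
```

If describe read the snapshot field twice instead of copying it into s, a concurrent setLeader could make the two reads see different snapshots, which is the risk the comment in MetadataCache warns about.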

UpdateMetadataPartitionState is defined as follows:

static public class UpdateMetadataPartitionState implements Message {
    private String topicName;     // Topic name
    private int partitionIndex;   // Partition number
    private int controllerEpoch;  // Controller Epoch value
    private int leader;           // Broker ID of the Leader replica
    private int leaderEpoch;      // Leader Epoch value
    private List<Integer> isr;    // ISR list
    private int zkVersion;        // Version number in the Stat of the ZooKeeper node
    private List<Integer> replicas;  // Replica list
    private List<Integer> offlineReplicas;  // Offline replica list
    private List<RawTaggedField> _unknownTaggedFields; // Unknown tagged fields

Let's take a look at the important methods of the MetadataCache class. The most important ones operate on the metadataSnapshot field.
I roughly divide the methods of the MetadataCache class into three categories: judgment methods, get methods, and update methods.

Judgment methods

These methods check whether a given topic or topic partition is present in the metadata cache, for example:

  // Determine whether a given topic is included in the metadata cache
  def contains(topic: String): Boolean = {
    metadataSnapshot.partitionStates.contains(topic)
  }
  // Determine whether a given topic partition is included in the metadata cache
  def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
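
Both checks come down to a lookup in a nested map: topic name first, then partition number. Below is a rough Java sketch of that lookup. PartitionLookupSketch and its put method are hypothetical, and the per-partition state is reduced to a leader Broker ID, so this only illustrates the shape of the data, not the real class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the partitionStates part of the cache.
final class PartitionLookupSketch {
    // topic name -> (partition number -> leader Broker ID)
    private final Map<String, Map<Integer, Integer>> partitionStates = new HashMap<>();

    // Helper for the example only: registers one partition.
    void put(String topic, int partition, int leader) {
        partitionStates.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, leader);
    }

    // Two-level lookup; empty when either the topic or the partition is unknown.
    Optional<Integer> getPartitionInfo(String topic, int partition) {
        return Optional.ofNullable(partitionStates.get(topic)).map(m -> m.get(partition));
    }

    // Topic-level check: only the outer map is consulted.
    boolean contains(String topic) {
        return partitionStates.containsKey(topic);
    }

    // Partition-level check: reuses the two-level lookup.
    boolean contains(String topic, int partition) {
        return getPartitionInfo(topic, partition).isPresent();
    }

    public static void main(String[] args) {
        PartitionLookupSketch cache = new PartitionLookupSketch();
        cache.put("orders", 0, 1);
        System.out.println(cache.contains("orders"));
        System.out.println(cache.contains("orders", 5));
    }
}
```

A topic can be present while a particular partition of it is not, which is why the two checks are kept separate.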

Get methods

  // Gets the detailed data of the given topic partition. Returns None if no matching record is found
  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
    metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))
  }
  // Returns all topics in the given metadata snapshot
  private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
    snapshot.partitionStates.keySet
  }
  // Given a topic partition and a ListenerName, gets the Broker node objects of all replicas of that partition under the specified listener
  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
    // Read the current metadata snapshot into a local variable once
    val snapshot = metadataSnapshot
    // Look up the data of the given topic partition
    snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition)).map { partitionInfo =>
      // Broker IDs of the partition's replicas
      val replicaIds = partitionInfo.replicas
      replicaIds.asScala
        .map(replicaId => replicaId.intValue() -> {
          // Look up the live Broker that hosts the replica
          snapshot.aliveBrokers.get(replicaId.longValue()) match {
            case Some(broker) =>
              // Get the Broker's node object for the given listener
              broker.getNode(listenerName).getOrElse(Node.noNode())
            case None =>
              // The Broker is not alive, so use an empty placeholder node
              Node.noNode()
          }}).toMap
        // Drop the replicas that resolved to an empty node
        .filter(pair => pair match {
          case (_, node) => !node.isEmpty
        })
    }.getOrElse(Map.empty[Int, Node])
  }
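
The replica-to-endpoint resolution can be sketched in a few lines of Java. This is an illustration under simplifying assumptions: ReplicaEndpointsSketch and endpointsFor are hypothetical names, a node is represented by a plain "host:port" string, and the lookup goes through a Broker ID to listener map instead of Broker objects. The real method maps missing nodes to an empty node and filters them out afterwards; the sketch skips them directly, which gives the same result.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; a node is simplified to a "host:port" string.
final class ReplicaEndpointsSketch {

    // aliveNodes: Broker ID -> (listener name -> "host:port")
    static Map<Integer, String> endpointsFor(List<Integer> replicas,
                                             Map<Integer, Map<String, String>> aliveNodes,
                                             String listener) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (Integer replicaId : replicas) {
            Map<String, String> byListener = aliveNodes.get(replicaId);
            // Skip replicas whose Broker is not alive or does not expose this listener.
            if (byListener != null && byListener.containsKey(listener)) {
                result.put(replicaId, byListener.get(listener));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> aliveNodes = new HashMap<>();
        Map<String, String> broker1 = new HashMap<>();
        broker1.put("PLAINTEXT", "host1:9092");
        aliveNodes.put(1, broker1);
        // Broker 3 is not alive, so it is left out of the result.
        System.out.println(endpointsFor(Arrays.asList(1, 3), aliveNodes, "PLAINTEXT"));
    }
}
```

The result only contains replicas that are reachable through the requested listener, so callers do not have to deal with placeholder nodes.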

Update methods

The main logic of the updateMetadata method is to read the partition data carried by UpdateMetadataRequest and then update the local metadata cache.

  // Update the state of each partition based on UpdateMetadataRequest, and return the partitions that were removed
  def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
    inWriteLock(partitionMetadataLock) {
      // Part 1: prepare the data for the following operations, that is, the contents of the aliveBrokers and aliveNodes fields.
      // Holds the live Broker objects. Key is the Broker ID and Value is the Broker object
      val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
      // Holds the live node objects. Key is the Broker ID and Value is a listener -> node object map
      val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
      // Get the Broker ID of the Controller from UpdateMetadataRequest
      // If there is currently no Controller, the value is None
      val controllerId = updateMetadataRequest.controllerId match {
        case id if id < 0 => None
        case id => Some(id)
      }
      // Traverse all live Broker objects in UpdateMetadataRequest
      updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
        // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
        // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
        // move to `AnyRefMap`, which has comparable performance.
        val nodes = new java.util.HashMap[ListenerName, Node]
        val endPoints = new mutable.ArrayBuffer[EndPoint]
        // Traverse all its EndPoints, that is, the listeners configured for the Broker
        broker.endpoints.asScala.foreach { ep =>
          val listenerName = new ListenerName(ep.listener)
          endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))
          // Save the < listener, Broker node object > pair
          nodes.put(listenerName, new Node(broker.id, ep.host, ep.port))
        }
        // Add the Broker to the collection of live Broker objects
        aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
        // Add the Broker's nodes to the collection of live node objects
        aliveNodes(broker.id) = nodes.asScala
      }
      // Part 2: check that all Brokers in the cluster are configured with the same listeners, and create the
      // deleted-partition buffer that the next part fills in.
      // Using the live node objects built in Part 1,
      // get all < listener, node > pairs of the current Broker
      aliveNodes.get(brokerId).foreach { listenerMap =>
        val listeners = listenerMap.keySet
        // If the listeners configured on the current Broker differ from those of other Brokers, log an error
        if (!aliveNodes.values.forall(_.keySet == listeners))
          error(s"Listeners are not identical across brokers: $aliveNodes")
      }
      // Construct the deleted-partition buffer that the method returns
      val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
      // The metadata update request does not carry any partition information
      if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
        // Construct a new MetadataSnapshot from the previous partition information and the new Broker information
        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
      } else {
        // Part 3: extract the data in UpdateMetadataRequest and use it to fill the metadata cache
        //since kafka may do partial metadata updates, we start by copying the previous state
        // Copy the partition data of the existing metadata cache
        val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
        metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
          val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
          copy ++= oldPartitionStates
          partitionStates += (topic -> copy)
        }
        updateMetadataRequest.partitionStates.asScala.foreach { info =>
          val controllerId = updateMetadataRequest.controllerId
          val controllerEpoch = updateMetadataRequest.controllerEpoch
          val tp = new TopicPartition(info.topicName, info.partitionIndex)
          // If the partition is in the process of being deleted
          if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
            // Remove the partition from the metadata cache
            removePartitionInfo(partitionStates, tp.topic, tp.partition)
            stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
              s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
            // Add the partition to the returned result
            deletedPartitions += tp
          } else {
            // Add the partition to the metadata cache, or update it if it is already there
            addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
            stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
              s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
          }
        }
        // Build the latest metadata cache from the updated partition metadata and the live Broker and node collections from Part 1
        metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
      }
      // Return the deleted partitions
      deletedPartitions
    }
  }
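
The partition-handling part of this method follows a copy, apply, publish sequence. Here is a minimal Java sketch of that sequence, under clearly stated assumptions: PartialUpdateSketch, Update, update and leaderOf are hypothetical names, the partition state is reduced to a leader Broker ID, and LEADER_DURING_DELETE is a sentinel chosen for this sketch to play the role of LeaderAndIsr.LeaderDuringDelete. Broker and listener handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the partial-update logic.
final class PartialUpdateSketch {
    static final int LEADER_DURING_DELETE = -2; // sentinel assumed for this sketch

    // One partition entry of a hypothetical update request.
    static final class Update {
        final String topic;
        final int partition;
        final int leader;

        Update(String topic, int partition, int leader) {
            this.topic = topic;
            this.partition = partition;
            this.leader = leader;
        }
    }

    private final Object writeLock = new Object();
    // topic -> (partition -> leader Broker ID); replaced as a whole on every update
    private volatile Map<String, Map<Integer, Integer>> partitionStates = new HashMap<>();

    // Applies the updates to a copy of the current state and returns the deleted partitions.
    List<String> update(List<Update> updates) {
        synchronized (writeLock) {
            List<String> deleted = new ArrayList<>();
            if (updates.isEmpty()) {
                return deleted; // no partition data: keep the previous partition state
            }
            // The update may be partial, so start from a copy of the previous state.
            Map<String, Map<Integer, Integer>> copy = new HashMap<>();
            for (Map.Entry<String, Map<Integer, Integer>> e : partitionStates.entrySet()) {
                copy.put(e.getKey(), new HashMap<>(e.getValue()));
            }
            for (Update u : updates) {
                if (u.leader == LEADER_DURING_DELETE) {
                    Map<Integer, Integer> partitions = copy.get(u.topic);
                    if (partitions != null) {
                        partitions.remove(u.partition);
                        if (partitions.isEmpty()) {
                            copy.remove(u.topic);
                        }
                    }
                    deleted.add(u.topic + "-" + u.partition);
                } else {
                    copy.computeIfAbsent(u.topic, t -> new HashMap<>()).put(u.partition, u.leader);
                }
            }
            partitionStates = copy; // publish the new state with a single write
            return deleted;
        }
    }

    // Returns the cached leader, or null when the partition is unknown.
    Integer leaderOf(String topic, int partition) {
        Map<Integer, Integer> partitions = partitionStates.get(topic);
        return partitions == null ? null : partitions.get(partition);
    }

    public static void main(String[] args) {
        PartialUpdateSketch cache = new PartialUpdateSketch();
        cache.update(Arrays.asList(new Update("orders", 0, 1), new Update("orders", 1, 2)));
        System.out.println(cache.update(Arrays.asList(new Update("orders", 1, LEADER_DURING_DELETE))));
    }
}
```

Because the second update only mentions partition 1, partition 0 keeps its cached leader. That is the reason for copying the previous state first instead of building the new state from the request alone.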


Posted by gabeanderson on Fri, 13 May 2022 13:52:38 +0300