MetadataCache is the metadata cache held on each Broker. The Controller populates it by sending UpdateMetadataRequest requests to the Brokers; in other words, the Controller implements an asynchronous update mechanism that broadcasts the latest cluster information to all Brokers. Through this asynchronous mechanism, Kafka guarantees eventual consistency of the metadata caches across all Brokers.
There are two reasons why every Broker keeps a copy of this data.
- With this data in hand, a Broker can promptly respond to metadata requests sent by clients. The metadata request is one of the few request types that any Broker in the cluster can handle: a client can send a metadata request to any Broker and obtain the cluster's metadata, and this is possible precisely because of the MetadataCache (see the sketch after this list).
- Several important Kafka components rely on this data. For example, the replica manager uses it to look up Broker node information, and the transaction manager uses it to find the Leader replica of a partition, and so on.
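To make the first point concrete, here is a minimal client-side sketch that issues a metadata request through the AdminClient. The broker address and topic name are placeholders, and Scala 2.13's CollectionConverters is assumed; because every Broker serves metadata from its MetadataCache, any single Broker listed in bootstrap.servers can answer the request.

```scala
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.jdk.CollectionConverters._

object MetadataRequestExample extends App {
  val props = new Properties()
  // Any single broker is enough: each broker answers metadata requests from its own MetadataCache.
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")

  val admin = AdminClient.create(props)
  try {
    // describeTopics ultimately translates into a metadata request on the wire.
    val descriptions = admin.describeTopics(List("payments").asJava).all().get()
    descriptions.asScala.foreach { case (topic, desc) =>
      println(s"$topic has ${desc.partitions().size()} partitions")
    }
  } finally admin.close()
}
```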
MetadataCache is defined as follows:
```scala
class MetadataCache(brokerId: Int) extends Logging {

  // Lock object protecting writes to the cache
  private val partitionMetadataLock = new ReentrantReadWriteLock()

  // This is the cache state. Every MetadataSnapshot instance is immutable, and updates (performed under a lock)
  // replace the value with a completely new one. This means reads (which are not under any lock) need to grab
  // the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
  // Multiple reads of this value risk getting different snapshots.
  // The actual metadata information is saved here.
  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(
    partitionStates = mutable.AnyRefMap.empty,
    controllerId = None,
    aliveBrokers = mutable.LongMap.empty,
    aliveNodes = mutable.LongMap.empty)

  // For log output only
  this.logIdent = s"[MetadataCache brokerId=$brokerId] "

  // ... other fields and methods omitted ...
}
```
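The comment above describes a copy-on-write pattern: writers build a brand-new immutable snapshot under a lock and publish it with a single volatile write, while readers take no lock and must capture the reference exactly once. The following self-contained sketch, with a made-up Snapshot type rather than the Kafka classes, illustrates the pattern:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical, simplified stand-in for MetadataSnapshot: an immutable value that is replaced wholesale.
final case class Snapshot(version: Int, topics: Set[String])

class CopyOnWriteCache {
  private val lock = new ReentrantReadWriteLock()
  @volatile private var snapshot: Snapshot = Snapshot(0, Set.empty)

  // Writers take the lock, build a completely new snapshot, and publish it with one volatile write.
  def update(newTopics: Set[String]): Unit = {
    val writeLock = lock.writeLock()
    writeLock.lock()
    try snapshot = Snapshot(snapshot.version + 1, newTopics)
    finally writeLock.unlock()
  }

  // Readers take NO lock. They read the volatile var once and use that copy for the whole operation;
  // reading `snapshot` twice could observe two different versions.
  def describe(): String = {
    val snap = snapshot // capture once
    s"version=${snap.version}, topics=${snap.topics.mkString(",")}"
  }
}
```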
The metadataSnapshot field above holds the actual metadata, and the MetadataSnapshot class is defined as follows:
```scala
case class MetadataSnapshot(
  // A Map of Maps: the key is the topic name, the value is another Map whose key is the partition number
  // and whose value is an UpdateMetadataPartitionState. The UpdateMetadataPartitionState type is the data
  // structure used internally by the UpdateMetadataRequest request.
  partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
  // The ID of the Broker where the Controller resides.
  controllerId: Option[Int],
  // All alive Broker objects in the current cluster, keyed by Broker ID.
  aliveBrokers: mutable.LongMap[Broker],
  // Also a Map of Maps: the key is the Broker ID, the value is a Map whose key is the listener name
  // (i.e. the Broker's listener type) and whose value is the Broker node object.
  aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])
```
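To see how these nested maps are navigated, here is a small self-contained sketch using plain Scala collections; the String values stand in for UpdateMetadataPartitionState, so it is illustrative only, not the Kafka types:

```scala
import scala.collection.mutable

object NestedMapSketch extends App {
  // topic name -> (partition id -> partition state); a String stands in for UpdateMetadataPartitionState.
  val partitionStates = mutable.AnyRefMap.empty[String, mutable.LongMap[String]]

  val payments = mutable.LongMap.empty[String]
  payments(0L) = "leader=1, isr=[1,2,3]"
  payments(1L) = "leader=2, isr=[2,3]"
  partitionStates("payments") = payments

  // The lookup mirrors the getPartitionInfo method shown below: first by topic, then by partition id.
  val state: Option[String] = partitionStates.get("payments").flatMap(_.get(1L))
  println(state.getOrElse("not found")) // prints "leader=2, isr=[2,3]"
}
```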
UpdateMetadataPartitionState is defined as follows:
```java
static public class UpdateMetadataPartitionState implements Message {
    private String topicName;                          // Topic name
    private int partitionIndex;                        // Partition number
    private int controllerEpoch;                       // Controller epoch value
    private int leader;                                // Broker ID of the Leader replica
    private int leaderEpoch;                           // Leader epoch value
    private List<Integer> isr;                         // ISR list
    private int zkVersion;                             // Version number from the Stat of the ZooKeeper node
    private List<Integer> replicas;                    // Replica list
    private List<Integer> offlineReplicas;             // Offline replica list
    private List<RawTaggedField> _unknownTaggedFields; // Unknown tagged field list
}
```
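UpdateMetadataPartitionState belongs to Kafka's generated protocol message classes, which expose fluent setters. As an illustration only, building one instance from Scala might look like the sketch below; the import path and setter names follow the generated-message convention and may differ slightly between Kafka versions, and the topic and partition values are made up.

```scala
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import scala.jdk.CollectionConverters._

object PartitionStateSketch extends App {
  // Sketch only: fluent setters per Kafka's generated-message convention; values are placeholders.
  val partitionState = new UpdateMetadataPartitionState()
    .setTopicName("payments")
    .setPartitionIndex(0)
    .setControllerEpoch(5)
    .setLeader(1)
    .setLeaderEpoch(12)
    .setIsr(List(1, 2, 3).map(Int.box).asJava)
    .setZkVersion(7)
    .setReplicas(List(1, 2, 3).map(Int.box).asJava)
    .setOfflineReplicas(List.empty[Integer].asJava)

  println(partitionState)
}
```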
Let's now look at the important methods of the MetadataCache class. The most important ones are those that manipulate the metadataSnapshot field.
I roughly divide the methods of the MetadataCache class into three categories: judgment methods, getter methods, and update methods.
Judgment methods
These methods determine whether a given topic or topic partition is contained in the metadata cache, for example:
```scala
// Determine whether a given topic is contained in the metadata cache
def contains(topic: String): Boolean = {
  metadataSnapshot.partitionStates.contains(topic)
}
```
```scala
// Determine whether a given topic partition is contained in the metadata cache
def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
```
Getter methods
```scala
// Get the detailed data of the given topic partition. If no matching record is found, return None.
def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
  metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))
}
```
```scala
// Return all topics in the current cluster metadata cache.
private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
  snapshot.partitionStates.keySet
}
```
```scala
// Given a topic partition and a ListenerName, get the Broker node objects of all replicas
// of that topic partition under the specified listener type.
def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
  // Capture the current metadata cache in a local variable
  val snapshot = metadataSnapshot
  // Get the data of the given topic partition
  snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition)).map { partitionInfo =>
    // Get the replica IDs of the partition
    val replicaIds = partitionInfo.replicas
    replicaIds.asScala
      .map(replicaId => replicaId.intValue() -> {
        // Look up the alive Broker for this replica ID
        snapshot.aliveBrokers.get(replicaId.longValue()) match {
          case Some(broker) =>
            // Resolve the Broker node object for the given listener
            broker.getNode(listenerName).getOrElse(Node.noNode())
          case None =>
            // The Broker is not alive, so no node can be found
            Node.noNode()
        }
      }).toMap
      .filter(pair => pair match {
        case (_, node) => !node.isEmpty
      })
  }.getOrElse(Map.empty[Int, Node])
}
```
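The method maps each replica ID to a Node for the requested listener and silently drops replicas whose Broker is not alive, because those resolve to Node.noNode() and are filtered out. The following self-contained sketch reproduces that resolve-then-filter pattern with plain Scala types; NodeInfo and the data are made up for illustration:

```scala
object ReplicaEndpointSketch extends App {
  // Made-up stand-in for org.apache.kafka.common.Node.
  final case class NodeInfo(id: Int, host: String, port: Int) {
    def isEmpty: Boolean = host.isEmpty
  }
  val noNode = NodeInfo(-1, "", -1)

  // Alive brokers, keyed by broker id, mapped to their "PLAINTEXT" endpoint.
  val aliveNodes: Map[Long, Map[String, NodeInfo]] = Map(
    1L -> Map("PLAINTEXT" -> NodeInfo(1, "broker1", 9092)),
    2L -> Map("PLAINTEXT" -> NodeInfo(2, "broker2", 9092))
    // broker 3 is missing: it is considered dead.
  )

  val replicaIds = Seq(1, 2, 3)

  // Same shape as getPartitionReplicaEndpoints: resolve each replica, then drop unresolved ones.
  val endpoints: Map[Int, NodeInfo] = replicaIds.map { id =>
    id -> aliveNodes.get(id.toLong).flatMap(_.get("PLAINTEXT")).getOrElse(noNode)
  }.toMap.filter { case (_, node) => !node.isEmpty }

  println(endpoints) // broker 3 is dropped; only brokers 1 and 2 remain
}
```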
Update methods
The main logic of the updateMetadata method is to read the partition data carried by the UpdateMetadataRequest request and then update the local metadata cache.
```scala
// Update the state of each partition based on the UpdateMetadataRequest request,
// and return the collection of partitions that need to be removed.
def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  inWriteLock(partitionMetadataLock) {
    // Part 1: prepare the data used by the rest of the method, i.e. the contents of the
    // aliveBrokers and aliveNodes fields.
    // Alive Broker objects: key is the Broker ID, value is the Broker object.
    val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
    // Alive node objects: key is the Broker ID, value is a listener -> node map.
    val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
    // Get the Broker ID of the Controller from the UpdateMetadataRequest.
    // If there is currently no Controller, assign None.
    val controllerId = updateMetadataRequest.controllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }

    // Traverse all alive Broker objects carried by the UpdateMetadataRequest
    updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
      // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
      // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
      // move to `AnyRefMap`, which has comparable performance.
      val nodes = new java.util.HashMap[ListenerName, Node]
      val endPoints = new mutable.ArrayBuffer[EndPoint]
      // Traverse all of the Broker's EndPoints, i.e. the listeners configured for the Broker
      broker.endpoints.asScala.foreach { ep =>
        val listenerName = new ListenerName(ep.listener)
        endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))
        // Save the <listener, Broker node object> pair
        nodes.put(listenerName, new Node(broker.id, ep.host, ep.port))
      }
      // Add the Broker to the collection of alive Broker objects
      aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
      // Add the Broker node to the collection of alive node objects
      aliveNodes(broker.id) = nodes.asScala
    }
```

Part 2 of the method then checks that all Brokers in the cluster are configured with the same listeners and initializes the deleted-partitions array, which the remaining code logic operates on:
```scala
    // Part 2: using the alive node objects prepared above,
    // get all <listener, node> pairs of the current Broker
    aliveNodes.get(brokerId).foreach { listenerMap =>
      val listeners = listenerMap.keySet
      // If the listeners configured on the current Broker differ from those of other Brokers, log an error
      if (!aliveNodes.values.forall(_.keySet == listeners))
        error(s"Listeners are not identical across brokers: $aliveNodes")
    }

    // The array of deleted partitions that the method returns
    val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
    // The metadata update request does not carry any partition information
    if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
      // Construct a new MetadataSnapshot object from the previous partition information
      // and the new Broker list information
      metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
    } else {
      // Part 3: extract the data carried by the UpdateMetadataRequest and fill the metadata cache.
      // Since Kafka may do partial metadata updates, we start by copying the previous state,
      // i.e. back up the partition data in the existing metadata cache.
      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
        val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
        copy ++= oldPartitionStates
        partitionStates += (topic -> copy)
      }
      updateMetadataRequest.partitionStates.asScala.foreach { info =>
        val controllerId = updateMetadataRequest.controllerId
        val controllerEpoch = updateMetadataRequest.controllerEpoch
        val tp = new TopicPartition(info.topicName, info.partitionIndex)
        // The partition is in the process of being deleted
        if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
          // Remove the partition from the metadata cache
          removePartitionInfo(partitionStates, tp.topic, tp.partition)
          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
            s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
          // Add the partition to the returned result
          deletedPartitions += tp
        } else {
          // Add or update the partition in the metadata cache
          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
            s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
        }
      }
      // Build the latest metadata cache from the updated partition metadata and the alive Broker
      // and node lists computed in part 1
      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
    }
    // Return the list of deleted partitions
    deletedPartitions
  }
}
```
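For reference, the two helper methods used above manipulate the backed-up nested map in the straightforward way: one inserts or overwrites a single partition's state, the other removes a partition and drops the topic entry when no partitions remain. The sketch below reflects how they look in the Kafka source, but treat it as illustrative rather than authoritative, since details may vary between versions:

```scala
// Insert or overwrite the state of a single partition in the (topic -> (partition -> state)) map.
private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
                                     topic: String,
                                     partitionId: Int,
                                     stateInfo: UpdateMetadataPartitionState): Unit = {
  val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
  infos(partitionId) = stateInfo
}

// Remove a single partition; if the topic has no partitions left, drop the topic entry as well.
private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
                                topic: String,
                                partitionId: Int): Boolean = {
  partitionStates.get(topic).exists { infos =>
    infos.remove(partitionId)
    if (infos.isEmpty) partitionStates.remove(topic)
    true
  }
}
```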