Hadoop 3.2.1 [HDFS] source code analysis: Standby Namenode

I Preface

In an HA HDFS cluster, two Namenode instances run at the same time. One is the Active Namenode, which handles all client requests in real time; the other is the Standby Namenode, whose namespace is kept fully consistent with that of the Active Namenode. When the Active Namenode fails, the Standby Namenode can therefore switch to the Active state immediately.

II Checkpoint operation

To keep the namespace of the Standby Namenode synchronized with the Active Namenode, both of them communicate with a group of JournalNode daemons.

Whenever the Active Namenode performs an operation that modifies the namespace, it must persist the generated editlog records to at least N - (N-1)/2 JournalNodes (a quorum) to guarantee that the namespace modification is safe. In other words, if N JournalNode processes are started under the HA policy, the JournalNode cluster tolerates at most (N-1)/2 of them failing while still guaranteeing that the editlog is written successfully and completely. For example, with three JournalNodes in the cluster, at most one JournalNode may fail; with five JournalNodes, at most two may fail. The Standby Namenode is responsible for observing changes to the editlog files: it reads editlog files from the JournalNodes and merges them into its own namespace. Once the Active Namenode fails, the Standby Namenode makes sure it has read all remaining editlog files from the JournalNodes and then switches to the Active state.
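
As a quick illustration of the quorum arithmetic above, here is a minimal, standalone sketch (not Hadoop code) that computes the required write quorum and the tolerated number of JournalNode failures for a few cluster sizes:

public class JournalQuorumMath {
  public static void main(String[] args) {
    for (int n : new int[] {3, 5, 7}) {
      // Writes must reach a majority: N - (N-1)/2 JournalNodes (integer division)
      int requiredWrites = n - (n - 1) / 2;
      // Consequently at most (N-1)/2 JournalNodes may be down at any time
      int toleratedFailures = (n - 1) / 2;
      System.out.printf("N=%d -> write quorum=%d, tolerated failures=%d%n",
          n, requiredWrites, toleratedFailures);
    }
  }
}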

Before a failover, the Standby Namenode reads all outstanding editlog files to ensure that its namespace state is fully synchronized with the Active Namenode.

The Standby Namenode thus always maintains an up-to-date namespace: it continuously merges the editlog files it reads into its current namespace.

The Standby Namenode only needs to check whether either of the two conditions that trigger a checkpoint is met. If a trigger condition is satisfied, the Standby Namenode writes its namespace into a new fsimage file and then uploads that fsimage file to the Active Namenode over HTTP.
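
These two conditions map to the configuration keys dfs.namenode.checkpoint.txns (1,000,000 transactions by default) and dfs.namenode.checkpoint.period (3600 seconds by default), and they are polled every dfs.namenode.checkpoint.check.period seconds. A minimal sketch of reading them (the wrapper class is made up for illustration; the key names and defaults are the standard HDFS ones):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class CheckpointThresholds {
  public static void main(String[] args) {
    // HdfsConfiguration picks up hdfs-site.xml from the classpath if present
    Configuration conf = new HdfsConfiguration();
    long txnThreshold = conf.getLong("dfs.namenode.checkpoint.txns", 1_000_000L);
    long periodSecs = conf.getTimeDuration("dfs.namenode.checkpoint.period",
        3600L, TimeUnit.SECONDS);
    long checkSecs = conf.getTimeDuration("dfs.namenode.checkpoint.check.period",
        60L, TimeUnit.SECONDS);
    System.out.println("Checkpoint when uncheckpointed txns >= " + txnThreshold
        + " or when " + periodSecs + " seconds have passed since the last one;"
        + " the conditions are polled every " + checkSecs + " seconds.");
  }
}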

 

■ Standby Namenode checks whether either of the two conditions that trigger a checkpoint is met.
■ Standby Namenode saves the current namespace to a file named fsimage.ckpt_txid, where txid is the transaction id recorded in the latest applied editlog. The Standby Namenode then writes the MD5 checksum of the fsimage file and finally renames fsimage.ckpt_txid to fsimage_txid (see the naming sketch after this list). While this operation is in progress, other Standby Namenode operations are blocked, for example an active/standby switch triggered by an Active Namenode failure, or access to the Standby Namenode's web interface. Note that Active Namenode operations such as listing, reading and writing files are not affected.
■ Standby Namenode sends an HTTP GET request /getimage?putimage=1 to the ImageServlet of the Active Namenode. The URL of this request contains the transaction id of the new fsimage file, as well as the port and IP address from which the Standby Namenode serves the download.
■ Active Namenode then sends an HTTP GET request to the ImageServlet of the Standby Namenode to download the fsimage file, based on the information provided by the Standby Namenode. The Active Namenode first names the downloaded file fsimage.ckpt_*, then creates its MD5 checksum, and finally renames fsimage.ckpt_* to fsimage_*.
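
The fsimage file names mentioned above follow a zero-padded txid pattern (19 digits in current releases), with the temporary checkpoint file carrying a .ckpt infix. Below is a minimal sketch of that naming and rename step; the directory and txid are made up for illustration:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class FsImageNaming {
  public static void main(String[] args) throws IOException {
    long txid = 12345L;                          // hypothetical checkpoint txid
    Path dir = Paths.get("/tmp/name/current");   // hypothetical storage directory
    Files.createDirectories(dir);

    // Temporary file written during the checkpoint, e.g. fsimage.ckpt_0000000000000012345
    Path ckpt = dir.resolve(String.format("fsimage.ckpt_%019d", txid));
    Files.write(ckpt, new byte[0]);

    // After the image (and its MD5 side file) is complete, it is renamed to fsimage_<txid>
    Path fin = dir.resolve(String.format("fsimage_%019d", txid));
    Files.move(ckpt, fin, StandardCopyOption.ATOMIC_MOVE);
    System.out.println("Renamed " + ckpt.getFileName() + " -> " + fin.getFileName());
  }
}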

 

III FSNamesystem#startStandbyServices

When the Namenode starts, FSNamesystem is loaded. In FSNamesystem#startStandbyServices, a StandbyCheckpointer instance is created and started to handle checkpoint operations.

 /**
   * Start services required in standby or observer state
   * 
   * @throws IOException
   */
  void startStandbyServices(final Configuration conf, boolean isObserver)
      throws IOException {
    LOG.info("Starting services required for " +
        (isObserver ? "observer" : "standby") + " state");
    if (!getFSImage().editLog.isOpenForRead()) {
      // During startup, we're already open for read.
      getFSImage().editLog.initSharedJournalsForRead();
    }
    blockManager.setPostponeBlocksFromFuture(true);

    // Disable quota checks while in standby.
    dir.disableQuotaChecks();
    editLogTailer = new EditLogTailer(this, conf);
    editLogTailer.start();
    if (!isObserver && standbyShouldCheckpoint) {
      standbyCheckpointer = new StandbyCheckpointer(conf, this);
      standbyCheckpointer.start();
    }
  }
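
Note that the standbyShouldCheckpoint flag checked above is, as far as I can tell, driven by the dfs.ha.standby.checkpoints setting (true by default), so standby checkpointing can be disabled entirely. A minimal sketch of reading that flag (the wrapper class is only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class StandbyCheckpointFlag {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // dfs.ha.standby.checkpoints: whether the Standby/Observer runs the StandbyCheckpointer
    boolean standbyShouldCheckpoint = conf.getBoolean("dfs.ha.standby.checkpoints", true);
    System.out.println("standbyShouldCheckpoint = " + standbyShouldCheckpoint);
  }
}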

 

IV CheckpointerThread

When StandbyCheckpointer#start is called, it starts the CheckpointerThread; when that thread executes its run method, it calls the doWork method.

private void doWork() {

      // Poll interval in milliseconds: checkpointConf.getCheckPeriod() is the smaller of
      // dfs.namenode.checkpoint.check.period (60 seconds by default) and
      // dfs.namenode.checkpoint.period (1 hour by default)
      final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
      System.out.println("StandbyCheckpointer#doWork=>checkPeriod : "+ checkPeriod);
      // Reset checkpoint time so that we don't always checkpoint
      // on startup.
      lastCheckpointTime = monotonicNow();
      lastUploadTime = monotonicNow();
      while (shouldRun) {

        boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();

        if (!needRollbackCheckpoint) {
          try {
            Thread.sleep(checkPeriod);
          } catch (InterruptedException ie) {
          }
          if (!shouldRun) {
            break;
          }
        }
        try {
          // We may have lost our ticket since last checkpoint, log in again, just in case
          if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
          }


          final long now = monotonicNow();
          //Get the difference between the latest txid written to the JournalNodes and the txid of the last checkpoint
          final long uncheckpointed = countUncheckpointedTxns();

          //Calculate the interval between the current time and the last checkpoint operation time
          final long secsSinceLast = (now - lastCheckpointTime) / 1000;

          // if we need a rollback checkpoint, always attempt to checkpoint
          boolean needCheckpoint = needRollbackCheckpoint;

          if (needCheckpoint) {
            LOG.info("Triggering a rollback fsimage for rolling upgrade.");
          } else if (uncheckpointed >= checkpointConf.getTxnCount()) {

            // First trigger condition:
            // the number of un-checkpointed transactions (latest txid written to the
            // JournalNodes minus the txid of the last checkpoint) is greater than or
            // equal to dfs.namenode.checkpoint.txns (1,000,000 by default)

            LOG.info("Triggering checkpoint because there have been {} txns " +
                "since the last checkpoint, " +
                "which exceeds the configured threshold {}",
                uncheckpointed, checkpointConf.getTxnCount());
            needCheckpoint = true;
          } else if (secsSinceLast >= checkpointConf.getPeriod()) {
            LOG.info("Triggering checkpoint because it has been {} seconds " +
                "since the last checkpoint, which exceeds the configured " +
                "interval {}", secsSinceLast, checkpointConf.getPeriod());

            // Second trigger condition:
            // the time since the last checkpoint is greater than or equal to
            // dfs.namenode.checkpoint.period (1 hour by default)
            needCheckpoint = true;
          }

          //If the checkpoint execution conditions are met, the doCheckpoint() method is called to perform the checkpoint operation
          if (needCheckpoint) {
            synchronized (cancelLock) {
              if (now < preventCheckpointsUntil) {
                LOG.info("But skipping this checkpoint since we are about to failover!");
                canceledCount++;
                continue;
              }
              assert canceler == null;
              canceler = new Canceler();
            }

            // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
            // rollback request, are the checkpointer, are outside the quiet period.
            final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
            boolean sendRequest = isPrimaryCheckPointer || secsSinceLastUpload >= checkpointConf.getQuietPeriod();


            doCheckpoint(sendRequest);

            // reset needRollbackCheckpoint to false only when we finish a ckpt
            // for rollback image
            if (needRollbackCheckpoint
                && namesystem.getFSImage().hasRollbackFSImage()) {
              namesystem.setCreatedRollbackImages(true);
              namesystem.setNeedRollbackFsImage(false);
            }
            lastCheckpointTime = now;
            LOG.info("Checkpoint finished successfully.");
          }
        } catch (SaveNamespaceCancelledException ce) {
          LOG.info("Checkpoint was cancelled: {}", ce.getMessage());
          canceledCount++;
        } catch (InterruptedException ie) {
          LOG.info("Interrupted during checkpointing", ie);
          // Probably requested shutdown.
          continue;
        } catch (Throwable t) {
          LOG.error("Exception in doCheckpoint", t);
        } finally {
          synchronized (cancelLock) {
            canceler = null;
          }
        }
      }
    }
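
The countUncheckpointedTxns() helper called in the loop above is not shown in this article; it essentially returns the difference between the latest txid applied on the Standby and the txid of the most recent checkpoint. A self-contained sketch of the trigger arithmetic, with made-up values:

public class CheckpointTriggerSketch {
  public static void main(String[] args) {
    long lastAppliedTxId = 1_250_000L;   // hypothetical latest txid applied on the Standby
    long lastCheckpointTxId = 200_000L;  // hypothetical txid of the previous checkpoint
    long txnThreshold = 1_000_000L;      // dfs.namenode.checkpoint.txns default
    long periodSecs = 3_600L;            // dfs.namenode.checkpoint.period default
    long secsSinceLast = 1_800L;         // hypothetical time since the last checkpoint

    // What countUncheckpointedTxns() boils down to: the txid gap since the last checkpoint
    long uncheckpointed = lastAppliedTxId - lastCheckpointTxId;
    boolean needCheckpoint =
        uncheckpointed >= txnThreshold || secsSinceLast >= periodSecs;
    System.out.println("uncheckpointed=" + uncheckpointed
        + ", needCheckpoint=" + needCheckpoint);
  }
}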

V doCheckpoint(sendRequest)

The entire checkpoint logic is implemented in the doCheckpoint() method. doCheckpoint() first obtains prevCheckpointTxId, the txid of the fsimage that is currently saved, and then thisCheckpointTxId, the latest txid of the applied editlog. Only when thisCheckpointTxId is greater than prevCheckpointTxId, that is, when the current namespace has been updated but not yet saved to a new fsimage file, is a checkpoint operation necessary. After this check, doCheckpoint() calls the saveNamespace() method to save the latest namespace to an fsimage file, and then constructs a thread pool to upload the newly generated fsimage file to the Active Namenode over HTTP.

/**
   * The entire checkpoint logic is implemented in the doCheckpoint() method.
   *
   * doCheckpoint() first obtains prevCheckpointTxId, the txid of the fsimage
   * that is currently saved, and then thisCheckpointTxId, the latest txid of
   * the applied editlog.
   *
   * Only when thisCheckpointTxId is greater than prevCheckpointTxId, i.e. when
   * the current namespace has been updated but not yet saved to a new fsimage
   * file, is a checkpoint operation necessary.
   *
   * After this check, doCheckpoint() calls the saveNamespace() method to save
   * the latest namespace to an fsimage file, and then constructs a thread pool
   * to upload the newly generated fsimage file to the Active Namenode over HTTP.
   *
   * @param sendCheckpoint
   * @throws InterruptedException
   * @throws IOException
   */
  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
    assert canceler != null;
    final long txid;
    final NameNodeFile imageType;
    // Acquire cpLock to make sure no one is modifying the name system.
    // It does not need the full namesystem write lock, since the only thing
    // that modifies namesystem on standby node is edit log replaying.
    namesystem.cpLockInterruptibly();
    try {
      assert namesystem.getEditLog().isOpenForRead() :
        "Standby Checkpointer should only attempt a checkpoint when " +
        "NN is in standby mode, but the edit logs are in an unexpected state";

      //Gets the latest fsimage object saved on the current Standby Namenode
      FSImage img = namesystem.getFSImage();

      //Get the txid saved in fsimage, that is, the txid saved in the last checkpoint operation
      long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();

      //Get the latest txid of the current namespace, that is, the latest txid of the received editlog
      long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
      assert thisCheckpointTxId >= prevCheckpointTxId;

      //If the two are equal, there is no need to perform a checkpoint; the current fsimage is already up to date
      if (thisCheckpointTxId == prevCheckpointTxId) {
        LOG.info("A checkpoint was triggered but the Standby Node has not " +
            "received any transactions since the last checkpoint at txid {}. " +
            "Skipping...", thisCheckpointTxId);
        return;
      }

      if (namesystem.isRollingUpgrade()
          && !namesystem.getFSImage().hasRollbackFSImage()) {

        //If the Namenode is currently performing an upgrade operation, create a fsimage_rollback file
        // if we will do rolling upgrade but have not created the rollback image
        // yet, name this checkpoint as fsimage_rollback
        imageType = NameNodeFile.IMAGE_ROLLBACK;
      } else {

        //Create fsimage file under normal conditions
        imageType = NameNodeFile.IMAGE;
      }

      //Call saveNamespace() to save the current namespace to a new file
      img.saveNamespace(namesystem, imageType, canceler);


      txid = img.getStorage().getMostRecentCheckpointTxId();
      assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
          thisCheckpointTxId + " but instead saved at txid=" + txid;

      // Save the legacy OIV image, if the output dir is defined.
      String outputDir = checkpointConf.getLegacyOivImageDir();
      if (outputDir != null && !outputDir.isEmpty()) {
        try {
          img.saveLegacyOIVImage(namesystem, outputDir, canceler);
        } catch (IOException ioe) {
          LOG.warn("Exception encountered while saving legacy OIV image; "
                  + "continuing with other checkpointing steps", ioe);
        }
      }
    } finally {
      namesystem.cpUnlock();
    }

    //early exit if we shouldn't actually send the checkpoint to the ANN
    if(!sendCheckpoint){
      return;
    }


    //Construct a thread pool to upload the fsimage to the Active Namenode via HTTP
    // Upload the saved checkpoint back to the active
    // Do this in a separate thread to avoid blocking transition to active, but don't allow more
    // than the expected number of tasks to run or queue up
    // See HDFS-4816
    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
        uploadThreadFactory);
    // for right now, just match the upload to the nn address by convention. There is no need to
    // directly tie them together by adding a pair class.
    List<Future<TransferFsImage.TransferResult>> uploads =
        new ArrayList<Future<TransferFsImage.TransferResult>>();
    for (final URL activeNNAddress : activeNNAddresses) {
      Future<TransferFsImage.TransferResult> upload =
          executor.submit(new Callable<TransferFsImage.TransferResult>() {
            @Override
            public TransferFsImage.TransferResult call()
                throws IOException, InterruptedException {


              CheckpointFaultInjector.getInstance().duringUploadInProgess();
              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                  .getFSImage().getStorage(), imageType, txid, canceler);

            }
          });
      uploads.add(upload);
    }
    InterruptedException ie = null;
    IOException ioe= null;
    int i = 0;
    boolean success = false;
    for (; i < uploads.size(); i++) {
      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
      try {
        // TODO should there be some smarts here about retries nodes that are not the active NN?
        if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
          success = true;
          //avoid getting the rest of the results - we don't care since we had a successful upload
          break;
        }

      } catch (ExecutionException e) {
        ioe = new IOException("Exception during image upload", e);
        break;
      } catch (InterruptedException e) {
        ie = e;
        break;
      }
    }
    if (ie == null && ioe == null) {
      //Update only when response from remote about success or
      lastUploadTime = monotonicNow();
      // we are primary if we successfully updated the ANN
      this.isPrimaryCheckPointer = success;
    }
    // cleaner than copying code for multiple catch statements and better than catching all
    // exceptions, so we just handle the ones we expect.
    if (ie != null || ioe != null) {

      // cancel the rest of the tasks, and close the pool
      for (; i < uploads.size(); i++) {
        Future<TransferFsImage.TransferResult> upload = uploads.get(i);
        // The background thread may be blocked waiting in the throttler, so
        // interrupt it.
        upload.cancel(true);
      }

      // shutdown so we interrupt anything running and don't start anything new
      executor.shutdownNow();
      // this is a good bit longer than the thread timeout, just to make sure all the threads
      // that are not doing any work also stop
      executor.awaitTermination(500, TimeUnit.MILLISECONDS);

      // re-throw the exception we got, since one of these two must be non-null
      if (ie != null) {
        throw ie;
      } else if (ioe != null) {
        throw ioe;
      }
    }
  }
