MXNet source code analysis | KVStore

I have long planned to write an article on the communication process in MXNet distributed training. After finishing a draft in April, I did not continue to polish it because of various chores. Recently several new students joined the laboratory and needed some reference material; it also happens to be the autumn recruitment season, when interviewers often ask framework-related questions. So I reorganized this article, for the convenience of others and of myself.

This article is based on MXNet 1.6.0 and tries to examine the distributed communication process of MXNet from a high level. Reading it requires some understanding of the parameter server architecture and of synchronous / asynchronous training. Readers who are not familiar with these topics may want to skip this article 😛.

Python side

Now that we know kvstore is responsible for parameter synchronization in distributed training, how is it used during training? Starting from the gluon.Trainer interface, we will analyze the gradient exchange and parameter synchronization of distributed training step by step. The following code is taken from python/mxnet/gluon/trainer.py. Compared with the source, some redundant parts (such as certain checks and comments) have been removed so that we can focus on the communication process.

The step function is the main body of gradient exchange and parameter update. It first calls _init_kvstore to initialize the kvstore, then calls _allreduce_grads to transfer the gradients, and finally calls _update to update the parameters.

class Trainer(object):
    def step(self, batch_size, ignore_stale_grad=False):
        if not self._kv_initialized:
            self._init_kvstore()
        if self._params_to_init:
            self._init_params()

        self._allreduce_grads()
        self._update(ignore_stale_grad)


First, the _init_kvstore function calls _create_kvstore in model.py with the parameters specified by the user to initialize two variables, kvstore and update_on_kvstore. kvstore is an instance of the KVStore class, and update_on_kvstore is a boolean that determines whether the parameters are updated on the ps side. In other words, if it is True, the model parameters are updated on the ps side; otherwise they are updated on the worker side, and the ps side only aggregates gradients (in this case, does the parameter server become a gradient server? 😛). However, update_on_kvstore=False can only be set in synchronous training mode; asynchronous training does not support updating parameters on the worker side. When update_on_kvstore=True, the ps side needs to know which optimizer is used for training, so we call kvstore.set_optimizer to send the optimizer from the worker side to the ps side.

from ..model import _create_kvstore
class Trainer(object):
    def _init_kvstore(self):
        """Create kvstore."""
        config = self._kvstore_params
        arg_arrays = {param.name: param.data(self._contexts[0]) for param in self._params}
        kvstore, update_on_kvstore = _create_kvstore(config['kvstore'], len(self._contexts),
                                                     arg_arrays)
        self._distributed = 'dist' in kvstore.type if kvstore else False
        if self._distributed and 'async' in kvstore.type:
            update_on_kvstore = True
            # raise err if user provides unsupported configs
            if config['update_on_kvstore'] is False:
                raise ValueError("Please set update_on_kvstore=True "
                                 "when training in async mode.")
        if config['update_on_kvstore'] is not None:
            update_on_kvstore = config['update_on_kvstore']

        if kvstore:
            if update_on_kvstore:
                # optimizer preferably needs to be set before init for multiprecision
                kvstore.set_optimizer(self._optimizer)
            self._kvstore = kvstore
            self._update_on_kvstore = update_on_kvstore
        else:
            self._kvstore = None
            self._update_on_kvstore = None
        self._kv_initialized = True
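
To make the decision rules above easier to try out, here is a minimal pure-Python sketch of how update_on_kvstore is resolved. It does not import MXNet. The function name resolve_update_on_kvstore and its default argument are hypothetical stand-ins (default plays the role of the value suggested by _create_kvstore), so treat it as an illustration of the rules rather than the actual implementation.

```python
def resolve_update_on_kvstore(kvstore_type, user_setting, default=True):
    """Return (distributed, update_on_kvstore) following the rules in _init_kvstore.

    kvstore_type: type string such as 'dist_sync', or None when no kvstore is used.
    user_setting: the user's update_on_kvstore (True, False, or None if unspecified).
    default: hypothetical stand-in for the value suggested by _create_kvstore.
    """
    if kvstore_type is None:
        # Single-machine training without a kvstore.
        return False, None
    distributed = 'dist' in kvstore_type
    update_on_kvstore = default
    if distributed and 'async' in kvstore_type:
        # Asynchronous training must update on the ps side.
        update_on_kvstore = True
        if user_setting is False:
            raise ValueError("Please set update_on_kvstore=True "
                             "when training in async mode.")
    if user_setting is not None:
        update_on_kvstore = user_setting
    return distributed, update_on_kvstore


print(resolve_update_on_kvstore('dist_sync', False))
print(resolve_update_on_kvstore('dist_async', None))
```

The only combination that is rejected is an asynchronous distributed kvstore together with an explicit update_on_kvstore=False.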

After the kvstore is initialized, gluon.Trainer calls _allreduce_grads to exchange gradients. Wait, didn't we say that MXNet uses a parameter server architecture? Why is it related to Allreduce? Consider the case update_on_kvstore=False: at first each worker holds only its own local gradient. After the gradients are pushed to the ps and aggregated, each worker pulls the same aggregated gradient back from the ps. Don't the push and pull operations in this process look a lot like Reduce and Broadcast (the gradients on the workers are "reduced" onto the ps, and the ps then "broadcasts" the aggregated result to every worker)? Looking at the implementation of _allreduce_grads, we can see that gluon.Trainer pushes the gradients from the worker side to the ps side regardless of the value of update_on_kvstore. When update_on_kvstore=True, pushing the gradients is all it does; when update_on_kvstore=False, gluon.Trainer also pulls the aggregated gradients back from the ps side for the local parameter update.

class Trainer(object):
    def _allreduce_grads(self):
        if self._kvstore:
            for i, param in enumerate(self._params):
                if param.grad_req != 'null':
                    self._kvstore.push(i, param.list_grad(), priority=-i)
                    if not self._update_on_kvstore:
                        self._kvstore.pull(i, param.list_grad(), priority=-i,
                                           ignore_sparse=self._distributed)
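
The analogy between push/pull and Reduce/Broadcast can be illustrated with a small pure-Python sketch. ToyServer and allreduce_via_push_pull are hypothetical names invented for this example, gradients are plain lists of floats, and no real networking or MXNet API is involved.

```python
class ToyServer:
    """Hypothetical stand-in for the ps side: it only sums what workers push."""

    def __init__(self):
        self.store = {}

    def push(self, key, grad):
        # Accumulate the pushed gradient element-wise under this key ("Reduce").
        if key not in self.store:
            self.store[key] = list(grad)
        else:
            self.store[key] = [a + b for a, b in zip(self.store[key], grad)]

    def pull(self, key):
        # Every caller receives a copy of the same aggregated value ("Broadcast").
        return list(self.store[key])


def allreduce_via_push_pull(worker_grads, key=0):
    """All workers push first, then every worker pulls the aggregated result."""
    server = ToyServer()
    for grad in worker_grads:
        server.push(key, grad)
    return [server.pull(key) for _ in worker_grads]


worker_grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
result = allreduce_via_push_pull(worker_grads)
print(result)  # every worker ends up with [9.0, 12.0]
```

After the push and pull phases every worker holds the same summed gradient, which is exactly what an Allreduce would deliver.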

The gluon.Trainer._update function pulls or updates the parameters depending on the value of update_on_kvstore. In single-machine training (kvstore is None) or in the local update mode of distributed training (update_on_kvstore=False), gluon.Trainer uses the optimizer set by the user to update the parameters locally for the next iteration. In distributed training with update_on_kvstore=True, the model parameters are updated on the ps side, so this function only needs to pull the model parameters from the ps side to the local worker.

class Trainer(object):
    def _update(self, ignore_stale_grad=False):
        updates = [[] for _ in self._updaters]

        for i, param in enumerate(self._params):
            if self._kvstore and self._update_on_kvstore:
                if param._stype == 'default':
                    # 'row_sparse' parameters are not pulled immediately - they're pulled
                    # in `Block.forward`
                    self._kvstore.pull(i, param.list_data(), priority=-i)

            for upd, arr, grad in zip(updates, param.list_data(), param.list_grad()):
                if not ignore_stale_grad or arr._fresh_grad:
                    upd.append((i, grad, arr))
                    arr._fresh_grad = False

        if not (self._kvstore and self._update_on_kvstore):
            for updater, upd in zip(self._updaters, updates):
                if upd:
                    i, w, g = zip(*upd)
                    updater(i, w, g)
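
The two update paths can be compared with a toy simulation. This is a minimal sketch under strong assumptions: plain SGD without gradient rescaling, synchronous training, and identical initial weights on every worker. The helper names (sgd, aggregate, step_update_on_kvstore, step_update_on_worker) are hypothetical and not part of MXNet.

```python
def sgd(weight, grad, lr):
    """One plain SGD step on a list of floats."""
    return [w - lr * g for w, g in zip(weight, grad)]


def aggregate(grads):
    """Element-wise sum of the gradients pushed by all workers."""
    return [sum(col) for col in zip(*grads)]


def step_update_on_kvstore(server_weight, worker_grads, lr):
    """update_on_kvstore=True: the ps aggregates and updates, workers pull weights."""
    new_weight = sgd(server_weight, aggregate(worker_grads), lr)
    return [list(new_weight) for _ in worker_grads]


def step_update_on_worker(worker_weights, worker_grads, lr):
    """update_on_kvstore=False: the ps only aggregates, workers pull the summed
    gradient and run the optimizer locally."""
    summed = aggregate(worker_grads)
    return [sgd(w, summed, lr) for w in worker_weights]


init = [1.0, 2.0]
grads = [[0.5, 1.0], [1.5, 3.0]]
on_ps = step_update_on_kvstore(init, grads, lr=0.25)
on_worker = step_update_on_worker([list(init), list(init)], grads, lr=0.25)
print(on_ps, on_worker)
```

Under these assumptions both paths leave every worker with the same weights; what differs is where the optimizer runs and what travels back from the ps (weights in the first case, aggregated gradients in the second).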

At this point, we have basically walked through the kvstore call flow on the Python side.

C++ side

KVStore* KVStore::Create(const char *type_name) {
  std::string tname = type_name;
  std::transform(tname.begin(), tname.end(), tname.begin(), ::tolower);
  KVStore* kv = nullptr;
  bool use_device_comm = false;
  auto has = [tname](const std::string& pattern) {
    return tname.find(pattern) != std::string::npos;
  };
  if (has("device")) {
    use_device_comm = true;
  }

  if (has("dist")) {
    kv = new kvstore::KVStoreDist(use_device_comm);
    if (!has("_async") && kv->IsWorkerNode() && kv->get_rank() == 0) {
      // configure the server to be the sync mode
      kv->SendCommandToServers(static_cast<int>(kvstore::CommandType::kSyncMode), "");
    }
  } else {
    if (has("nccl")) {
      kv = new kvstore::KVStoreNCCL();
    } else {
      kv = new kvstore::KVStoreLocal(use_device_comm);
    }
  }
  kv->type_ = tname;
  return kv;
}

There are several important classes under the KVStore module.

KVStore is an abstract class that provides some general APIs, such as Push, Pull and PullRowSparse. Because kvstore supports two types of keys, int and string, each of these APIs provides two overloads that take the different key types as parameters.

KVStoreLocal inherits from KVStore and is responsible for single-machine multi-card communication. It mainly maintains the following variables: comm_, which is responsible for communication between different devices; the pinned memory on the host (never paged out, always resident in physical memory); local_, the local key-value buffer; and the mappings from string keys to integer keys (str_key_dict_) and from integer keys to string keys (reverse_str_key_dict_). In its constructor, it creates different communication objects according to the device type passed in. The communication objects that can be created are CommCPU, which communicates through the CPU, CommDevice, which communicates through the GPU, and CommDeviceTree, which uses tree aggregation on the GPU.

KVStoreDist inherits from KVStoreLocal and is responsible for multi-machine communication.

classDiagram
    Comm <|.. CommCPU
    Comm <|.. CommDevice
    CommDevice <|-- CommDeviceTree
    Comm <.. KVStoreLocal
    KVStore <|.. KVStoreLocal
    KVStoreLocal <|-- KVStoreDist
    KVStoreLocal <|-- KVStoreNCCL
    KVStoreDist ..> KVStoreDistServer
    class Comm {
        <<abstract>>
        +Init() void
        +Reduce() const NDArray&
        +Broadcast() void
        +BroadcastRowSparse() void
        #Context pinned_ctx_
    }
    class CommCPU {
        -ReduceSumCPU() void
        -ReduceSumCPUExSerial() void
        -unordered_map<int, BufferEntry> merge_buf_
        -size_t bigarray_bound_
    }
    class CommDevice {
        +InitBuffersAndComm() void
        +ReduceCompressed() const NDArray&
        +InitMergeBuffer() void
        -EnableP2P() void
        -unordered_map<int, BufferEntry> merge_buf_
    }
    class CommDeviceTree {
        +ReduceInner() const NDArray&
        +BroadcastInner() void
        -int depth_
        -int gpuarray_bound_
    }
    class KVStore {
        <<abstract>>
        +Create() static KVStore*
        +Barrier() void
        +RunServer() void
    }
    class KVStoreLocal {
        +Init() void
        -InitImpl() void
        -PushImpl() void
        -PullImpl() void
        -PushPullImpl() void
        #GroupKVPairsPush() void
        #GroupKVPairsPull() void
        #GroupKVPairs() void
        #LookupKeys() void
        #Comm* comm_
        #unordered_map<int, NDArray> local_
        #unordered_map<string, int> str_key_dict_
        #unordered_map<int, string> reverse_str_key_dict_
    }
    class KVStoreDist {
        -EncodeDefaultKey() void
        -EncodeCompressedKey() void
        -EncodeRowSparseKey() void
        -unordered_map<int, PSKV> ps_kv_
        -KVWorker<char>* ps_worker_
        -KVStoreDistServer* server_
        -size_t bigarray_bound_
        -unordered_map<int, NDArray> comm_buf_
        -unordered_map<int, NDArray> compr_buf_
        -unordered_map<int, NDArray> residual_
    }
    class KVStoreDistServer {
        -ApplyUpdates() void
        -DefaultStorageResponse() void
        -DataHandleDefault() void
        -unordered_map<int, NDArray> store_
        -unordered_map<int, NDArray> update_buf_
        -unordered_map<int, NDArray> decom_buf_
        -KVServer<char>* ps_server_
    }

KVStore creation

MXNet supports six kvstore modes: local, device, dist_sync, dist_async, dist_sync_device and dist_async_device. The mode is specified through the command line during training. KVStore::Create first checks whether the type contains dist; if so, it creates a KVStoreDist, and if the type does not contain _async, the worker with rank 0 sends a command to the servers to put them into synchronous mode. Otherwise it creates a KVStoreNCCL when the type contains nccl, or a KVStoreLocal. If the type contains device, aggregation is done on the GPU.
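
The dispatch performed by KVStore::Create can be mirrored in a few lines of Python. This is only a sketch of the string matching shown in the C++ code: the function describe_kvstore and the dictionary it returns are hypothetical, and no kvstore object is actually created.

```python
def describe_kvstore(type_name, is_worker=True, rank=0):
    """Mirror the substring checks of KVStore::Create for a given type string."""
    tname = type_name.lower()
    use_device_comm = 'device' in tname
    if 'dist' in tname:
        impl = 'KVStoreDist'
        # Only the rank-0 worker asks the servers to run in sync mode,
        # and only when "_async" is absent from the type string.
        sends_sync_command = '_async' not in tname and is_worker and rank == 0
    elif 'nccl' in tname:
        impl, sends_sync_command = 'KVStoreNCCL', False
    else:
        impl, sends_sync_command = 'KVStoreLocal', False
    return {'impl': impl,
            'use_device_comm': use_device_comm,
            'sends_sync_command': sends_sync_command}


for name in ['local', 'device', 'dist_sync', 'dist_async',
             'dist_sync_device', 'dist_async_device']:
    print(name, describe_kvstore(name))
```

Note that the servers are asynchronous unless told otherwise: synchronous mode is something the rank-0 worker has to request explicitly.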

In-process communication

To exchange data between the devices inside one process, in both single-machine training and distributed training, MXNet implements an in-process communication mechanism based on Reduce and Broadcast semantics.

Create an object responsible for in-process communication.

explicit KVStoreLocal::KVStoreLocal(bool use_device_comm) : KVStore() {
  if (use_device_comm) {
    bool tree = dmlc::GetEnv("MXNET_KVSTORE_USETREE", 0);
    if (tree) {
      comm_ = new CommDeviceTree();
    } else {
      comm_ = new CommDevice();
    }
  } else {
    comm_ = new CommCPU();
  }
}

In-process Push implementation

virtual void KVStoreLocal::PushImpl(const std::vector<int>& keys,
                                    const std::vector<NDArray>& values,
                                    int priority) {
  std::vector<int> uniq_keys;
  std::vector<std::vector<NDArray>> grouped_val;
  GroupKVPairsPush(keys, values, &uniq_keys, &grouped_val, false);
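  for (size_t i = 0; i < uniq_keys.size(); ++i) {
    int key = uniq_keys[i];
    // sum the values pushed for this key by all devices
    const NDArray& merged = comm_->Reduce(key, grouped_val[i], priority);
    NDArray& local = local_[key];
    if (updater_ != nullptr) {
      if (key_type_ == kStringKey && str_updater_ != nullptr) {
        // look up the original string key for the string-key updater
        const std::string& str_key = reverse_str_key_dict_[key];
        str_updater_(str_key, merged, &local);
      } else {
        updater_(key, merged, &local);
      }
    } else {
      // no updater: simply store the merged value
      if (merged.storage_type() != local.storage_type()) {
        local = merged.Copy(local.ctx());
      } else {
        local = merged;
      }
    }
  }
}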

In-process Pull implementation

virtual void KVStoreLocal::PullImpl(const std::vector<int>& keys,
                                    const std::vector<NDArray*>& values,
                                    int priority,
                                    bool ignore_sparse) {
  std::vector<int> uniq_keys;
  std::vector<std::vector<NDArray*>> grouped_vals;
  GroupKVPairsPull(keys, values, &uniq_keys, &grouped_vals, ignore_sparse);
  for (size_t i = 0; i < uniq_keys.size(); ++i) {
    int key = uniq_keys[i];
    const NDArray& local = local_[key];
    // copy the stored value to every output array requested for this key
    comm_->Broadcast(key, local, grouped_vals[i], priority);
  }
}
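
The in-process Push and Pull logic above (group values by key, Reduce, then either run the updater or store the result, and Broadcast on pull) can be sketched in pure Python. ToyLocalKVStore is a hypothetical toy that works on lists of floats; unlike the C++ updater, which modifies the stored array in place, the updater here is assumed to return the new value.

```python
class ToyLocalKVStore:
    """Toy model of KVStoreLocal operating on plain lists of floats."""

    def __init__(self, updater=None):
        self.local = {}         # plays the role of local_
        self.updater = updater  # optional callable(key, merged, stored) -> new value

    def init(self, key, value):
        self.local[key] = list(value)

    @staticmethod
    def _group(keys, values):
        # Collect all values that share a key (one value per device).
        grouped = {}
        for key, value in zip(keys, values):
            grouped.setdefault(key, []).append(value)
        return grouped

    def push(self, keys, values):
        for key, vals in self._group(keys, values).items():
            merged = [sum(col) for col in zip(*vals)]  # Reduce across devices
            if self.updater is not None:
                self.local[key] = self.updater(key, merged, self.local[key])
            else:
                self.local[key] = merged

    def pull(self, keys, outs):
        for key, targets in self._group(keys, outs).items():
            for out in targets:                        # Broadcast to every buffer
                out[:] = self.local[key]


kv = ToyLocalKVStore()
kv.init(3, [0.0, 0.0])
kv.push([3, 3], [[1.0, 2.0], [10.0, 20.0]])  # two devices push the same key
dev0, dev1 = [0.0, 0.0], [0.0, 0.0]
kv.pull([3, 3], [dev0, dev1])
print(dev0, dev1)
```

Passing an updater turns the store into a local optimizer step, for example `ToyLocalKVStore(updater=lambda k, g, w: [wi - 0.5 * gi for wi, gi in zip(w, g)])`.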

Interprocess communication

Interprocess Push

void KVStoreDist::Push_(const std::vector<int>& keys,
                        const std::vector<NDArray>& values,
                        int priority,
                        bool do_merge) {
  std::vector<int> uniq_keys;
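  std::vector<std::vector<NDArray>> grouped_vals;
  GroupKVPairsPush(keys, values, &uniq_keys, &grouped_vals, false);

  for (size_t i = 0; i < uniq_keys.size(); ++i) {
    int key = uniq_keys[i];
    const auto& vals = grouped_vals[i];
    // merge over local devices first, so that only one array per key is sent
    NDArray merged = do_merge ? comm_->Reduce(key, vals, priority) : vals[0];
    // (elided) merged is staged into the send buffer comm_buf (see comm_buf_),
    // and num_bytes is the size of one element in bytes
    PSKV& pskv = EncodeDefaultKey(key, comm_buf.shape().Size(), num_bytes);
    PushDefault(key, comm_buf, pskv, priority);
  }
}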

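void KVStoreDist::PushDefault(int key,
                              const NDArray& send_buf,
                              const PSKV& pskv,
                              int priority) {
  auto push_to_servers =
      [this, key, pskv, send_buf](RunContext rctx, Engine::CallbackOnComplete cb) {
    // (elided) vals wraps the bytes of send_buf, cmd encodes the request type
    ps_worker_->ZPush(pskv.keys, vals, pskv.lens, cmd, [cb]() { cb(); });
  };
  // (elided) push_to_servers is then scheduled asynchronously on the engine
}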

Interprocess Pull implementation

void KVStoreDist::PullImpl(const std::vector<int>& keys,
                           const std::vector<NDArray*>& values,
                           int priority,
                           bool ignore_sparse) override {
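  std::vector<int> uniq_keys;
  std::vector<std::vector<NDArray*>> grouped_vals;
  GroupKVPairsPull(keys, values, &uniq_keys, &grouped_vals, ignore_sparse);

  for (size_t i = 0; i < uniq_keys.size(); ++i) {
    int key = uniq_keys[i];
    auto& recv_buf = comm_buf_[key];
    auto pull_from_servers =
        [this, key, recv_buf](RunContext rctx, Engine::CallbackOnComplete cb) {
      // (elided) data, size, num_bytes, pskv and cmd are derived from recv_buf
      // false means the SArray does not take ownership of data
      auto vals = new ps::SArray<char>(data, size * num_bytes, false);
      ps_worker_->ZPull(pskv.keys, vals, &pskv.lens, cmd, [vals, cb]() { delete vals; cb(); });
    };
    // (elided) schedule pull_from_servers, then Broadcast recv_buf to grouped_vals[i]
  }
}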

How the ps side processes received data

void DataHandleDefault(const DataHandleType type, const ps::KVMeta& req_meta,
                       const ps::KVPairs<char>& req_data, ps::KVServer<char>* server) {
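  // (elided) key is decoded from req_data, and recved wraps the received bytes
  if (req_meta.push) {
    auto& stored = store_[key];
    if (stored.is_none()) {
      // first push of this key: initialize the stored value
      stored = NDArray(...);
    } else {
      auto& updates = update_buf_[key];
      if (updates.request.empty()) {
        if (sync_mode_) {
          // first push of this round: start the merge buffer
          CopyFromTo(recved, updates.merged);
        }
      } else {
        // later pushes of this round: accumulate into the merge buffer
        updates.merged += recved;
      }
      // (elided) record the request and apply the update (ApplyUpdates)
    }
  } else {
    // pull request: reply with the stored value (DefaultStorageResponse)
  }
}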
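
The server-side handling of a single key can be sketched in pure Python. This is a simplified model under stated assumptions, not the actual server code: the class ToyServerKey is hypothetical, the server-side optimizer is assumed to be plain SGD, synchronous mode is assumed to apply the update once every worker has pushed, and asynchronous mode is assumed to apply each push immediately.

```python
class ToyServerKey:
    """Toy model of how the ps side treats pushes and pulls for one key."""

    def __init__(self, num_workers, sync_mode, lr=0.5):
        self.num_workers = num_workers
        self.sync_mode = sync_mode
        self.lr = lr
        self.stored = None   # plays the role of store_[key]
        self.merged = None   # plays the role of update_buf_[key].merged
        self.pending = 0     # pushes received in the current round

    def _apply(self, grad):
        # Assumed server-side optimizer: plain SGD.
        self.stored = [w - self.lr * g for w, g in zip(self.stored, grad)]

    def handle_push(self, recved):
        if self.stored is None:
            self.stored = list(recved)      # the first push initializes the key
            return
        if not self.sync_mode:
            self._apply(recved)             # async: update right away
            return
        if self.pending == 0:
            self.merged = list(recved)      # first push of the round
        else:
            self.merged = [a + b for a, b in zip(self.merged, recved)]
        self.pending += 1
        if self.pending == self.num_workers:
            self._apply(self.merged)        # sync: update once all workers pushed
            self.pending = 0

    def handle_pull(self):
        return list(self.stored)


sync_key = ToyServerKey(num_workers=2, sync_mode=True)
sync_key.handle_push([1.0, 1.0])           # initialization
sync_key.handle_push([1.0, 0.0])           # worker 0
after_first = sync_key.handle_pull()
sync_key.handle_push([1.0, 2.0])           # worker 1 completes the round
after_round = sync_key.handle_pull()
print(after_first, after_round)
```

In this model a synchronous server leaves the stored value untouched until the round is complete, while an asynchronous one changes it after every push.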

Posted by wellmoon on Sun, 22 May 2022 07:47:00 +0300