Probe into the source code of Node cluster

Introduction

When we talk about Node.js, one limitation comes up right away: JavaScript runs on a single thread, so a Node process can only use one CPU core and cannot take full advantage of modern multi-core CPUs. This restricts its development as a server language. Fortunately, from v0.10.0 onward Node provides the Cluster module for building multi-process applications, and v0.12 rewrote the module to greatly improve its performance. Next, we will walk through Node Cluster to see how Node implements it.

 

Multi process single thread and single process multi thread model

First of all, two programming models are involved here. The Node.js discussed in this article uses the "multi-process, single-thread" model, while Java and some other languages use the "single-process, multi-thread" model. This article therefore focuses on the former; the latter is only touched on for comparison.

 

Multi process single thread

As mentioned above, JavaScript can only run on a single thread, so at the JS level Node runs in one thread of one process. To build a cluster, you must create multiple processes so that multiple application instances can run at the same time.

The Cluster module provides a master-worker model for starting multiple application instances.

Next, let's go into this module and see what has been done inside it.

 

Cluster

What is a Cluster?

  1. Start multiple processes on the server
  2. Each process runs the same code
  3. Each process can even listen on the same port (how this works is analyzed below)

Of which:

  1. The process responsible for starting the other processes is called the Master process. It is like a "contractor": it does no specific work itself and is only responsible for starting other processes.
  2. The processes it starts are called Worker processes. As the name suggests, they are the "workers" that do the work: they receive requests and provide services.
  3. The number of Worker processes is generally determined by the number of CPU cores on the server, so that all cores are put to use.

A typical example looks like this:

const cluster = require('cluster');  
const http = require('http');  
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {  
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

 

Creation of worker process

Use the fork method of the Cluster module to create child processes:

cluster.fork();

Let's start from the initialization of a worker process. When the master process forks a worker, it attaches a unique ID (NODE_UNIQUE_ID) to the worker's environment variables (workerEnv). The ID is an incrementing number: the counter starts at 0 and is incremented before use, so the first worker gets ID 1.

var ids = 0;

//.....

cluster.fork = function(env) {  
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

// ....

function createWorkerProcess(id, env) {  
  const workerEnv = util._extend({}, process.env);
  const execArgv = cluster.settings.execArgv.slice();
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;

  util._extend(workerEnv, env);
  workerEnv.NODE_UNIQUE_ID = '' + id;

  // .....

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

Then, when the cluster module is loaded, Node uses this ID to decide whether to load child.js or master.js:

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';  
module.exports = require(`internal/cluster/${childOrMaster}`);
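The two excerpts above can be condensed into a small sketch. It is a simplification for illustration, not Node's actual implementation: buildWorkerEnv and clusterRole are hypothetical helper names, and masterEnv is a made-up environment object.

```javascript
// Simplified sketch, not Node's actual implementation.
let ids = 0; // shared counter, as in the excerpt above

// Build the environment for the next worker: parent env first, then the
// caller's overrides, then the unique ID (always stored as a string).
function buildWorkerEnv(parentEnv, extraEnv) {
  const id = ++ids; // pre-increment, so the first worker gets 1
  return { ...parentEnv, ...extraEnv, NODE_UNIQUE_ID: String(id) };
}

// Mirror of the check used to pick the internal cluster implementation.
function clusterRole(env) {
  return 'NODE_UNIQUE_ID' in env ? 'child' : 'master';
}

const masterEnv = { PATH: '/usr/bin' }; // hypothetical parent environment
const firstWorkerEnv = buildWorkerEnv(masterEnv, { ROLE: 'api' });
const secondWorkerEnv = buildWorkerEnv(masterEnv);

console.log(clusterRole(masterEnv));         // master
console.log(firstWorkerEnv.NODE_UNIQUE_ID);  // 1
console.log(secondWorkerEnv.NODE_UNIQUE_ID); // 2
console.log(clusterRole(firstWorkerEnv));    // child
```

Because the decision depends only on the presence of the variable, the same entry script can serve as both master and worker.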

Now let's look at the fork() function of child_process:

exports.fork = function(modulePath /*, args, options*/) {

  // Get options and args arguments.
  var execArgv;
  var options = {};
  var args = [];
  var pos = 1;
  if (pos < arguments.length && Array.isArray(arguments[pos])) {
    args = arguments[pos++];
  }

  if (pos < arguments.length && arguments[pos] != null) {
    if (typeof arguments[pos] !== 'object') {
      throw new TypeError('Incorrect value of args option');
    }

    options = util._extend({}, arguments[pos++]);
  }

  // Prepare arguments for fork:
  execArgv = options.execArgv || process.execArgv;

  if (execArgv === process.execArgv && process._eval != null) {
    const index = execArgv.lastIndexOf(process._eval);
    if (index > 0) {
      // Remove the -e switch to avoid fork bombing ourselves.
      execArgv = execArgv.slice();
      execArgv.splice(index - 1, 2);
    }
  }

  args = execArgv.concat([modulePath], args);

  if (typeof options.stdio === 'string') {
    options.stdio = stdioStringToArray(options.stdio);
  } else if (!Array.isArray(options.stdio)) {
    // Use a separate fd=3 for the IPC channel. Inherit stdin, stdout,
    // and stderr from the parent if silent isn't set.
    options.stdio = options.silent ? stdioStringToArray('pipe') :
      stdioStringToArray('inherit');
  } else if (options.stdio.indexOf('ipc') === -1) {
    throw new TypeError('Forked processes must have an IPC channel');
  }

  options.execPath = options.execPath || process.execPath;
  options.shell = false;

  return spawn(options.execPath, args, options);
};

The function first prepares some parameters; the key part is the handling of options.stdio.

options.stdio configures the pipes established between the child process and the parent process. Its value should be an array, but for convenience it can also be one of the following strings:

'pipe' - equivalent to ['pipe', 'pipe', 'pipe'] (default)

'ignore' - equivalent to ['ignore', 'ignore', 'ignore']

'inherit' - equivalent to [process.stdin, process.stdout, process.stderr] or [0,1,2]

The three entries correspond to the child's standard input, standard output and standard error, in that order, and determine how each stream is connected to the parent process.
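To make the shorthand concrete, here is a minimal sketch using spawnSync from child_process. The child program (childCode) is a made-up one-liner, and the example assumes the node binary at process.execPath can be spawned.

```javascript
const { spawnSync } = require('child_process');

// Child program: writes one word to stdout and another to stderr.
const childCode = 'process.stdout.write("out"); process.stderr.write("err");';

// String shorthand: all three streams become pipes the parent can read.
const piped = spawnSync(process.execPath, ['-e', childCode], { stdio: 'pipe' });

// Array form: ignore stdin, capture stdout, discard stderr.
const mixed = spawnSync(process.execPath, ['-e', childCode], {
  stdio: ['ignore', 'pipe', 'ignore'],
});

console.log(piped.stdout.toString(), piped.stderr.toString()); // out err
console.log(mixed.stdout.toString());                          // out
```

The array form is what the string values expand to, so it is the one to use when the three streams need different treatment.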

A child process created by fork must additionally have an IPC channel, which is used to pass messages or file descriptors between parent and child. That is why the stdioStringToArray function looks like this:

function stdioStringToArray(option) {  
  switch (option) {
    case 'ignore':
    case 'pipe':
    case 'inherit':
      return [option, option, option, 'ipc'];
    default:
      throw new TypeError('Incorrect value of stdio option: ' + option);
  }
}
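Putting this helper together with the stdio branch of fork() gives the following simplified sketch. normalizeForkStdio is a hypothetical name for illustration; in Node itself this logic is inline in fork() and mutates options.stdio rather than returning a value.

```javascript
// Expand a stdio shorthand the way the excerpt above does: three
// identical entries plus the mandatory IPC channel.
function stdioStringToArray(option) {
  switch (option) {
    case 'ignore':
    case 'pipe':
    case 'inherit':
      return [option, option, option, 'ipc'];
    default:
      throw new TypeError('Incorrect value of stdio option: ' + option);
  }
}

// Simplified sketch of the stdio branch inside fork() (hypothetical helper).
function normalizeForkStdio(options = {}) {
  if (typeof options.stdio === 'string') {
    return stdioStringToArray(options.stdio);
  }
  if (!Array.isArray(options.stdio)) {
    // No stdio given: inherit from the parent unless silent is set.
    return stdioStringToArray(options.silent ? 'pipe' : 'inherit');
  }
  if (!options.stdio.includes('ipc')) {
    throw new TypeError('Forked processes must have an IPC channel');
  }
  return options.stdio;
}

console.log(normalizeForkStdio());                 // [ 'inherit', 'inherit', 'inherit', 'ipc' ]
console.log(normalizeForkStdio({ silent: true })); // [ 'pipe', 'pipe', 'pipe', 'ipc' ]
console.log(normalizeForkStdio({ stdio: 'ignore' }).indexOf('ipc')); // 3
```

With any of the string shorthands, 'ipc' lands at index 3, which matches the "separate fd=3 for the IPC channel" comment in the fork() source.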

 

Interprocess communication between multiple processes

Let's look at how this IPC channel is implemented and how it works.

Inter-process communication (IPC) is actually a very simple concept: whenever you transfer data from one process to another, that is IPC. There are many ways to implement this data transfer.

In Node, IPC has two implementations: named pipes on Windows and UNIX domain sockets (UDS) on UNIX. See the official documentation for details.

Linux is still the mainstream server operating system, so this article mainly analyzes the use of UDS on Linux.

UDS is built on top of the socket API. "Socket" usually refers to an IP socket, which communicates through network protocols. Communication within the same machine, however, can bypass the network layer entirely. UDS makes this possible, which is why the name "local socket" is arguably more appropriate.

In Linux, everything can be treated as a file. UDS is no exception.
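As a rough illustration (separate from how Node wires up its own IPC channel), the sketch below uses the net module to exchange data over a UNIX domain socket and checks that the socket shows up as a file. It assumes a Unix-like system; the socket path under os.tmpdir() is an arbitrary choice for the demo.

```javascript
const net = require('net');
const os = require('os');
const path = require('path');
const fs = require('fs');

// Hypothetical socket path for this demo; any writable location works.
const socketPath = path.join(os.tmpdir(), `uds-demo-${process.pid}.sock`);
fs.rmSync(socketPath, { force: true }); // remove a stale socket file, if any

// Resolves with what the client got back and whether the path is a socket file.
const result = new Promise((resolve, reject) => {
  const server = net.createServer((conn) => {
    conn.pipe(conn); // echo everything back to the sender
  });
  server.on('error', reject);
  server.listen(socketPath, () => {
    // While the server is listening, the socket is visible in the file system.
    const isSocketFile = fs.statSync(socketPath).isSocket();
    const client = net.connect(socketPath, () => client.end('ping'));
    client.setEncoding('utf8');
    let received = '';
    client.on('data', (chunk) => { received += chunk; });
    client.on('error', reject);
    client.on('close', () => {
      server.close(() => resolve({ received, isSocketFile }));
    });
  });
});

result.then((r) => console.log('over UDS:', r.received, '| socket file:', r.isSocketFile));
```

No IP address or port is involved: both ends address each other purely through the file system path.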

In Node's child_process.js module, there is the following code:

stdio = stdio.reduce(function(acc, stdio, i) {  
  // ......
else if (stdio === 'ipc') {  
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new errors.Error('ERR_IPC_ONE_PIPE');
        else
          throw new errors.Error('ERR_IPC_SYNC_FORK');
      }

      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    }

When an entry of stdio is 'ipc', an IPC pipe is created, and its fd is the index of 'ipc' in the stdio array:

ipc = new Pipe(PipeConstants.IPC);  
ipcFd = i;

At this point you are probably curious about this Pipe. What is it? Without further ado, let's go straight to the corresponding implementation: PipeWrap in Node's C++ layer, which calls uv_pipe_init in libuv.

void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {  
  // This constructor should not be exposed to public javascript.
  // Therefore we assert that we are not trying to call this as a
  // normal function.
  CHECK(args.IsConstructCall());
  CHECK(args[0]->IsInt32());
  Environment* env = Environment::GetCurrent(args);

  int type_value = args[0].As<Int32>()->Value();
  PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);

  bool ipc;
  ProviderType provider;
  switch (type) {
    case SOCKET:
      provider = PROVIDER_PIPEWRAP;
      ipc = false;
      break;
    case SERVER:
      provider = PROVIDER_PIPESERVERWRAP;
      ipc = false;
      break;
    case IPC:
      provider = PROVIDER_PIPEWRAP;
      ipc = true;
      break;
    default:
      UNREACHABLE();
  }

  new PipeWrap(env, args.This(), provider, ipc);
}


PipeWrap::PipeWrap(Environment* env,  
                   Local<Object> object,
                   ProviderType provider,
                   bool ipc)
    : ConnectionWrap(env, object, provider) {
  int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_pipe_init() returns void.
  UpdateWriteQueueSize();
}

/* libuv: src/unix/pipe.c */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}

/* libuv: src/unix/stream.c */
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

As you can see, the IPC pipe is implemented underneath as a stream, so it follows the same pattern as any other stream: open, write/read, close.

Before actually creating a child process, the parent process creates the IPC channel and listens on it. Only then does it create the child process, telling it the file descriptor (fd) of the IPC channel through the environment variable NODE_CHANNEL_FD. During startup, the child process connects to the existing IPC channel using this file descriptor, which completes the connection between parent and child.

When the parent process sends data to the child process, it writes the data to this special file through the fd, which calls the write method of the underlying stream. The child process, already connected to the channel since startup, receives the data in the application layer through the message event (underneath, this should be the stream's read method).

Since the IPC channel is full-duplex, the child process can also send messages to the parent process.
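The round trip can be tried out with a minimal sketch. To keep it in a single file, it uses spawn with an explicit 'ipc' entry in stdio instead of fork, and the child program (childSource) is an inline script made up for this example.

```javascript
const { spawn } = require('child_process');

// Child program: echo the first message back to the parent, then close
// the IPC channel so that both processes can exit.
const childSource = `
  process.once('message', (msg) => {
    process.send({ echoed: msg, from: 'child' }, () => process.disconnect());
  });
`;

// Resolves with the reply the parent receives from the child.
const reply = new Promise((resolve, reject) => {
  const child = spawn(process.execPath, ['-e', childSource], {
    // fd 0-2 as usual, plus an IPC channel on fd 3
    stdio: ['ignore', 'inherit', 'inherit', 'ipc'],
  });
  child.on('error', reject);
  child.once('message', resolve);          // child -> parent
  child.send({ hello: 'from parent' });    // parent -> child
});

reply.then((msg) => console.log('parent received:', msg));
```

Both directions travel over the same channel: child.send() and process.on('message') on one side, process.send() and child.on('message') on the other.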


Problem analysis

How do multiple processes share ports?

So how does Node let multiple processes listen on the same port? The key is passing the handle.

The following code:

//Main process code

var child = require('child_process').fork('child.js');  
// Open up the server object and send the handle
var server = require('net').createServer();  
server.on('connection', function (socket) {  
    socket.end('handled by parent\n');
});
server.listen(1337, function () {  
    child.send('server', server);
});

//Subprocess code
process.on('message', function (m, server) {  
    if (m === 'server') {
        server.on('connection', function (socket) {
            socket.end('handled by child\n');
        });
    }
});

In this example, the parent process passes the TCP server object it created directly to the child process. But we know that two processes cannot share memory directly, so how does this work?

Look at the send method:

child.send(message, [sendHandle])

Currently, the handle types that the child process object's send() method can send are:

  1. net.Socket, a TCP socket
  2. net.Server, a TCP server; any application-layer service built on TCP can benefit from it
  3. net.Native, a TCP socket or IPC pipe at the C++ level
  4. dgram.Socket, a UDP socket
  5. dgram.Native, a UDP socket at the C++ level

Before writing to the IPC pipe, the send() method assembles what it was given into two objects: the handle and the message. What is actually sent to the child process is the file descriptor of the handle, while the message object is serialized into a string.

The child process can read the message sent by the parent because it connected to the IPC channel earlier. After receiving the message, it restores it to an object with JSON.parse and inspects the cmd value. If message.cmd is NODE_HANDLE, the parent passed a handle, and the TCP object is restored from the received fd and the handle type that was sent with it.
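The serialization step can be sketched as follows. This is a simplified model, not Node's code: wrapHandleMessage and unwrapHandleMessage are hypothetical helpers, the type and msg field names are assumptions made for illustration, and the file descriptor itself (which travels alongside the text on the real channel) is left out.

```javascript
// Simplified sketch of the envelope. The fd itself is transferred
// separately by the IPC channel and cannot be expressed in JSON.
function wrapHandleMessage(message, handleType) {
  // Field names `type` and `msg` are assumptions for this sketch.
  const envelope = { cmd: 'NODE_HANDLE', type: handleType, msg: message };
  return JSON.stringify(envelope) + '\n'; // one JSON document per line
}

// Receiving side: parse the line and check whether a handle came with it.
function unwrapHandleMessage(line) {
  const envelope = JSON.parse(line);
  if (envelope.cmd !== 'NODE_HANDLE') {
    return { hasHandle: false, message: envelope };
  }
  return { hasHandle: true, handleType: envelope.type, message: envelope.msg };
}

const wire = wrapHandleMessage('server', 'net.Server');
const decoded = unwrapHandleMessage(wire);

console.log(decoded.hasHandle);  // true
console.log(decoded.handleType); // net.Server
console.log(decoded.message);    // server
```

The receiver uses the handle type to decide which kind of object to rebuild around the received fd, and only then delivers the original message to the application's message listener.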

After restoring the server object, the child process can listen on the port.

setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))

In libuv, the SO_REUSEADDR option is set on the socket through setsockopt (the meaning of this option is easy to look up). Note that on Linux this option mainly relaxes the rules for rebinding an address, for example one still in TIME_WAIT; by itself it does not let two unrelated sockets listen on the same address and port. Processes started independently do not know each other's fd, so once one process is listening on a port, a later process that tries to listen on it will fail. Server objects restored through send are different: their fds refer to the same underlying socket, so the child process can listen on the port through this fd.

However, each incoming connection on the shared fd can only be accepted by one process. In other words, when a network request reaches the server, only one lucky process grabs the connection, and only that process serves the request. The processes compete for connections preemptively.

 

Summary

As these modules mature, Node.js is being used in more and more server-side scenarios. If you only noticed it because of the "js" suffix, I hope you will pause and get to know this young platform properly. I believe it will surprise you.

 

Posted by fatalcure on Mon, 02 May 2022 18:00:05 +0300