Why doesn't listening on the same port in a Node.js cluster throw an error?

by Cheng, 2022-09-26


An example from the official Node.js documentation:

const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
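
Running this, all the workers appear to serve the same port (a quick check, assuming the example is saved as main.js):

$ node main.js
$ curl http://localhost:8000
hello world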

As is well known, if a port is already being listened on, another attempt to listen on it fails.

man 2 listen

listen() will fail if:

     [EINVAL]           socket is already connected.

(Strictly speaking, the EADDRINUSE shown below originates from the underlying bind step: POSIX bind(2) fails with EADDRINUSE when the address is already taken, and Node.js surfaces that failure from its listen() call.)

The following code demonstrates the scenario:

const http = require('node:http');

const numCPUs = require('node:os').cpus().length;

// No cluster involved: every iteration tries to bind port 8000 itself,
// so the second listen() fails with EADDRINUSE.
for (let i = 0; i < numCPUs; i++) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}

/*
Error: listen EADDRINUSE: address already in use :::8000
    at Server.setupListenHandle [as _listen2] (node:net:1432:16)
    at listenInCluster (node:net:1480:12)
    at Server.listen (node:net:1568:7)
    at Object.<anonymous> (main.js:12:6)
    at Module._compile (node:internal/modules/cjs/loader:1126:14)
    at Object.Module._extensions..js (node:internal/modules/cjs/loader:1180:10)
    at Module.load (node:internal/modules/cjs/loader:1004:32)
    at Function.Module._load (node:internal/modules/cjs/loader:839:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
    at node:internal/main/run_main_module:17:47
Emitted 'error' event on Server instance at:
    at emitErrorNT (node:net:1459:8)
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  code: 'EADDRINUSE',
  errno: -48,
  syscall: 'listen',
  address: '::',
  port: 8000
}
*/

The only way to make sense of this is to look at the Node.js source code.

Let's start with listen, which http.Server inherits from net.Server:

// lib/net.js

Server.prototype.listen = function(...args) {
  // ...normalization of `args` into an `options` object elided...
  let backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for primary server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_ARG_VALUE('options', options);
};

As the excerpt shows, in our case, where host is undefined and port is a number or a string, listen hands the work straight to listenInCluster. In fact, even when host is given, lookupAndListen resolves it and then calls listenInCluster as well:

// lib/net.js

function lookupAndListen(self, port, address, backlog, exclusive, flags) {
  if (dns === undefined) dns = require('dns');
  dns.lookup(address, function doListen(err, ip, addressType) {
    if (err) {
      self.emit('error', err);
    } else {
      addressType = ip ? addressType : 4;
      listenInCluster(self, ip, port, addressType,
                      backlog, undefined, exclusive, flags);
    }
  });
}

In listenInCluster, the first thing checked is whether the current process is the primary. If it is, it listens directly; if not, the worker queries the primary for its server handle and listens on that instead:

// lib/net.js

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags, options) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
    backlog,
    ...options,
  };
  // Get the primary's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);

  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse primary's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

This raises a question: if the current process never touches cluster, how does this code path still work? The answer is in how cluster.isPrimary is determined. From the documentation:

True if the process is a primary. This is determined by the process.env.NODE_UNIQUE_ID. If process.env.NODE_UNIQUE_ID is undefined, then isPrimary is true.

In other words, any process whose environment lacks the NODE_UNIQUE_ID variable counts as a primary and therefore takes the direct-listen path.

NODE_UNIQUE_ID is set when the primary forks a worker:

// lib/internal/cluster/primary.js

let ids = 0;

cluster.fork = function(env) {
  cluster.setupPrimary();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  // ...event wiring and `return worker` elided...
};

function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}
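
A quick way to observe this is a minimal sketch like the following (save it as check-id.js and run node check-id.js; cluster.fork() re-executes the same file in the worker):

// check-id.js: print what the primary and a forked worker each see
const cluster = require('node:cluster');

console.log(process.pid,
            'NODE_UNIQUE_ID =', process.env.NODE_UNIQUE_ID,
            'isPrimary =', cluster.isPrimary);

if (cluster.isPrimary)
  cluster.fork();    // the forked copy runs with NODE_UNIQUE_ID=1, isPrimary=false
else
  process.exit(0);   // the worker prints its line and exits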

So how does a worker get hold of the primary's handle? In other words, what does cluster._getServer do?

// lib/internal/cluster/child.js

function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // ...unix-socket path normalization and `index`/`indexesKey` bookkeeping elided...

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket
      shared(reply, { handle, indexesKey, index }, cb);
    } else {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
    }
  });
};

All _getServer does is send an act: 'queryServer' message to the primary. sendHelper marks it as one of cluster's internal messages:

// lib/internal/cluster/utils.js

function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX
  // in lib/internal/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}

This message travels to the primary over the IPC channel that fork established to keep the primary and its workers talking to each other:

// lib/internal/child_process.js

target.send = function(message, handle, options, callback) {
  // `connected` is set to `true` when the child_process module
  // initializes the IPC channel
  if (this.connected) {
    return this._send(message, handle, options, callback);
  }

  return false;
};

target._send = function(message, handle, options, callback) {
  // ...message serialization and handle bookkeeping elided...

  const req = new WriteWrap();

  const err = writeChannelMessage(channel, req, message, handle);
  const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

  // ...error handling elided...

  return channel.writeQueueSize < (65536 * 2);
};
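
This is the same channel that plain child_process.fork exposes; a minimal sketch (assuming a child.js next to parent.js):

// parent.js: fork() sets up an IPC pipe alongside stdio
const { fork } = require('node:child_process');

const child = fork('./child.js');
child.on('message', (msg) => {
  console.log('from child:', msg);
  child.disconnect();               // close the channel so both processes can exit
});
child.send({ cmd: 'ping' });

// child.js
process.on('message', (msg) => process.send({ cmd: 'pong', got: msg }));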

When the primary receives the queryServer message, it creates a round-robin handle, and that handle creates the real TCP server that binds the port:

// lib/internal/cluster/primary.js

function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      handle = new SharedHandle(key, address, message);
    } else {
      handle = new RoundRobinHandle(key, address, message);
    }

    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

// lib/internal/cluster/round_robin_handle.js

function RoundRobinHandle(key, address, { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = init(ObjectCreate(null));
  this.handle = null;
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd, backlog });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    });
  } else
    this.server.listen({
      path: address,
      backlog,
      readableAll,
      writableAll,
    });  // UNIX socket path.
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

This listen call goes through the same listenInCluster logic shown above, but because it runs in the primary, it binds and listens for real. Once the server is listening, RoundRobinHandle takes over the handle's onconnection callback and swaps in the round-robin logic via this.handle.onconnection = (err, handle) => this.distribute(err, handle).

Next, queryServer adds the current worker to the RoundRobinHandle's pool of workers:

// lib/internal/cluster/primary.js
  
handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });

Once the worker has been added, the primary replies via the done callback below, and the worker resumes in the callback it registered when it sent queryServer:

// lib/internal/cluster/round_robin_handle.js
const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

// lib/internal/cluster/child.js
send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket
      shared(reply, { handle, indexesKey, index }, cb);
    } else {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
    }
  });

In the rr function, the worker builds a faux handle, which listenOnPrimaryHandle then "listens" on.
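
Here is rr itself, condensed from lib/internal/cluster/child.js (error paths, bookkeeping, and comments trimmed; read it as a sketch of the mechanism rather than the exact current code). The crucial detail is that its listen is a stub that reports success without ever touching the port:

// lib/internal/cluster/child.js (abridged sketch)
function rr(message, { indexesKey, index }, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // Faux listen: the primary already holds the real listening socket.
    return 0;
  }

  function close() {
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    key = undefined;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to fool
  // net.Server into thinking it is backed by a real handle.
  const handle = { close, listen, ref: noop, unref: noop };

  handles.set(key, handle);
  cb(0, handle);
}

The faux handle then lands back in listenOnPrimaryHandle: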

// lib/net.js  
function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse primary's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
}

Here server._listen2 is invoked directly; _listen2 is an alias for setupListenHandle.

Inside, the onconnection callback is attached to whatever handle the server already has:

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    // ...create a new TCP/pipe handle and bind it (elided)...
  }

  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}

In a worker, this._handle is the faux handle installed by listenOnPrimaryHandle, so the "have a handle already" branch is taken, and this._handle.listen(backlog || 511) invokes the faux listen, which simply returns 0. The worker never issues bind(2) or listen(2) itself, which is exactly why no EADDRINUSE ever appears. At this point the primary owns the single real listening socket, and every worker stands ready to accept connections handed to it.

When the primary's real server receives a connection, onconnection fires and distribute hands the client handle off to a worker. Concretely: the handle is appended to a pending queue and a free worker is fetched from the free pool; if one is available, a handle is taken off the queue and sent to that worker in an act: 'newconn' message. The worker wraps the received handle into a connection on its server and replies to the primary, which checks whether accepted is true; if the worker accepted, the primary closes its own copy of the handle so the client talks to the worker directly. The primary then repeats this handoff for each pending connection.
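
For completeness, here are the two sides of that handoff, condensed from the Node.js sources (the real code tracks workers in a SafeMap and pending handles in a linked list; plain arrays are used below for brevity, so read this as a sketch of the mechanism):

// lib/internal/cluster/round_robin_handle.js (abridged sketch)
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // Queue the incoming client handle and wake a free worker, if any.
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker !== undefined)
    this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
  const handle = this.handles.shift();

  if (handle === undefined) {
    this.free.push(worker);  // Nothing pending; mark the worker free again.
    return;
  }

  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();              // The worker took it; drop our copy.
    else
      this.distribute(0, handle);  // Worker is shutting down; try another.

    this.handoff(worker);          // Loop: look for the next pending handle.
  });
};

// lib/internal/cluster/child.js (abridged sketch)
function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);  // Emit 'connection' on the worker's server.
  else
    handle.close();
}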

For comparison, here is a similar model implemented in C: the parent accepts connections and forks a child to service each one.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define PORT 4444

int main(){

    int sockfd, ret;
    struct sockaddr_in serverAddr;

    int newSocket;
    struct sockaddr_in newAddr;

    socklen_t addr_size;

    char buffer[1024];
    pid_t childpid;

    // Reap children automatically so finished workers don't become zombies.
    signal(SIGCHLD, SIG_IGN);

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        printf("[-]Error in connection.\n");
        exit(1);
    }
    printf("[+]Server Socket is created.\n");

    memset(&serverAddr, '\0', sizeof(serverAddr));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(PORT);
    serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    ret = bind(sockfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
    if(ret < 0){
        printf("[-]Error in binding.\n");
        exit(1);
    }
    printf("[+]Bind to port %d\n", PORT);

    if(listen(sockfd, 10) == 0){
        printf("[+]Listening....\n");
    }else{
        printf("[-]Error in listening.\n");
        exit(1);
    }

    while(1){
        // Only the parent accepts; each connection is handed to a forked child.
        addr_size = sizeof(newAddr);
        newSocket = accept(sockfd, (struct sockaddr*)&newAddr, &addr_size);
        if(newSocket < 0){
            exit(1);
        }
        printf("Connection accepted from %s:%d\n", inet_ntoa(newAddr.sin_addr), ntohs(newAddr.sin_port));

        if((childpid = fork()) == 0){
            // Child: serve this client; the listening socket is not needed here.
            close(sockfd);

            while(1){
                ssize_t n = recv(newSocket, buffer, sizeof(buffer) - 1, 0);
                if(n <= 0)
                    break;
                buffer[n] = '\0';
                if(strcmp(buffer, ":exit") == 0){
                    printf("Disconnected from %s:%d\n", inet_ntoa(newAddr.sin_addr), ntohs(newAddr.sin_port));
                    break;
                }else{
                    printf("Client: %s\n", buffer);
                    send(newSocket, buffer, strlen(buffer), 0);
                    bzero(buffer, sizeof(buffer));
                }
            }

            close(newSocket);
            exit(0);
        }

        // Parent: the connected socket now belongs to the child.
        close(newSocket);
    }

    return 0;
}
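
To try it out (assuming the file is saved as server.c):

$ cc server.c -o server && ./server
$ nc 127.0.0.1 4444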