An example from the official Node.js documentation:
const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
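All workers stay up even though each of them calls listen(8000). A quick way to confirm the port is genuinely shared is to fire a few requests at it while the example above is running (a hypothetical check, not part of the docs example; have the workers reply with process.pid if you want to watch the distribution):

// check.js — assumes the cluster example above is listening on port 8000
const http = require('node:http');

for (let i = 0; i < 4; i++) {
  http.get('http://localhost:8000', (res) => {
    res.on('data', (chunk) => process.stdout.write(chunk));
  });
}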
As is well known, if a port is already being listened on, another attempt to listen on it fails with an error.
man listen
listen() will fail if:
[EINVAL] socket is already connected.
The following code demonstrates this scenario:
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;

for (let i = 0; i < numCPUs; i++) {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}
/*
Error: listen EADDRINUSE: address already in use :::8000
at Server.setupListenHandle [as _listen2] (node:net:1432:16)
at listenInCluster (node:net:1480:12)
at Server.listen (node:net:1568:7)
at Object.<anonymous> (main.js:12:6)
at Module._compile (node:internal/modules/cjs/loader:1126:14)
at Object.Module._extensions..js (node:internal/modules/cjs/loader:1180:10)
at Module.load (node:internal/modules/cjs/loader:1004:32)
at Function.Module._load (node:internal/modules/cjs/loader:839:12)
at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
at node:internal/main/run_main_module:17:47
Emitted 'error' event on Server instance at:
at emitErrorNT (node:net:1459:8)
at processTicksAndRejections (node:internal/process/task_queues:83:21) {
code: 'EADDRINUSE',
errno: -48,
syscall: 'listen',
address: '::',
port: 8000
}
*/
The only way to really understand why the cluster example avoids this error is to go through the Node.js source code, starting with http.Server.listen (which is inherited from net.Server):
// lib/net.js
Server.prototype.listen = function(...args) {
  let backlog;
  // (excerpt: the argument normalization that produces `options`,
  // `backlogFromArgs` and `flags` is omitted here)
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;

    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for primary server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_ARG_VALUE('options', options);
};
As you can see, in our case (host === undefined and port given as a number or string), listen calls listenInCluster directly to set up the port. In fact, when host is provided, lookupAndListen ends up calling listenInCluster as well:
// lib/net.js
function lookupAndListen(self, port, address, backlog, exclusive, flags) {
  if (dns === undefined) dns = require('dns');
  dns.lookup(address, function doListen(err, ip, addressType) {
    if (err) {
      self.emit('error', err);
    } else {
      addressType = ip ? addressType : 4;
      listenInCluster(self, ip, port, addressType,
                      backlog, undefined, exclusive, flags);
    }
  });
}
In listenInCluster, Node first checks whether the current process is the primary. If it is, it listens directly. If it is a worker, the worker asks the primary for a handle and listens on that handle instead:
// lib/net.js
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags, options) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
    backlog,
    ...options,
  };
  // Get the primary's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);

  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse primary's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}
This raises a question: our plain http example never touches cluster, so how does this code path work at all? The answer lies in how cluster.isPrimary is defined:

True if the process is a primary. This is determined by process.env.NODE_UNIQUE_ID. If process.env.NODE_UNIQUE_ID is undefined, then isPrimary is true.

In other words, any process whose environment has no NODE_UNIQUE_ID variable counts as a primary, and therefore takes the direct-listen branch above.
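A quick way to see this for yourself (a hypothetical check, not Node source):

// isPrimary depends only on whether the process was spawned by cluster.fork(),
// which injects NODE_UNIQUE_ID into the worker's environment.
const cluster = require('node:cluster');
console.log(cluster.isPrimary); // true when run directly, false inside a forked worker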
NODE_UNIQUE_ID is set by cluster.fork() when it spawns a worker:
// lib/internal/cluster/primary.js
let ids = 0;

cluster.fork = function(env) {
  cluster.setupPrimary();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  // (rest of the function omitted)
};

function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // (execArgv handling omitted)
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}
So how does a worker get hold of the primary's handle? In other words, what does _getServer actually do?
// lib/internal/cluster/child.js
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

cluster._getServer = function(obj, options, cb) {
  let address = options.address;
  // (excerpt: `indexesKey` and `index` are computed from the address/port here)

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket
      shared(reply, { handle, indexesKey, index }, cb);
    } else {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
    }
  });
};
All _getServer does is send a queryServer message through process.send. sendHelper turns it into one of cluster's internal messages:
// lib/internal/cluster/utils.js
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX
  // in lib/internal/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}
The message travels to the primary over the IPC channel that was established at fork time to keep the primary and its workers connected:
// lib/internal/child_process.js
target.send = function(message, handle, options, callback) {
  // `connected` is set to `true` when the IPC channel is established
  if (this.connected) {
    return this._send(message, handle, options, callback);
  }
  return false;
};

target._send = function(message, handle, options, callback) {
  // (heavily abridged)
  const req = new WriteWrap();
  const err = writeChannelMessage(channel, req, message, handle);
  const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
  return channel.writeQueueSize < (65536 * 2);
};
On receiving the queryServer message, the primary creates a round-robin handle and, through it, sets up the actual TCP listening socket:
// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      handle = new SharedHandle(key, address, message);
    } else {
      handle = new RoundRobinHandle(key, address, message);
    }

    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address,
                          { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = init(ObjectCreate(null));
  this.handle = null;
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd, backlog });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    });
  } else
    this.server.listen({
      path: address,
      backlog,
      readableAll,
      writableAll,
    });  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}
The server.listen() call inside this constructor goes through the same listenInCluster logic shown above, but because it runs in the primary it actually binds and listens on the TCP port. Once the server is listening, RoundRobinHandle also takes over the handle's onconnection callback and replaces it with its round-robin logic, i.e. this.handle.onconnection = (err, handle) => this.distribute(err, handle); above.
Next, queryServer adds the current worker to the RoundRobinHandle's worker queue:
// lib/internal/cluster/primary.js
handle.add(worker, (errno, reply, handle) => {
  const { data } = handles.get(key);

  if (errno)
    handles.delete(key);  // Gives other workers a chance to retry.

  send(worker, {
    errno,
    key,
    ack: message.seq,
    data,
    ...reply
  }, handle);
});
Once that is done, the primary replies (through the done callback below) and the worker resumes inside the callback it registered when it sent the queryServer message:
// lib/internal/cluster/round_robin_handle.js
const done = () => {
  if (this.handle.getsockname) {
    const out = {};
    this.handle.getsockname(out);
    // TODO(bnoordhuis) Check err.
    send(null, { sockname: out }, null);
  } else {
    send(null, null, null);  // UNIX socket.
  }

  this.handoff(worker);  // In case there are connections pending.
};
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle) {
    // Shared listen socket
    shared(reply, { handle, indexesKey, index }, cb);
  } else {
    // Round-robin.
    rr(reply, { indexesKey, index }, cb);
  }
});
Since the primary replies without a handle, the worker takes the rr branch. rr creates a fake handle and hands it to listenOnPrimaryHandle to "listen" on:
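Roughly, the fake handle looks like this (a simplified sketch of rr in lib/internal/cluster/child.js, not the verbatim source): it never touches the network, its listen is a no-op, and getsockname just echoes back the address reported by the primary.

// Simplified sketch of rr() — the real implementation also tracks the handle
// in a map keyed by `indexesKey`/`key` so it can be cleaned up on close.
function rr(reply, { indexesKey, index }, cb) {
  if (reply.errno)
    return cb(reply.errno, null);

  function listen(backlog) {
    // Does nothing: the primary's socket is already listening on the port.
    return 0;
  }

  function close() {
    // Tells the primary to drop this worker from the round-robin set.
  }

  function getsockname(out) {
    // Reports the address of the primary's listening socket.
    Object.assign(out, reply.sockname);
    return 0;
  }

  // Faux handle: looks enough like a real TCP handle for net.Server to use it.
  const handle = { close, listen, getsockname, ref: () => {}, unref: () => {} };
  cb(0, handle);
}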
// lib/net.js
function listenOnPrimaryHandle(err, handle) {
  err = checkBindError(err, port, handle);

  if (err) {
    const ex = exceptionWithHostPort(err, 'bind', address, port);
    return server.emit('error', ex);
  }

  // Reuse primary's server handle
  server._handle = handle;
  // _listen2 sets up the listened handle, it is still named like this
  // to avoid breaking code that wraps this method
  server._listen2(address, port, addressType, backlog, fd, flags);
}
Here the fake handle is assigned to server._handle and passed straight to server._listen2. Inside server._listen2 (an alias for setupListenHandle), the onconnection callback is attached to that handle:
// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    // (handle creation and bind omitted — not needed here, since the worker
    // already has the fake handle assigned to this._handle)
  }

  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}
With that, the primary is the only process actually listening on the port, and every worker has a server wired up and ready to accept the connections the primary hands it. Note that the worker's call to listen(backlog || 511) hits the fake handle's no-op listen and simply returns 0, which is why no EADDRINUSE is ever raised.
When a connection arrives on the primary's listening socket, RoundRobinHandle's distribute is called with the client handle. distribute appends the handle to a pending queue and pulls a free worker off the free-worker queue. If a free worker exists, handoff takes a handle from the queue and sends the worker an act: 'newconn' message with the client handle attached. The worker uses that handle to talk to the client and sends a reply back to the primary. The primary checks the accepted flag in the reply: if it is true, it closes its own copy of the client handle, leaving the worker to communicate with the client directly. The primary keeps repeating this handoff cycle for every incoming connection. A condensed sketch follows.
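The sketch below condenses the primary's distribute/handoff logic and the worker's side of the 'newconn' message (based on lib/internal/cluster/round_robin_handle.js and lib/internal/cluster/child.js; details such as the pending-handle data structure are simplified, so treat it as an illustration rather than the exact source):

// Primary side: queue the client handle, then hand it to a free worker.
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);                         // pending client handles
  const worker = this.free.values().next().value;    // take a free worker, if any
  if (worker) {
    this.free.delete(worker.id);
    this.handoff(worker);
  }
};

RoundRobinHandle.prototype.handoff = function(worker) {
  const handle = this.handles.shift();
  if (handle === undefined) {
    this.free.set(worker.id, worker);                // nothing pending: worker is free again
    return;
  }

  // Pass the client handle to the worker over the IPC channel.
  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();                                // worker took it: drop the primary's copy
    else
      this.distribute(0, handle);                    // worker refused (e.g. shutting down): retry

    this.handoff(worker);                            // keep feeding this worker
  });
};

// Worker side (equally condensed): accept the handle and feed it to the
// net.Server as if the worker had accepted the connection itself.
function onconnection(message, handle) {
  const server = handles.get(message.key);           // the fake handle registered by rr()
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);                  // same callback attached in setupListenHandle
}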
A similar model implemented in C (the parent accepts connections and forks a child per connection):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define PORT 4444

int main() {
  int sockfd, ret;
  struct sockaddr_in serverAddr;

  int newSocket;
  struct sockaddr_in newAddr;
  socklen_t addr_size;

  char buffer[1024];
  pid_t childpid;

  sockfd = socket(AF_INET, SOCK_STREAM, 0);
  if (sockfd < 0) {
    printf("[-]Error in socket creation.\n");
    exit(1);
  }
  printf("[+]Server socket is created.\n");

  memset(&serverAddr, '\0', sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_port = htons(PORT);
  serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");

  ret = bind(sockfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
  if (ret < 0) {
    printf("[-]Error in binding.\n");
    exit(1);
  }
  printf("[+]Bind to port %d\n", PORT);

  if (listen(sockfd, 10) == 0) {
    printf("[+]Listening....\n");
  } else {
    printf("[-]Error in listening.\n");
    exit(1);
  }

  while (1) {
    addr_size = sizeof(newAddr);
    newSocket = accept(sockfd, (struct sockaddr*)&newAddr, &addr_size);
    if (newSocket < 0) {
      exit(1);
    }
    printf("Connection accepted from %s:%d\n",
           inet_ntoa(newAddr.sin_addr), ntohs(newAddr.sin_port));

    if ((childpid = fork()) == 0) {
      // Child: the listening socket is not needed here.
      close(sockfd);

      while (1) {
        ssize_t n = recv(newSocket, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0)
          break;
        buffer[n] = '\0';

        if (strcmp(buffer, ":exit") == 0) {
          printf("Disconnected from %s:%d\n",
                 inet_ntoa(newAddr.sin_addr), ntohs(newAddr.sin_port));
          break;
        } else {
          printf("Client: %s\n", buffer);
          send(newSocket, buffer, strlen(buffer), 0);
        }
      }

      close(newSocket);
      exit(0);
    }

    // Parent: the connected socket now belongs to the child.
    close(newSocket);
  }

  return 0;
}