Skip to content

Commit d751afa

Browse files
cjihrigitaloacasas
authored andcommitted
cluster: refactor module into multiple files
This commit splits the existing cluster module into several internal modules. More specifically, the cluster master and worker implementations are separated, and the various data structures are separated. PR-URL: #10746 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Sam Roberts <[email protected]>
1 parent 4a7bb5b commit d751afa

File tree

10 files changed

+871
-772
lines changed

10 files changed

+871
-772
lines changed

lib/cluster.js

+3-766
Large diffs are not rendered by default.

lib/internal/cluster.js

-4
This file was deleted.

lib/internal/cluster/child.js

+224
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
'use strict';
2+
const assert = require('assert');
3+
const util = require('util');
4+
const EventEmitter = require('events');
5+
const Worker = require('internal/cluster/worker');
6+
const { internal, sendHelper } = require('internal/cluster/utils');
7+
const cluster = new EventEmitter();
8+
const handles = {};
9+
const indexes = {};
10+
const noop = () => {};
11+
12+
module.exports = cluster;
13+
14+
cluster.isWorker = true;
15+
cluster.isMaster = false;
16+
cluster.worker = null;
17+
cluster.Worker = Worker;
18+
19+
cluster._setupWorker = function() {
20+
const worker = new Worker({
21+
id: +process.env.NODE_UNIQUE_ID | 0,
22+
process: process,
23+
state: 'online'
24+
});
25+
26+
cluster.worker = worker;
27+
28+
process.once('disconnect', () => {
29+
worker.emit('disconnect');
30+
31+
if (!worker.exitedAfterDisconnect) {
32+
// Unexpected disconnect, master exited, or some such nastiness, so
33+
// worker exits immediately.
34+
process.exit(0);
35+
}
36+
});
37+
38+
process.on('internalMessage', internal(worker, onmessage));
39+
send({ act: 'online' });
40+
41+
function onmessage(message, handle) {
42+
if (message.act === 'newconn')
43+
onconnection(message, handle);
44+
else if (message.act === 'disconnect')
45+
_disconnect.call(worker, true);
46+
}
47+
};
48+
49+
// obj is a net#Server or a dgram#Socket object.
50+
cluster._getServer = function(obj, options, cb) {
51+
const indexesKey = [options.address,
52+
options.port,
53+
options.addressType,
54+
options.fd ].join(':');
55+
56+
if (indexes[indexesKey] === undefined)
57+
indexes[indexesKey] = 0;
58+
else
59+
indexes[indexesKey]++;
60+
61+
const message = util._extend({
62+
act: 'queryServer',
63+
index: indexes[indexesKey],
64+
data: null
65+
}, options);
66+
67+
// Set custom data on handle (i.e. tls tickets key)
68+
if (obj._getServerData)
69+
message.data = obj._getServerData();
70+
71+
send(message, (reply, handle) => {
72+
if (typeof obj._setServerData === 'function')
73+
obj._setServerData(reply.data);
74+
75+
if (handle)
76+
shared(reply, handle, indexesKey, cb); // Shared listen socket.
77+
else
78+
rr(reply, indexesKey, cb); // Round-robin.
79+
});
80+
81+
obj.once('listening', () => {
82+
cluster.worker.state = 'listening';
83+
const address = obj.address();
84+
message.act = 'listening';
85+
message.port = address && address.port || options.port;
86+
send(message);
87+
});
88+
};
89+
90+
// Shared listen socket.
91+
function shared(message, handle, indexesKey, cb) {
92+
const key = message.key;
93+
// Monkey-patch the close() method so we can keep track of when it's
94+
// closed. Avoids resource leaks when the handle is short-lived.
95+
const close = handle.close;
96+
97+
handle.close = function() {
98+
send({ act: 'close', key });
99+
delete handles[key];
100+
delete indexes[indexesKey];
101+
return close.apply(this, arguments);
102+
};
103+
assert(handles[key] === undefined);
104+
handles[key] = handle;
105+
cb(message.errno, handle);
106+
}
107+
108+
// Round-robin. Master distributes handles across workers.
109+
function rr(message, indexesKey, cb) {
110+
if (message.errno)
111+
return cb(message.errno, null);
112+
113+
var key = message.key;
114+
115+
function listen(backlog) {
116+
// TODO(bnoordhuis) Send a message to the master that tells it to
117+
// update the backlog size. The actual backlog should probably be
118+
// the largest requested size by any worker.
119+
return 0;
120+
}
121+
122+
function close() {
123+
// lib/net.js treats server._handle.close() as effectively synchronous.
124+
// That means there is a time window between the call to close() and
125+
// the ack by the master process in which we can still receive handles.
126+
// onconnection() below handles that by sending those handles back to
127+
// the master.
128+
if (key === undefined)
129+
return;
130+
131+
send({ act: 'close', key });
132+
delete handles[key];
133+
delete indexes[indexesKey];
134+
key = undefined;
135+
}
136+
137+
function getsockname(out) {
138+
if (key)
139+
util._extend(out, message.sockname);
140+
141+
return 0;
142+
}
143+
144+
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
145+
// with it. Fools net.Server into thinking that it's backed by a real
146+
// handle. Use a noop function for ref() and unref() because the control
147+
// channel is going to keep the worker alive anyway.
148+
const handle = { close, listen, ref: noop, unref: noop };
149+
150+
if (message.sockname) {
151+
handle.getsockname = getsockname; // TCP handles only.
152+
}
153+
154+
assert(handles[key] === undefined);
155+
handles[key] = handle;
156+
cb(0, handle);
157+
}
158+
159+
// Round-robin connection.
160+
function onconnection(message, handle) {
161+
const key = message.key;
162+
const server = handles[key];
163+
const accepted = server !== undefined;
164+
165+
send({ ack: message.seq, accepted });
166+
167+
if (accepted)
168+
server.onconnection(0, handle);
169+
}
170+
171+
function send(message, cb) {
172+
return sendHelper(process, message, null, cb);
173+
}
174+
175+
function _disconnect(masterInitiated) {
176+
this.exitedAfterDisconnect = true;
177+
let waitingCount = 1;
178+
179+
function checkWaitingCount() {
180+
waitingCount--;
181+
182+
if (waitingCount === 0) {
183+
// If disconnect is worker initiated, wait for ack to be sure
184+
// exitedAfterDisconnect is properly set in the master, otherwise, if
185+
// it's master initiated there's no need to send the
186+
// exitedAfterDisconnect message
187+
if (masterInitiated) {
188+
process.disconnect();
189+
} else {
190+
send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
191+
}
192+
}
193+
}
194+
195+
for (const key in handles) {
196+
const handle = handles[key];
197+
delete handles[key];
198+
waitingCount++;
199+
200+
if (handle.owner)
201+
handle.owner.close(checkWaitingCount);
202+
else
203+
handle.close(checkWaitingCount);
204+
}
205+
206+
checkWaitingCount();
207+
}
208+
209+
// Extend generic Worker with methods specific to worker processes.
210+
Worker.prototype.disconnect = function() {
211+
_disconnect.call(this);
212+
return this;
213+
};
214+
215+
Worker.prototype.destroy = function() {
216+
this.exitedAfterDisconnect = true;
217+
218+
if (!this.isConnected()) {
219+
process.exit(0);
220+
} else {
221+
send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
222+
process.once('disconnect', () => process.exit(0));
223+
}
224+
};

0 commit comments

Comments
 (0)