Skip to content

Commit bd21038

Browse files
committed
Merge remote-tracking branch 'ry/master' into merge-v0.6
2 parents 18d179c + db3c4ef commit bd21038

File tree

4 files changed

+165
-48
lines changed

4 files changed

+165
-48
lines changed

lib/child_process.js

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,29 +68,13 @@ function mergeOptions(target, overrides) {
6868

6969

7070
function setupChannel(target, channel) {
71-
var isWindows = process.platform === 'win32';
7271
target._channel = channel;
7372

7473
var jsonBuffer = '';
75-
76-
if (isWindows) {
77-
var setSimultaneousAccepts = function(handle) {
78-
var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
79-
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
80-
81-
if (handle._simultaneousAccepts != simultaneousAccepts) {
82-
handle.setSimultaneousAccepts(simultaneousAccepts);
83-
handle._simultaneousAccepts = simultaneousAccepts;
84-
}
85-
}
86-
}
87-
8874
channel.buffering = false;
8975
channel.onread = function(pool, offset, length, recvHandle) {
90-
if (recvHandle && setSimultaneousAccepts) {
91-
// Update simultaneous accepts on Windows
92-
setSimultaneousAccepts(recvHandle);
93-
}
76+
// Update simultaneous accepts on Windows
77+
net._setSimultaneousAccepts(recvHandle);
9478

9579
if (pool) {
9680
jsonBuffer += pool.toString('ascii', offset, offset + length);
@@ -140,10 +124,8 @@ function setupChannel(target, channel) {
140124

141125
var buffer = Buffer(JSON.stringify(message) + '\n');
142126

143-
if (sendHandle && setSimultaneousAccepts) {
144-
// Update simultaneous accepts on Windows
145-
setSimultaneousAccepts(sendHandle);
146-
}
127+
// Update simultaneous accepts on Windows
128+
net._setSimultaneousAccepts(sendHandle);
147129

148130
var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);
149131

@@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) {
582564
self._handle = isolates.create(options.args, options.options);
583565
if (!self._handle) throw new Error('Cannot create isolate.');
584566

585-
self._handle.onmessage = function(msg) {
567+
self._handle.onmessage = function(msg, recvHandle) {
586568
msg = JSON.parse('' + msg);
587-
self.emit('message', msg);
569+
570+
// Update simultaneous accepts on Windows
571+
net._setSimultaneousAccepts(recvHandle);
572+
573+
self.emit('message', msg, recvHandle);
588574
};
589575

590576
self._handle.onexit = function() {
@@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) {
600586
};
601587

602588

603-
Isolate.prototype.send = function(msg) {
589+
Isolate.prototype.send = function(msg, sendHandle) {
604590
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
605591
if (!this._handle) throw new Error('Isolate not running.');
606592
msg = JSON.stringify(msg);
607593
msg = new Buffer(msg);
608-
return this._handle.send(msg);
594+
595+
// Update simultaneous accepts on Windows
596+
net._setSimultaneousAccepts(sendHandle);
597+
598+
return this._handle.send(msg, sendHandle);
609599
};

lib/net.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,3 +942,26 @@ exports.isIPv4 = function(input) {
942942
exports.isIPv6 = function(input) {
943943
return exports.isIP(input) === 6;
944944
};
945+
946+
947+
if (process.platform === 'win32') {
948+
var simultaneousAccepts;
949+
950+
exports._setSimultaneousAccepts = function(handle) {
951+
if (typeof handle === 'undefined') {
952+
return;
953+
}
954+
955+
if (typeof simultaneousAccepts === 'undefined') {
956+
simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
957+
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
958+
}
959+
960+
if (handle._simultaneousAccepts != simultaneousAccepts) {
961+
handle.setSimultaneousAccepts(simultaneousAccepts);
962+
handle._simultaneousAccepts = simultaneousAccepts;
963+
}
964+
}
965+
} else {
966+
exports._setSimultaneousAccepts = function(handle) {}
967+
}

src/node.js

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,27 @@
123123

124124
if (process.tid === 1) return;
125125

126+
var net = NativeModule.require('net');
127+
126128
// isolate initialization
127-
process.send = function(msg) {
129+
process.send = function(msg, sendHandle) {
128130
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
129131
msg = JSON.stringify(msg);
130132
msg = new Buffer(msg);
131-
return process._send(msg);
133+
134+
// Update simultaneous accepts on Windows
135+
net._setSimultaneousAccepts(sendHandle);
136+
137+
return process._send(msg, sendHandle);
132138
};
133139

134-
process._onmessage = function(msg) {
140+
process._onmessage = function(msg, recvHandle) {
135141
msg = JSON.parse('' + msg);
136-
process.emit('message', msg);
142+
143+
// Update simultaneous accepts on Windows
144+
net._setSimultaneousAccepts(recvHandle);
145+
146+
process.emit('message', msg, recvHandle);
137147
};
138148

139149
process.exit = process._exit;
@@ -441,10 +451,15 @@
441451
// Load tcp_wrap to avoid situation where we might immediately receive
442452
// a message.
443453
// FIXME is this really necessary?
444-
process.binding('tcp_wrap')
454+
process.binding('tcp_wrap');
445455

446456
cp._forkChild();
447457
assert(process.send);
458+
} else if (process.tid !== 1) {
459+
// Load tcp_wrap to avoid situation where we might immediately receive
460+
// a message.
461+
// FIXME is this really necessary?
462+
process.binding('tcp_wrap');
448463
}
449464
}
450465

src/node_isolate.cc

Lines changed: 106 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <node_isolate.h>
2727
#include <node_internals.h>
2828
#include <node_object_wrap.h>
29+
#include <tcp_wrap.h>
2930

3031
#include <stdlib.h>
3132
#include <string.h>
@@ -34,6 +35,8 @@
3435

3536
#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)
3637

38+
#define ISOLATEMESSAGE_SHARED_STREAM 0x0001
39+
3740

3841
namespace node {
3942

@@ -166,23 +169,35 @@ class Channel {
166169

167170

168171
struct IsolateMessage {
169-
size_t size_;
170-
char* data_;
172+
int flags;
173+
struct {
174+
size_t size_;
175+
char* buffer_;
176+
} data_;
177+
uv_stream_info_t shared_stream_info_;
178+
179+
IsolateMessage(const char* buffer, size_t size,
180+
uv_stream_info_t* shared_stream_info) {
181+
flags = 0;
171182

172-
IsolateMessage(const char* data, size_t size) {
173183
// make a copy for now
174-
size_ = size;
175-
data_ = new char[size];
176-
memcpy(data_, data, size);
184+
data_.size_ = size;
185+
data_.buffer_ = new char[size];
186+
memcpy(data_.buffer_, buffer, size);
187+
188+
if (shared_stream_info) {
189+
flags |= ISOLATEMESSAGE_SHARED_STREAM;
190+
shared_stream_info_ = *shared_stream_info;
191+
}
177192
}
178193

179194
~IsolateMessage() {
180-
delete[] data_;
195+
delete[] data_.buffer_;
181196
}
182197

183198
static void Free(char* data, void* arg) {
184199
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
185-
assert(data == msg->data_);
200+
assert(data == msg->data_.buffer_);
186201
delete msg;
187202
}
188203
};
@@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
208223
const char* data = Buffer::Data(obj);
209224
size_t size = Buffer::Length(obj);
210225

211-
IsolateMessage* msg = new IsolateMessage(data, size);
226+
IsolateMessage* msg;
227+
228+
if (args[1]->IsObject()) {
229+
uv_stream_info_t stream_info;
230+
231+
Local<Object> send_stream_obj = args[1]->ToObject();
232+
assert(send_stream_obj->InternalFieldCount() > 0);
233+
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
234+
send_stream_obj->GetPointerFromInternalField(0));
235+
uv_stream_t* send_stream = send_stream_wrap->GetStream();
236+
int r = uv_export(send_stream, &stream_info);
237+
assert(r == 0);
238+
msg = new IsolateMessage(data, size, &stream_info);
239+
} else {
240+
msg = new IsolateMessage(data, size, NULL);
241+
}
242+
212243
isolate->send_channel_->Send(msg);
213244

214245
return Undefined();
@@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
231262
Isolate* self = static_cast<Isolate*>(arg);
232263
NODE_ISOLATE_CHECK(self);
233264

234-
Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
235-
Handle<Value> argv[] = { buf->handle_ };
236-
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
265+
Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
266+
IsolateMessage::Free, msg);
267+
268+
int argc = 1;
269+
Handle<Value> argv[2] = {
270+
buf->handle_
271+
};
272+
273+
if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
274+
// Instantiate the client javascript object and handle.
275+
Local<Object> pending_obj = TCPWrap::Instantiate();
276+
277+
// Unwrap the client javascript object.
278+
assert(pending_obj->InternalFieldCount() > 0);
279+
TCPWrap* pending_wrap =
280+
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
281+
282+
int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
283+
assert(r == 0);
284+
285+
argv[1] = pending_obj;
286+
argc++;
287+
}
288+
289+
MakeCallback(self->globals_.process, "_onmessage", argc, argv);
237290
}
238291

239292

@@ -442,9 +495,30 @@ struct IsolateWrap: public ObjectWrap {
442495
NODE_ISOLATE_CHECK(parent_isolate_);
443496
HandleScope scope;
444497
Buffer* buf = Buffer::New(
445-
msg->data_, msg->size_, IsolateMessage::Free, msg);
446-
Handle<Value> argv[] = { buf->handle_ };
447-
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
498+
msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);
499+
500+
int argc = 1;
501+
Handle<Value> argv[2] = {
502+
buf->handle_
503+
};
504+
505+
if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
506+
// Instantiate the client javascript object and handle.
507+
Local<Object> pending_obj = TCPWrap::Instantiate();
508+
509+
// Unwrap the client javascript object.
510+
assert(pending_obj->InternalFieldCount() > 0);
511+
TCPWrap* pending_wrap =
512+
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
513+
514+
int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
515+
assert(r == 0);
516+
517+
argv[1] = pending_obj;
518+
argc++;
519+
}
520+
521+
MakeCallback(handle_, "onmessage", argc, argv);
448522
}
449523

450524
// TODO merge with Isolate::Send(), it's almost identical
@@ -457,9 +531,24 @@ struct IsolateWrap: public ObjectWrap {
457531
const char* data = Buffer::Data(obj);
458532
size_t size = Buffer::Length(obj);
459533

460-
IsolateMessage* msg = new IsolateMessage(data, size);
461-
self->send_channel_->Send(msg);
534+
IsolateMessage* msg;
535+
536+
if (args[1]->IsObject()) {
537+
uv_stream_info_t stream_info;
538+
539+
Local<Object> send_stream_obj = args[1]->ToObject();
540+
assert(send_stream_obj->InternalFieldCount() > 0);
541+
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
542+
send_stream_obj->GetPointerFromInternalField(0));
543+
uv_stream_t* send_stream = send_stream_wrap->GetStream();
544+
int r = uv_export(send_stream, &stream_info);
545+
assert(r == 0);
546+
msg = new IsolateMessage(data, size, &stream_info);
547+
} else {
548+
msg = new IsolateMessage(data, size, NULL);
549+
}
462550

551+
self->send_channel_->Send(msg);
463552
return Undefined();
464553
}
465554

0 commit comments

Comments
 (0)