Skip to content
This repository was archived by the owner on Mar 30, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 5 additions & 14 deletions lib/webworker-child.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,11 @@ var assert = require('assert');
var fs = require('fs');
var net = require('net');
var path = require('path');
var script = process.binding('evals');
var vm = require('vm');
var sys = require('sys');
var wwutil = require('./webworker-util');
var WebSocketServer = require('./ws').WebSocketServer;

try {
var WebSocket = require('websocket-client').WebSocket;
} catch (e) {
throw new Error(
'pgriess/node-websocket-client must be installed'
);
}

var writeError = process.binding('stdio').writeError;
var WebSocket = require('ws');

// Catch exceptions
//
Expand Down Expand Up @@ -112,7 +103,7 @@ var sockPath = process.argv[2];
var scriptLoc = new wwutil.WorkerLocation(process.argv[3]);

// Connect to the parent process
var ws = new WebSocket('ws+unix://' + sockPath);
var ws = new WebSocket('ws+unix://' +sockPath);
var ms = new wwutil.MsgStream(ws);

// Once we connect successfully, set up the rest of the world
Expand All @@ -132,14 +123,14 @@ ws.addListener('open', function() {
var scriptObj = undefined;
switch (scriptLoc.protocol) {
case 'file':
scriptObj = new script.Script(
scriptObj = new vm.createScript(
fs.readFileSync(scriptLoc.pathname),
scriptLoc.href
);
break;

default:
writeError('Cannot load script from unknown protocol \'' +
console.error('Cannot load script from unknown protocol \'' +
scriptLoc.protocol);
process.exit(1);
}
Expand Down
6 changes: 3 additions & 3 deletions lib/webworker-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ var MsgStream = function(s) {
var ms = getMsgObj(v, fd);
debug('Process ' + process.pid + ' sending message: ' + sys.inspect(ms));

s.write(JSON.stringify(ms), fd)
s.send(JSON.stringify(ms), fd)
};

s.addListener('message', function(ms) {
s.on('message', function(ms) {
debug('Process ' + process.pid + ' received message: ' + ms);

var mo = JSON.parse(ms);
Expand Down Expand Up @@ -93,7 +93,7 @@ var MsgStream = function(s) {
self.emit('msg', msg, fd);
});

s.addListener('fd', function(fd) {
s.on('fd', function(fd) {
// Look for a message that's waiting for our arrival. If we don't
// have one, enqueu the received FD for later delivery.
var msg = msg_waiting_for_fd[++fds_seqno_recvd];
Expand Down
76 changes: 50 additions & 26 deletions lib/webworker.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// WebWorkers implementation.
//
// The master and workers communite over a UNIX domain socket at
//
//
// /tmp/node-webworker-<master PID>.sock
//
// This socket is used as a full-duplex channel for exchanging messages.
Expand Down Expand Up @@ -32,19 +32,12 @@ var assert = require('assert');
var child_process = require('child_process');
var fs = require('fs');
var net = require('net');
var netBinding = process.binding('net');
var http = require('http');
var path = require('path');
var sys = require('sys');
var wwutil = require('./webworker-util');
var WebSocketServer = require('./ws').Server;

try {
var WebSocket = require('websocket-client').WebSocket;
} catch (e) {
throw new Error(
'pgriess/node-websocket-client must be installed'
);
}
var WebSocketServer = require('ws').Server;
var inspect = require('eyes').inspector({styles: {all: 'magenta'}});

// Directory for our UNIX domain sockets
var SOCK_DIR_PATH = '/tmp/node-webworker-' + process.pid;
Expand All @@ -64,7 +57,7 @@ var Worker = function(src, opts) {
// The timeout ID for killing off this worker if it is unresponsive to a
// graceful shutdown request
var killTimeoutID = undefined;

// Process ID of child process running this worker
//
// This value persists even once the child process itself has
Expand Down Expand Up @@ -92,17 +85,24 @@ var Worker = function(src, opts) {
// The path to our socket
var sockPath = path.join(SOCK_DIR_PATH, '' + numWorkersCreated++);

// Make sure our socket folder is in place (it may have been removed by a previous clean-up)
try {
fs.mkdirSync(SOCK_DIR_PATH, 0700);
} catch(e) {}

// Server instance for our communication socket with the child process
//
// Doesn't begin listening until start() is called.
var wsSrv = new WebSocketServer();

var httpServer = http.createServer();
var wsSrv = new WebSocketServer({ server: httpServer });
wsSrv.addListener('connection', function(s) {
assert.equal(stream, undefined);
assert.equal(msgStream, undefined);

stream = s._req.socket;
stream = s;
msgStream = new wwutil.MsgStream(s);

// Process any messages waiting to be sent
msgQueue.forEach(function(m) {
var fd = m.pop();
Expand All @@ -120,7 +120,8 @@ var Worker = function(src, opts) {
// First fires up the UNIX socket server, then spawns the child process
// and away we go.
var start = function() {
wsSrv.addListener('listening', function() {
httpServer.listen(sockPath);
httpServer.addListener('listening', function() {
var execPath = opts.path || process.execPath || process.argv[0];

var args = [
Expand All @@ -140,9 +141,7 @@ var Worker = function(src, opts) {

cp = child_process.spawn(
execPath,
args,
undefined,
[0, 1, 2]
args
);

// Save off the PID of the child process, as this value gets
Expand All @@ -153,10 +152,15 @@ var Worker = function(src, opts) {
'Spawned process ' + pid + ' for worker \'' + src + '\': ' +
execPath + ' ' + args.join(' ')
);

cp.stdout.on('data', function(d) {
process.stdout.write(d);
})
cp.stderr.on('data', function(d) {
process.stderr.write(d);
})
cp.addListener('exit', function(code, signal) {
wwutil.debug(
'Process ' + pid + ' for worker \'' + src +
'Process ' + pid + ' for worker \'' + src +
'\' exited with status ' + code +', signal ' + signal
);

Expand All @@ -167,14 +171,31 @@ var Worker = function(src, opts) {
}

if (stream) {
stream.destroy();
stream.close();
} else {
wwutil.debug(
'Process ' + pid + ' exited without completing handshaking'
);
}

wsSrv.close();
httpServer.close();
if (msgStream)
msgStream = null;

// remove the socket
fs.unlink(sockPath, function(e) {
try {
// try removing the socket directory
fs.rmdirSync(path.dirname(sockPath));
} catch(e) {}

if (self.onexit) {
process.nextTick(function() {
self.onexit(code, signal);
});
}
});

if (self.onexit) {
process.nextTick(function() {
Expand All @@ -183,8 +204,6 @@ var Worker = function(src, opts) {
}
});
});

wsSrv.listen(sockPath);
};

// The primary message handling function for the worker.
Expand Down Expand Up @@ -286,7 +305,7 @@ var Worker = function(src, opts) {
}

wwutil.debug(
'Forcibily terminating worker process ' + pid +
'Forcibily terminating worker process ' + pid +
' with SIGTERM'
);

Expand All @@ -301,4 +320,9 @@ var Worker = function(src, opts) {
exports.Worker = Worker;

// Perform any one-time initialization
fs.mkdirSync(SOCK_DIR_PATH, 0700);
try {
fs.mkdirSync(SOCK_DIR_PATH, 0700);
} catch(e) {
if (e.code && e.code != 'EEXIST')
throw e;
}
3 changes: 0 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
"engines" : {
"node" : ">=0.1.98"
},
"dependencies" : {
"websocket-client" : "0.9.3 - 0.9.99999"
},
"repositories" : [
{
"type" : "git",
Expand Down