diff --git a/lib/webworker-child.js b/lib/webworker-child.js index 34b3c9f..ad5b497 100644 --- a/lib/webworker-child.js +++ b/lib/webworker-child.js @@ -15,20 +15,11 @@ var assert = require('assert'); var fs = require('fs'); var net = require('net'); var path = require('path'); -var script = process.binding('evals'); +var vm = require('vm'); var sys = require('sys'); var wwutil = require('./webworker-util'); -var WebSocketServer = require('./ws').WebSocketServer; -try { - var WebSocket = require('websocket-client').WebSocket; -} catch (e) { - throw new Error( - 'pgriess/node-websocket-client must be installed' - ); -} - -var writeError = process.binding('stdio').writeError; +var WebSocket = require('ws'); // Catch exceptions // @@ -112,7 +103,7 @@ var sockPath = process.argv[2]; var scriptLoc = new wwutil.WorkerLocation(process.argv[3]); // Connect to the parent process -var ws = new WebSocket('ws+unix://' + sockPath); +var ws = new WebSocket('ws+unix://' +sockPath); var ms = new wwutil.MsgStream(ws); // Once we connect successfully, set up the rest of the world @@ -132,14 +123,14 @@ ws.addListener('open', function() { var scriptObj = undefined; switch (scriptLoc.protocol) { case 'file': - scriptObj = new script.Script( + scriptObj = new vm.createScript( fs.readFileSync(scriptLoc.pathname), scriptLoc.href ); break; default: - writeError('Cannot load script from unknown protocol \'' + + console.error('Cannot load script from unknown protocol \'' + scriptLoc.protocol); process.exit(1); } diff --git a/lib/webworker-util.js b/lib/webworker-util.js index 7022cf6..f277dcc 100644 --- a/lib/webworker-util.js +++ b/lib/webworker-util.js @@ -60,10 +60,10 @@ var MsgStream = function(s) { var ms = getMsgObj(v, fd); debug('Process ' + process.pid + ' sending message: ' + sys.inspect(ms)); - s.write(JSON.stringify(ms), fd) + s.send(JSON.stringify(ms), fd) }; - s.addListener('message', function(ms) { + s.on('message', function(ms) { debug('Process ' + process.pid + ' received message: ' + ms); var mo = JSON.parse(ms); @@ -93,7 +93,7 @@ var MsgStream = function(s) { self.emit('msg', msg, fd); }); - s.addListener('fd', function(fd) { + s.on('fd', function(fd) { // Look for a message that's waiting for our arrival. If we don't // have one, enqueu the received FD for later delivery. var msg = msg_waiting_for_fd[++fds_seqno_recvd]; diff --git a/lib/webworker.js b/lib/webworker.js index 5011138..b065920 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -1,7 +1,7 @@ // WebWorkers implementation. // // The master and workers communite over a UNIX domain socket at -// +// // /tmp/node-webworker-.sock // // This socket is used as a full-duplex channel for exchanging messages. @@ -32,19 +32,12 @@ var assert = require('assert'); var child_process = require('child_process'); var fs = require('fs'); var net = require('net'); -var netBinding = process.binding('net'); +var http = require('http'); var path = require('path'); var sys = require('sys'); var wwutil = require('./webworker-util'); -var WebSocketServer = require('./ws').Server; - -try { - var WebSocket = require('websocket-client').WebSocket; -} catch (e) { - throw new Error( - 'pgriess/node-websocket-client must be installed' - ); -} +var WebSocketServer = require('ws').Server; +var inspect = require('eyes').inspector({styles: {all: 'magenta'}}); // Directory for our UNIX domain sockets var SOCK_DIR_PATH = '/tmp/node-webworker-' + process.pid; @@ -64,7 +57,7 @@ var Worker = function(src, opts) { // The timeout ID for killing off this worker if it is unresponsive to a // graceful shutdown request var killTimeoutID = undefined; - + // Process ID of child process running this worker // // This value persists even once the child process itself has @@ -92,17 +85,24 @@ var Worker = function(src, opts) { // The path to our socket var sockPath = path.join(SOCK_DIR_PATH, '' + numWorkersCreated++); + // Make sure our socket folder is in place (it may have been removed by a previous clean-up) + try { + fs.mkdirSync(SOCK_DIR_PATH, 0700); + } catch(e) {} + // Server instance for our communication socket with the child process // // Doesn't begin listening until start() is called. - var wsSrv = new WebSocketServer(); + + var httpServer = http.createServer(); + var wsSrv = new WebSocketServer({ server: httpServer }); wsSrv.addListener('connection', function(s) { assert.equal(stream, undefined); assert.equal(msgStream, undefined); - stream = s._req.socket; + stream = s; msgStream = new wwutil.MsgStream(s); - + // Process any messages waiting to be sent msgQueue.forEach(function(m) { var fd = m.pop(); @@ -120,7 +120,8 @@ var Worker = function(src, opts) { // First fires up the UNIX socket server, then spawns the child process // and away we go. var start = function() { - wsSrv.addListener('listening', function() { + httpServer.listen(sockPath); + httpServer.addListener('listening', function() { var execPath = opts.path || process.execPath || process.argv[0]; var args = [ @@ -140,9 +141,7 @@ var Worker = function(src, opts) { cp = child_process.spawn( execPath, - args, - undefined, - [0, 1, 2] + args ); // Save off the PID of the child process, as this value gets @@ -153,10 +152,15 @@ var Worker = function(src, opts) { 'Spawned process ' + pid + ' for worker \'' + src + '\': ' + execPath + ' ' + args.join(' ') ); - + cp.stdout.on('data', function(d) { + process.stdout.write(d); + }) + cp.stderr.on('data', function(d) { + process.stderr.write(d); + }) cp.addListener('exit', function(code, signal) { wwutil.debug( - 'Process ' + pid + ' for worker \'' + src + + 'Process ' + pid + ' for worker \'' + src + '\' exited with status ' + code +', signal ' + signal ); @@ -167,7 +171,7 @@ var Worker = function(src, opts) { } if (stream) { - stream.destroy(); + stream.close(); } else { wwutil.debug( 'Process ' + pid + ' exited without completing handshaking' @@ -175,6 +179,23 @@ var Worker = function(src, opts) { } wsSrv.close(); + httpServer.close(); + if (msgStream) + msgStream = null; + + // remove the socket + fs.unlink(sockPath, function(e) { + try { + // try removing the socket directory + fs.rmdirSync(path.dirname(sockPath)); + } catch(e) {} + + if (self.onexit) { + process.nextTick(function() { + self.onexit(code, signal); + }); + } + }); if (self.onexit) { process.nextTick(function() { @@ -183,8 +204,6 @@ var Worker = function(src, opts) { } }); }); - - wsSrv.listen(sockPath); }; // The primary message handling function for the worker. @@ -286,7 +305,7 @@ var Worker = function(src, opts) { } wwutil.debug( - 'Forcibily terminating worker process ' + pid + + 'Forcibily terminating worker process ' + pid + ' with SIGTERM' ); @@ -301,4 +320,9 @@ var Worker = function(src, opts) { exports.Worker = Worker; // Perform any one-time initialization -fs.mkdirSync(SOCK_DIR_PATH, 0700); +try { + fs.mkdirSync(SOCK_DIR_PATH, 0700); +} catch(e) { + if (e.code && e.code != 'EEXIST') + throw e; +} diff --git a/package.json b/package.json index 80b1587..2e27bb8 100644 --- a/package.json +++ b/package.json @@ -6,9 +6,6 @@ "engines" : { "node" : ">=0.1.98" }, - "dependencies" : { - "websocket-client" : "0.9.3 - 0.9.99999" - }, "repositories" : [ { "type" : "git",