From c970015c34c4ee15b226bd67adff900087c83db7 Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Tue, 26 Jul 2011 16:21:41 +0300 Subject: [PATCH 1/7] package.json: remove explicit websocket-client dependency (managed elsewhere) --- package.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/package.json b/package.json index 80b1587..2e27bb8 100644 --- a/package.json +++ b/package.json @@ -6,9 +6,6 @@ "engines" : { "node" : ">=0.1.98" }, - "dependencies" : { - "websocket-client" : "0.9.3 - 0.9.99999" - }, "repositories" : [ { "type" : "git", From f02c92a803b4bc9109c1d07a3262d6b487b5649c Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Fri, 16 Mar 2012 09:29:09 +0200 Subject: [PATCH 2/7] clean-up at shutdown --- lib/webworker.js | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/lib/webworker.js b/lib/webworker.js index 5011138..d250781 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -1,7 +1,7 @@ // WebWorkers implementation. // // The master and workers communite over a UNIX domain socket at -// +// // /tmp/node-webworker-.sock // // This socket is used as a full-duplex channel for exchanging messages. @@ -64,7 +64,7 @@ var Worker = function(src, opts) { // The timeout ID for killing off this worker if it is unresponsive to a // graceful shutdown request var killTimeoutID = undefined; - + // Process ID of child process running this worker // // This value persists even once the child process itself has @@ -102,7 +102,7 @@ var Worker = function(src, opts) { stream = s._req.socket; msgStream = new wwutil.MsgStream(s); - + // Process any messages waiting to be sent msgQueue.forEach(function(m) { var fd = m.pop(); @@ -156,7 +156,7 @@ var Worker = function(src, opts) { cp.addListener('exit', function(code, signal) { wwutil.debug( - 'Process ' + pid + ' for worker \'' + src + + 'Process ' + pid + ' for worker \'' + src + '\' exited with status ' + code +', signal ' + signal ); @@ -176,6 +176,13 @@ var Worker = function(src, opts) { wsSrv.close(); + fs.unlink(sockPath, function(e) { + console.log("REMOVED SOCK "+sockPath) + try { + fs.rmdirSync(path.dirname(sockPath)) + } catch(e) {} + }) + if (self.onexit) { process.nextTick(function() { self.onexit(code, signal); @@ -286,7 +293,7 @@ var Worker = function(src, opts) { } wwutil.debug( - 'Forcibily terminating worker process ' + pid + + 'Forcibily terminating worker process ' + pid + ' with SIGTERM' ); @@ -301,4 +308,7 @@ var Worker = function(src, opts) { exports.Worker = Worker; // Perform any one-time initialization -fs.mkdirSync(SOCK_DIR_PATH, 0700); +try { + fs.mkdirSync(SOCK_DIR_PATH, 0700); +} catch(e) {} + From 752052f282f8f038f3b5f14dc051f241a3ef2eac Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Fri, 16 Mar 2012 09:41:53 +0200 Subject: [PATCH 3/7] remove socket and socket folder, then call onexit --- lib/webworker.js | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/lib/webworker.js b/lib/webworker.js index d250781..cb9d27a 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -176,18 +176,19 @@ var Worker = function(src, opts) { wsSrv.close(); + // remove the socket fs.unlink(sockPath, function(e) { - console.log("REMOVED SOCK "+sockPath) try { - fs.rmdirSync(path.dirname(sockPath)) + // try removing the socket directory + fs.rmdirSync(path.dirname(sockPath)); } catch(e) {} - }) - if (self.onexit) { - process.nextTick(function() { - self.onexit(code, signal); - }); - } + if (self.onexit) { + process.nextTick(function() { + self.onexit(code, signal); + }); + } + }); }); }); From 98204c502306191025b6cc3cb261a724048ae425 Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Tue, 20 Mar 2012 11:31:33 +0200 Subject: [PATCH 4/7] ensure the socket folder is there when creating new worker (may have been cleaned up) --- lib/webworker.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/webworker.js b/lib/webworker.js index cb9d27a..2b206bb 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -92,6 +92,11 @@ var Worker = function(src, opts) { // The path to our socket var sockPath = path.join(SOCK_DIR_PATH, '' + numWorkersCreated++); + // Make sure our socket folder is in place (it may have been removed by a previous clean-up) + try { + fs.mkdirSync(SOCK_DIR_PATH, 0700); + } catch(e) {} + // Server instance for our communication socket with the child process // // Doesn't begin listening until start() is called. @@ -308,8 +313,3 @@ var Worker = function(src, opts) { }; exports.Worker = Worker; -// Perform any one-time initialization -try { - fs.mkdirSync(SOCK_DIR_PATH, 0700); -} catch(e) {} - From 9582522b50b2b4c9e0f4e1f5554fa84322d7489d Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Fri, 13 Apr 2012 16:04:01 +0300 Subject: [PATCH 5/7] Use einaros/ws instead of node-websocket-*. Support for Node 0.6. --- lib/webworker-child.js | 19 +++++------------ lib/webworker-util.js | 6 +++--- lib/webworker.js | 46 ++++++++++++++++++++++++------------------ 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/lib/webworker-child.js b/lib/webworker-child.js index 34b3c9f..ad5b497 100644 --- a/lib/webworker-child.js +++ b/lib/webworker-child.js @@ -15,20 +15,11 @@ var assert = require('assert'); var fs = require('fs'); var net = require('net'); var path = require('path'); -var script = process.binding('evals'); +var vm = require('vm'); var sys = require('sys'); var wwutil = require('./webworker-util'); -var WebSocketServer = require('./ws').WebSocketServer; -try { - var WebSocket = require('websocket-client').WebSocket; -} catch (e) { - throw new Error( - 'pgriess/node-websocket-client must be installed' - ); -} - -var writeError = process.binding('stdio').writeError; +var WebSocket = require('ws'); // Catch exceptions // @@ -112,7 +103,7 @@ var sockPath = process.argv[2]; var scriptLoc = new wwutil.WorkerLocation(process.argv[3]); // Connect to the parent process -var ws = new WebSocket('ws+unix://' + sockPath); +var ws = new WebSocket('ws+unix://' +sockPath); var ms = new wwutil.MsgStream(ws); // Once we connect successfully, set up the rest of the world @@ -132,14 +123,14 @@ ws.addListener('open', function() { var scriptObj = undefined; switch (scriptLoc.protocol) { case 'file': - scriptObj = new script.Script( + scriptObj = new vm.createScript( fs.readFileSync(scriptLoc.pathname), scriptLoc.href ); break; default: - writeError('Cannot load script from unknown protocol \'' + + console.error('Cannot load script from unknown protocol \'' + scriptLoc.protocol); process.exit(1); } diff --git a/lib/webworker-util.js b/lib/webworker-util.js index 7022cf6..f277dcc 100644 --- a/lib/webworker-util.js +++ b/lib/webworker-util.js @@ -60,10 +60,10 @@ var MsgStream = function(s) { var ms = getMsgObj(v, fd); debug('Process ' + process.pid + ' sending message: ' + sys.inspect(ms)); - s.write(JSON.stringify(ms), fd) + s.send(JSON.stringify(ms), fd) }; - s.addListener('message', function(ms) { + s.on('message', function(ms) { debug('Process ' + process.pid + ' received message: ' + ms); var mo = JSON.parse(ms); @@ -93,7 +93,7 @@ var MsgStream = function(s) { self.emit('msg', msg, fd); }); - s.addListener('fd', function(fd) { + s.on('fd', function(fd) { // Look for a message that's waiting for our arrival. If we don't // have one, enqueu the received FD for later delivery. var msg = msg_waiting_for_fd[++fds_seqno_recvd]; diff --git a/lib/webworker.js b/lib/webworker.js index 2b206bb..134828d 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -32,19 +32,12 @@ var assert = require('assert'); var child_process = require('child_process'); var fs = require('fs'); var net = require('net'); -var netBinding = process.binding('net'); +var http = require('http'); var path = require('path'); var sys = require('sys'); var wwutil = require('./webworker-util'); -var WebSocketServer = require('./ws').Server; - -try { - var WebSocket = require('websocket-client').WebSocket; -} catch (e) { - throw new Error( - 'pgriess/node-websocket-client must be installed' - ); -} +var WebSocketServer = require('ws').Server; +var inspect = require('eyes').inspector({styles: {all: 'magenta'}}); // Directory for our UNIX domain sockets var SOCK_DIR_PATH = '/tmp/node-webworker-' + process.pid; @@ -100,12 +93,14 @@ var Worker = function(src, opts) { // Server instance for our communication socket with the child process // // Doesn't begin listening until start() is called. - var wsSrv = new WebSocketServer(); + + var httpServer = http.createServer(); + var wsSrv = new WebSocketServer({ server: httpServer }); wsSrv.addListener('connection', function(s) { assert.equal(stream, undefined); assert.equal(msgStream, undefined); - stream = s._req.socket; + stream = s; msgStream = new wwutil.MsgStream(s); // Process any messages waiting to be sent @@ -125,7 +120,8 @@ var Worker = function(src, opts) { // First fires up the UNIX socket server, then spawns the child process // and away we go. var start = function() { - wsSrv.addListener('listening', function() { + httpServer.listen(sockPath); + httpServer.addListener('listening', function() { var execPath = opts.path || process.execPath || process.argv[0]; var args = [ @@ -145,9 +141,7 @@ var Worker = function(src, opts) { cp = child_process.spawn( execPath, - args, - undefined, - [0, 1, 2] + args ); // Save off the PID of the child process, as this value gets @@ -158,7 +152,12 @@ var Worker = function(src, opts) { 'Spawned process ' + pid + ' for worker \'' + src + '\': ' + execPath + ' ' + args.join(' ') ); - + cp.stdout.on('data', function(d) { + process.stdout.write(d); + }) + cp.stderr.on('data', function(d) { + process.stderr.write(d); + }) cp.addListener('exit', function(code, signal) { wwutil.debug( 'Process ' + pid + ' for worker \'' + src + @@ -172,7 +171,7 @@ var Worker = function(src, opts) { } if (stream) { - stream.destroy(); + stream.close(); } else { wwutil.debug( 'Process ' + pid + ' exited without completing handshaking' @@ -180,6 +179,7 @@ var Worker = function(src, opts) { } wsSrv.close(); + httpServer.close(); // remove the socket fs.unlink(sockPath, function(e) { @@ -194,10 +194,14 @@ var Worker = function(src, opts) { }); } }); + + if (self.onexit) { + process.nextTick(function() { + self.onexit(code, signal); + }); + } }); }); - - wsSrv.listen(sockPath); }; // The primary message handling function for the worker. @@ -313,3 +317,5 @@ var Worker = function(src, opts) { }; exports.Worker = Worker; +// Perform any one-time initialization +fs.mkdirSync(SOCK_DIR_PATH, 0700); From 6f21c828a4502e030649959d24e9fe66dfaedb6a Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Mon, 23 Apr 2012 13:06:07 +0300 Subject: [PATCH 6/7] close stream --- lib/webworker.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/webworker.js b/lib/webworker.js index 134828d..9e3d70c 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -180,6 +180,8 @@ var Worker = function(src, opts) { wsSrv.close(); httpServer.close(); + if (msgStream) + msgStream = null; // remove the socket fs.unlink(sockPath, function(e) { From cbcbea2763fae69929775acea7b35f761a9e45c4 Mon Sep 17 00:00:00 2001 From: Jaakko Manninen Date: Thu, 3 May 2012 13:27:58 +0300 Subject: [PATCH 7/7] swallow EEXISTS error on SOCK_DIR_PATH --- lib/webworker.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/webworker.js b/lib/webworker.js index 9e3d70c..b065920 100644 --- a/lib/webworker.js +++ b/lib/webworker.js @@ -320,4 +320,9 @@ var Worker = function(src, opts) { exports.Worker = Worker; // Perform any one-time initialization -fs.mkdirSync(SOCK_DIR_PATH, 0700); +try { + fs.mkdirSync(SOCK_DIR_PATH, 0700); +} catch(e) { + if (e.code && e.code != 'EEXIST') + throw e; +}