Skip to content

Commit 61da93b

Browse files
Don't trigger events when module (environment) is terminating.
1 parent 99712aa commit 61da93b

File tree

7 files changed

+117
-68
lines changed

7 files changed

+117
-68
lines changed

src/module.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,16 @@ class Module {
4848
public:
4949
explicit Module(Napi::Object exports);
5050

51+
~Module() {
52+
Terminating = true;
53+
}
54+
5155
inline class Global& Global() {
5256
return *global;
5357
}
5458

59+
bool Terminating = false;
60+
5561
/* The order of properties defines their destruction in reverse order and is
5662
very important to ensure a clean process exit. During the destruction of
5763
other objects buffers might be released, we must delete trash last. */

src/observer.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ void Observer::Close() {
178178

179179
/* Stop all polling and release event handlers. Callling this after
180180
setting socket to null causes a pending receive promise to be
181-
resolved with undefined. */
182-
poller.Close();
181+
resolved with undefined. If the module is terminating, first cancel
182+
all callbacks (they won't work anymore). */
183+
poller.Close(module.Terminating);
183184
}
184185
}
185186

src/poller.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ class Poller {
6565

6666
/* Safely close and release all handles. This can be called before
6767
destruction to release resources early. */
68-
inline void Close() {
68+
inline void Close(bool cancel = false) {
69+
/* Cancels watched events, don't trigger. */
70+
if (cancel) events = 0;
71+
6972
/* Trigger all watched events manually, which causes any pending
7073
operation to succeed or fail immediately. */
7174
if (events) Trigger(events);

src/socket.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,9 @@ void Socket::Close() {
252252
endpoints = 0;
253253
}
254254

255-
/* Stop all polling and release event handlers. */
256-
poller.Close();
255+
/* Stop all polling and release event handlers. If the module is
256+
terminating, first cancel all callbacks (they won't work anymore). */
257+
poller.Close(module.Terminating);
257258

258259
/* Close succeeds unless socket is invalid. */
259260
auto err = zmq_close(socket);
@@ -343,8 +344,7 @@ Napi::Value Socket::Bind(const Napi::CallbackInfo& info) {
343344

344345
state = Socket::State::Blocked;
345346
auto res = Napi::Promise::Deferred::New(Env());
346-
auto run_ctx =
347-
std::make_shared<AddressContext>(info[0].As<Napi::String>().Utf8Value());
347+
auto run_ctx = std::make_shared<AddressContext>(info[0].As<Napi::String>());
348348

349349
auto status = UvQueue(Env(),
350350
[=]() {
@@ -395,8 +395,7 @@ Napi::Value Socket::Unbind(const Napi::CallbackInfo& info) {
395395

396396
state = Socket::State::Blocked;
397397
auto res = Napi::Promise::Deferred::New(Env());
398-
auto run_ctx =
399-
std::make_shared<AddressContext>(info[0].As<Napi::String>().Utf8Value());
398+
auto run_ctx = std::make_shared<AddressContext>(info[0].As<Napi::String>());
400399

401400
auto status = UvQueue(Env(),
402401
[=]() {
Lines changed: 24 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,62 @@
11
/* tslint:disable: no-unused-expression */
2+
import * as zmq from "../../src"
3+
24
import {assert} from "chai"
3-
import {spawn} from "child_process"
5+
import {createProcess} from "./helpers"
46

5-
/* This file is in JavaScript instead of TypeScript because most code is
6-
being evaluated with toString() and executed in a sub-process. */
77
describe("context process exit", function() {
88
describe("with default context", function() {
99
it("should occur when sockets are closed", async function() {
1010
this.slow(200)
11-
await ensureExit(function() {
12-
const zmq = require(".")
11+
const code = await createProcess(() => {
1312
const socket1 = new zmq.Dealer
1413
socket1.close()
1514
const socket2 = new zmq.Router
1615
socket2.close()
1716
})
17+
18+
assert.equal(code, 0)
1819
})
1920

2021
it("should occur when sockets are not closed", async function() {
2122
this.slow(200)
22-
await ensureExit(function() {
23-
const zmq = require(".")
23+
const code = await createProcess(() => {
2424
const socket1 = new zmq.Dealer
2525
const socket2 = new zmq.Router
2626
})
27+
28+
assert.equal(code, 0)
2729
})
2830

2931
it("should not occur when sockets are open and polling", async function() {
3032
this.slow(750)
31-
await ensureNoExit(function() {
32-
const zmq = require(".")
33+
const code = await createProcess(() => {
3334
const socket1 = new zmq.Dealer
3435
socket1.connect("inproc://foo")
3536
socket1.receive()
3637
})
38+
39+
assert.equal(code, -1)
3740
})
3841
})
3942

4043
describe("with custom context", function() {
4144
it("should occur when sockets are closed", async function() {
4245
this.slow(200)
43-
await ensureExit(function() {
44-
const zmq = require(".")
46+
const code = await createProcess(() => {
4547
const context = new zmq.Context
4648
const socket1 = new zmq.Dealer({context})
4749
socket1.close()
4850
const socket2 = new zmq.Router({context})
4951
socket2.close()
5052
})
53+
54+
assert.equal(code, 0)
5155
})
5256

5357
it("should occur when sockets are closed and context is gced", async function() {
5458
this.slow(200)
55-
await ensureExit(function() {
56-
const zmq = require(".")
59+
const code = await createProcess(() => {
5760
function run() {
5861
const context = new zmq.Context
5962
const socket1 = new zmq.Dealer({context})
@@ -65,68 +68,31 @@ describe("context process exit", function() {
6568
run()
6669
global.gc()
6770
})
71+
72+
assert.equal(code, 0)
6873
})
6974

7075
it("should occur when sockets are not closed", async function() {
7176
this.slow(200)
72-
await ensureExit(function() {
73-
const zmq = require(".")
77+
const code = await createProcess(() => {
7478
const context = new zmq.Context
7579
const socket1 = new zmq.Dealer({context})
7680
const socket2 = new zmq.Router({context})
7781
})
82+
83+
assert.equal(code, 0)
7884
})
7985

8086
it("should not occur when sockets are open and polling", async function() {
8187
this.slow(750)
82-
await ensureNoExit(function() {
83-
const zmq = require(".")
88+
const code = await createProcess(() => {
8489
const context = new zmq.Context
8590
const socket1 = new zmq.Dealer({context})
8691
socket1.connect("inproc://foo")
8792
socket1.receive()
8893
})
89-
})
90-
})
91-
})
92-
93-
async function ensureExit(fn: () => void): Promise<void> {
94-
return new Promise((resolve) => {
95-
const child = spawn(process.argv[0], ["--expose_gc"])
96-
child.stdin.write(`(${fn})()`)
97-
child.stdin.end()
98-
99-
child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
100-
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))
10194

102-
child.on("close", (code: number) => {
103-
assert.equal(code, 0)
104-
resolve()
95+
assert.equal(code, -1)
10596
})
106-
107-
setTimeout(() => {
108-
resolve()
109-
child.kill()
110-
}, 2000)
11197
})
112-
}
113-
114-
async function ensureNoExit(fn: () => void): Promise<void> {
115-
return new Promise((resolve, reject) => {
116-
const child = spawn(process.argv[0], ["--expose_gc"])
117-
child.stdin.write(`(${fn})()`)
118-
child.stdin.end()
119-
120-
child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
121-
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))
122-
123-
child.on("close", (code: number) => {
124-
reject(new Error(`Exit with code ${code}`))
125-
})
126-
127-
setTimeout(() => {
128-
resolve()
129-
child.kill()
130-
}, 500)
131-
})
132-
}
98+
})

test/unit/helpers.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as path from "path"
22
import * as semver from "semver"
3+
4+
import {spawn} from "child_process"
35
import {Worker} from "worker_threads"
46

57
import * as zmq from "../../src"
@@ -51,7 +53,7 @@ export function createWorker<T, D extends {}>(
5153
parentPort.postMessage(msg)
5254
}
5355
54-
run().then(global.gc)
56+
run()
5557
`
5658

5759
const worker = new Worker(src, {
@@ -71,3 +73,33 @@ export function createWorker<T, D extends {}>(
7173
})
7274
})
7375
}
76+
77+
export function createProcess(fn: () => void): Promise<number> {
78+
const src = `
79+
const zmq = require(${JSON.stringify(path.resolve(__dirname, "../.."))})
80+
const fn = ${fn.toString()}
81+
fn()
82+
`
83+
84+
const child = spawn(process.argv[0], ["--expose_gc"])
85+
child.stdin.write(src)
86+
child.stdin.end()
87+
88+
child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
89+
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))
90+
91+
return new Promise((resolve, reject) => {
92+
child.on("close", (code: number, signal: string) => {
93+
if (signal != null) {
94+
reject(new Error(`Child exited with ${signal}`))
95+
} else {
96+
resolve(code)
97+
}
98+
})
99+
100+
setTimeout(() => {
101+
resolve(-1)
102+
child.kill()
103+
}, 1000)
104+
})
105+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import * as zmq from "../../src"
2+
3+
import {assert} from "chai"
4+
import {createProcess, uniqAddress} from "./helpers"
5+
6+
describe("socket process exit", function() {
7+
it.skip("should occur cleanly when sending in exit hook", async function() {
8+
this.slow(200)
9+
const code = await createProcess(async () => {
10+
const sockA = new zmq.Pair
11+
const sockB = new zmq.Pair
12+
await sockA.bind("inproc://test-1")
13+
sockB.connect("inproc://test-1")
14+
15+
process.on("exit", () => {
16+
console.log("hook")
17+
sockB.receive()
18+
sockA.send("foo")
19+
})
20+
})
21+
22+
assert.equal(code, 0)
23+
})
24+
25+
it("should occur cleanly when reading events", async function() {
26+
this.slow(200)
27+
const code = await createProcess(() => {
28+
const sock = new zmq.Dealer
29+
30+
async function readEvents() {
31+
const events = []
32+
for await (const event of sock.events) {
33+
events.push(event)
34+
}
35+
}
36+
37+
readEvents()
38+
})
39+
40+
assert.equal(code, 0)
41+
})
42+
})

0 commit comments

Comments
 (0)