Skip to content

Commit 2092489

Browse files
Warn when messages are queued for more than 500ms on exit.
1 parent 8e76255 commit 2092489

File tree

5 files changed

+114
-19
lines changed

5 files changed

+114
-19
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
* Compatibility mode will now provide an implementation for bindSync()/unbindSync() if the 'deasync' package is available.
88

9+
* Produce a warning when messages are still queued at process exit and context termination takes more than 500ms.
10+
911
* Bump version requirement to Node.js 10.2, but reduce N-API version to 3 to support more 10.x Node.js versions.
1012

1113
### v6.0.0-beta.5

src/module.h

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
#include "util/to_string.h"
1313
#include "util/trash.h"
1414

15+
#include <chrono>
16+
#include <condition_variable>
17+
#include <cstdio>
18+
#include <thread>
19+
1520
namespace zmq {
1621
class Context;
1722
class Socket;
@@ -20,8 +25,34 @@ struct Terminator {
2025
constexpr Terminator() noexcept = default;
2126
void operator()(void* context) {
2227
assert(context != nullptr);
23-
auto err = zmq_ctx_term(context);
24-
assert(err == 0);
28+
29+
#ifdef ZMQ_BLOCKY
30+
int32_t blocky = zmq_ctx_get(context, ZMQ_BLOCKY);
31+
#else
32+
/* If the option cannot be set, don't suggest to set it. */
33+
bool blocky = false;
34+
#endif
35+
36+
using namespace std::chrono_literals;
37+
std::mutex mut;
38+
std::condition_variable cv;
39+
40+
std::thread thread([&] {
41+
auto err = zmq_ctx_term(context);
42+
assert(err == 0);
43+
cv.notify_all();
44+
});
45+
46+
std::unique_lock<std::mutex> lock(mut);
47+
if (cv.wait_for(lock, 500ms) == std::cv_status::timeout) {
48+
fprintf(stderr,
49+
"(node:%d) WARNING: Waiting for queued ZeroMQ messages to be "
50+
"delivered.%s\n",
51+
uv_os_getpid(),
52+
blocky ? " Set 'context.blocky = false' to change this behaviour." : "");
53+
}
54+
55+
thread.join();
2556
}
2657
};
2758

test/unit/context-process-exit-test.ts

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as semver from "semver"
12
import * as zmq from "../../src"
23

34
import {assert} from "chai"
@@ -7,7 +8,7 @@ describe("context process exit", function() {
78
describe("with default context", function() {
89
it("should occur when sockets are closed", async function() {
910
this.slow(200)
10-
const code = await createProcess(() => {
11+
const {code} = await createProcess(() => {
1112
const socket1 = new zmq.Dealer()
1213
socket1.close()
1314
const socket2 = new zmq.Router()
@@ -19,7 +20,7 @@ describe("context process exit", function() {
1920

2021
it("should occur when sockets are not closed", async function() {
2122
this.slow(200)
22-
const code = await createProcess(() => {
23+
const {code} = await createProcess(() => {
2324
const socket1 = new zmq.Dealer()
2425
const socket2 = new zmq.Router()
2526
})
@@ -29,20 +30,69 @@ describe("context process exit", function() {
2930

3031
it("should not occur when sockets are open and polling", async function() {
3132
this.slow(1000)
32-
const code = await createProcess(() => {
33+
const {code} = await createProcess(() => {
3334
const socket1 = new zmq.Dealer()
3435
socket1.connect("inproc://foo")
3536
socket1.receive()
3637
})
3738

3839
assert.equal(code, -1)
3940
})
41+
42+
it("should produce warning when messages are queued with blocky", async function() {
43+
this.slow(1000)
44+
const {stderr} = await createProcess(() => {
45+
zmq.context.blocky = true
46+
const socket1 = new zmq.Dealer({linger: 600})
47+
socket1.connect("tcp://127.0.0.1:6789")
48+
socket1.send(null)
49+
})
50+
51+
if (semver.satisfies(zmq.version, ">= 4.2")) {
52+
assert.match(
53+
stderr.toString(),
54+
/\(node:\d+\) WARNING: Waiting for queued ZeroMQ messages to be delivered\. Set 'context\.blocky = false' to change this behaviour\.\n/,
55+
)
56+
} else {
57+
assert.match(
58+
stderr.toString(),
59+
/\(node:\d+\) WARNING: Waiting for queued ZeroMQ messages to be delivered\.\n/,
60+
)
61+
}
62+
})
63+
64+
it("should produce warning when messages are queued without blocky", async function() {
65+
this.slow(1000)
66+
const {stderr} = await createProcess(() => {
67+
zmq.context.blocky = false
68+
const socket1 = new zmq.Dealer({linger: 600})
69+
socket1.connect("tcp://127.0.0.1:6789")
70+
socket1.send(null)
71+
})
72+
73+
assert.match(
74+
stderr.toString(),
75+
/\(node:\d+\) WARNING: Waiting for queued ZeroMQ messages to be delivered\.\n/,
76+
)
77+
})
78+
79+
it("should not produce warning when messages are queued for a short time", async function() {
80+
this.slow(1000)
81+
const {stderr} = await createProcess(() => {
82+
zmq.context.blocky = true
83+
const socket1 = new zmq.Dealer({linger: 100})
84+
socket1.connect("tcp://127.0.0.1:6789")
85+
socket1.send(null)
86+
})
87+
88+
assert.equal(stderr.toString(), "")
89+
})
4090
})
4191

4292
describe("with custom context", function() {
4393
it("should occur when sockets are closed", async function() {
4494
this.slow(200)
45-
const code = await createProcess(() => {
95+
const {code} = await createProcess(() => {
4696
const context = new zmq.Context()
4797
const socket1 = new zmq.Dealer({context})
4898
socket1.close()
@@ -55,7 +105,7 @@ describe("context process exit", function() {
55105

56106
it("should occur when sockets are closed and context is gced", async function() {
57107
this.slow(200)
58-
const code = await createProcess(() => {
108+
const {code} = await createProcess(() => {
59109
function run() {
60110
const context = new zmq.Context()
61111
const socket1 = new zmq.Dealer({context})
@@ -73,7 +123,7 @@ describe("context process exit", function() {
73123

74124
it("should occur when sockets are not closed", async function() {
75125
this.slow(200)
76-
const code = await createProcess(() => {
126+
const {code} = await createProcess(() => {
77127
const context = new zmq.Context()
78128
const socket1 = new zmq.Dealer({context})
79129
const socket2 = new zmq.Router({context})
@@ -84,7 +134,7 @@ describe("context process exit", function() {
84134

85135
it("should not occur when sockets are open and polling", async function() {
86136
this.slow(1000)
87-
const code = await createProcess(() => {
137+
const {code} = await createProcess(() => {
88138
const context = new zmq.Context()
89139
const socket1 = new zmq.Dealer({context})
90140
socket1.connect("inproc://foo")

test/unit/helpers.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,13 @@ export async function createWorker<T, D extends {}>(
8585
})
8686
}
8787

88-
export function createProcess(fn: () => void): Promise<number> {
88+
interface Result {
89+
code: number
90+
stdout: Buffer
91+
stderr: Buffer
92+
}
93+
94+
export function createProcess(fn: () => void): Promise<Result> {
8995
const src = `
9096
const zmq = require(${JSON.stringify(path.resolve(__dirname, "../.."))})
9197
const fn = ${fn.toString()}
@@ -96,20 +102,26 @@ export function createProcess(fn: () => void): Promise<number> {
96102
child.stdin.write(src)
97103
child.stdin.end()
98104

99-
child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
100-
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))
105+
let stdout: Buffer = Buffer.alloc(0)
106+
let stderr: Buffer = Buffer.alloc(0)
107+
child.stdout.on("data", (data: Buffer) => {
108+
stdout = Buffer.concat([stdout, data])
109+
})
110+
child.stderr.on("data", (data: Buffer) => {
111+
stderr = Buffer.concat([stderr, data])
112+
})
101113

102114
return new Promise((resolve, reject) => {
103115
child.on("close", (code: number, signal: string) => {
104116
if (signal != null) {
105117
reject(new Error(`Child exited with ${signal}`))
106118
} else {
107-
resolve(code)
119+
resolve({code, stdout, stderr})
108120
}
109121
})
110122

111123
setTimeout(() => {
112-
resolve(-1)
124+
resolve({code: -1, stdout, stderr})
113125
child.kill()
114126
}, 750)
115127
})

test/unit/socket-process-exit-test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ describe("socket process exit", function() {
77
/* Reported: https://github.com/nodejs/node-addon-api/issues/591 */
88
it.skip("should occur cleanly when sending in exit hook", async function() {
99
this.slow(200)
10-
const code = await createProcess(async () => {
10+
const {code} = await createProcess(async () => {
1111
const sockA = new zmq.Pair()
1212
const sockB = new zmq.Pair()
1313
await sockA.bind("inproc://test-1")
@@ -25,7 +25,7 @@ describe("socket process exit", function() {
2525

2626
it("should occur cleanly when sending on unbound socket", async function() {
2727
this.slow(200)
28-
const code = await createProcess(async () => {
28+
const {code} = await createProcess(async () => {
2929
const sock = new zmq.Publisher()
3030
await sock.send("test")
3131
})
@@ -35,7 +35,7 @@ describe("socket process exit", function() {
3535

3636
it("should not occur when sending and blocked on unbound socket", async function() {
3737
this.slow(1000)
38-
const code = await createProcess(async () => {
38+
const {code} = await createProcess(async () => {
3939
const sock = new zmq.Dealer()
4040
await sock.send("test")
4141
})
@@ -45,7 +45,7 @@ describe("socket process exit", function() {
4545

4646
it("should occur cleanly on socket close when reading events", async function() {
4747
this.slow(200)
48-
const code = await createProcess(() => {
48+
const {code} = await createProcess(() => {
4949
const sock = new zmq.Dealer()
5050

5151
async function readEvents() {
@@ -64,7 +64,7 @@ describe("socket process exit", function() {
6464

6565
it("should not occur while reading events", async function() {
6666
this.slow(1000)
67-
const code = await createProcess(async () => {
67+
const {code} = await createProcess(async () => {
6868
const sock = new zmq.Dealer()
6969

7070
const events = []

0 commit comments

Comments
 (0)