Skip to content

Commit bab689c

Browse files
Avoid event loop starvation.
1 parent 4f85d6f commit bab689c

File tree

12 files changed

+163
-64
lines changed

12 files changed

+163
-64
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### Unreleased
2+
3+
* Break out of busy loops automatically when the number of synchronous I/O operations moves beyond a built-in threshold. This avoids the ZeroMQ background I/O thread(s) starving the Node.js event loop when they can process messages faster than the application can, potentially causing decreased responsiveness and/or high memory usage.
4+
15
### v6.0.0-beta.3
26

37
* Error details have been added to the "handshake:error:protocol" and "handshake:error:auth" events.

src/incoming_msg.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Napi::Value IncomingMsg::IntoBuffer(const Napi::Env& env) {
2121
return env.Undefined();
2222
}
2323

24-
static auto constexpr zero_copy_threshold = 32;
24+
static auto constexpr zero_copy_threshold = 1 << 7;
2525

2626
auto data = reinterpret_cast<uint8_t*>(zmq_msg_data(*ref));
2727
auto length = zmq_msg_size(*ref);

src/outgoing_msg.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77
namespace zmq {
88
OutgoingMsg::OutgoingMsg(Napi::Value value, Module& module) {
9-
static auto constexpr zero_copy_threshold = 32;
9+
static auto constexpr zero_copy_threshold = 1 << 7;
10+
1011
auto buffer_send = [&](uint8_t* data, size_t length) {
1112
/* Zero-copy heuristic. There's an overhead in releasing the buffer with an
1213
async call to the main thread (v8 is not threadsafe), so copying small

src/poller.h

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ class Poller {
2626
int32_t err;
2727
auto loop = UvLoop(env);
2828

29-
/* Initialize uv pollers and timers, but unreference them immediately.
30-
The poller will be referenced once the socket connects or binds.
31-
The timers are weak references and will never prevent an exit. */
32-
3329
poll->data = this;
3430
err = uv_poll_init_socket(loop, poll, fd);
3531
if (err != 0) return err;
@@ -77,20 +73,20 @@ class Poller {
7773
assert((events & UV_READABLE) == 0);
7874

7975
if (timeout > 0) {
80-
auto result = uv_timer_start(readable_timer,
76+
auto err = uv_timer_start(readable_timer,
8177
[](uv_timer_t* timer) {
8278
auto& poller = *reinterpret_cast<Poller*>(timer->data);
8379
poller.Trigger(UV_READABLE);
8480
},
8581
timeout, 0);
8682

87-
assert(result == 0);
83+
assert(err == 0);
8884
}
8985

9086
if (!events) {
9187
/* Only start polling if we were not polling already. */
92-
auto result = uv_poll_start(poll, UV_READABLE, Callback);
93-
assert(result == 0);
88+
auto err = uv_poll_start(poll, UV_READABLE, Callback);
89+
assert(err == 0);
9490
}
9591

9692
events |= UV_READABLE;
@@ -100,22 +96,22 @@ class Poller {
10096
assert((events & UV_WRITABLE) == 0);
10197

10298
if (timeout > 0) {
103-
auto result = uv_timer_start(writable_timer,
99+
auto err = uv_timer_start(writable_timer,
104100
[](uv_timer_t* timer) {
105101
auto& poller = *reinterpret_cast<Poller*>(timer->data);
106102
poller.Trigger(UV_WRITABLE);
107103
},
108104
timeout, 0);
109105

110-
assert(result == 0);
106+
assert(err == 0);
111107
}
112108

113109
/* Note: We poll for READS only! "ZMQ shall signal ANY pending
114110
events on the socket in an edge-triggered fashion by making the
115111
file descriptor become ready for READING." */
116112
if (!events) {
117-
auto result = uv_poll_start(poll, UV_READABLE, Callback);
118-
assert(result == 0);
113+
auto err = uv_poll_start(poll, UV_READABLE, Callback);
114+
assert(err == 0);
119115
}
120116

121117
events |= UV_WRITABLE;
@@ -140,19 +136,19 @@ class Poller {
140136
inline void Trigger(int32_t triggered) {
141137
events &= ~triggered;
142138
if (!events) {
143-
auto result = uv_poll_stop(poll);
144-
assert(result == 0);
139+
auto err = uv_poll_stop(poll);
140+
assert(err == 0);
145141
}
146142

147143
if (triggered & UV_READABLE) {
148-
auto result = uv_timer_stop(readable_timer);
149-
assert(result == 0);
144+
auto err = uv_timer_stop(readable_timer);
145+
assert(err == 0);
150146
static_cast<T*>(this)->ReadableCallback();
151147
}
152148

153149
if (triggered & UV_WRITABLE) {
154-
auto result = uv_timer_stop(writable_timer);
155-
assert(result == 0);
150+
auto err = uv_timer_stop(writable_timer);
151+
assert(err == 0);
156152
static_cast<T*>(this)->WritableCallback();
157153
}
158154
}

src/socket.cc

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "util/arguments.h"
99
#include "util/async_scope.h"
1010
#include "util/error.h"
11+
#include "util/uvimmediate.h"
1112
#include "util/uvloop.h"
1213
#include "util/uvwork.h"
1314

@@ -16,6 +17,10 @@
1617
#include <unordered_set>
1718

1819
namespace zmq {
20+
/* The maximum number of sync I/O operations that are allowed before the I/O
21+
methods will force the returned promise to be resolved in the next tick. */
22+
auto constexpr max_sync_operations = 1 << 9;
23+
1924
/* Ordinary static cast for all available numeric types. */
2025
template <typename T>
2126
T NumberCast(const Napi::Number& num) {
@@ -524,56 +529,61 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) {
524529
#endif
525530

526531
if (send_timeout == 0 || HasEvents(ZMQ_POLLOUT)) {
527-
/* We can send on the socket immediately. This is a separate code
528-
path so we can avoid creating a lambda. */
529-
auto res = Napi::Promise::Deferred::New(Env());
530-
Send(res, parts);
531-
532-
/* This operation may have caused a state change, so we must update
533-
the poller state manually! */
534-
poller.Trigger();
535-
536-
return res.Promise();
537-
} else {
538-
/* Check if we are already polling for writes. If so that means
539-
two async read operations are started; which we do not allow.
540-
This is not laziness; we should not introduce additional queueing
541-
because it would break ZMQ semantics. */
542-
if (poller.PollingWritable()) {
543-
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
544-
return Env().Undefined();
532+
/* We can send on the socket immediately. This is a fast path. NOTE: We
533+
must make sure to not keep returning synchronously resolved promises,
534+
or we will starve the event loop. This can happen because ZMQ uses a
535+
background I/O thread, which could mean that the Node.js process is
536+
busy sending data to the I/O thread but is no longer able to respond
537+
to other events. This operation may have caused a state change, so we
538+
must also update the poller state manually! */
539+
if (sync_operations++ < max_sync_operations) {
540+
auto res = Napi::Promise::Deferred::New(Env());
541+
Send(res, parts);
542+
poller.Trigger();
543+
return res.Promise();
545544
}
546545

547-
return poller.WritePromise(send_timeout, std::move(parts));
546+
SetImmediate(Env(), [&]() { poller.Trigger(); });
548547
}
548+
549+
/* Check if we are already polling for writes. If so that means two async
550+
write operations are started, which we do not allow. This is not laziness;
551+
we should not introduce additional queueing because it would break ZMQ
552+
semantics. */
553+
if (poller.PollingWritable()) {
554+
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
555+
return Env().Undefined();
556+
}
557+
558+
return poller.WritePromise(send_timeout, std::move(parts));
549559
}
550560

551561
Napi::Value Socket::Receive(const Napi::CallbackInfo& info) {
552562
if (!ValidateArguments(info, {})) return Env().Undefined();
553563
if (!ValidateOpen()) return Env().Undefined();
554564

555565
if (receive_timeout == 0 || HasEvents(ZMQ_POLLIN)) {
556-
/* We can read from the socket immediately. This is a separate code
557-
path so we can avoid creating a lambda. */
558-
auto res = Napi::Promise::Deferred::New(Env());
559-
Receive(res);
560-
561-
/* This operation may have caused a state change, so we must update
562-
the poller state manually! */
563-
poller.Trigger();
564-
565-
return res.Promise();
566-
} else {
567-
/* Check if we are already polling for reads. Only one promise may
568-
receive the next message, so we must ensure that receive
569-
operations are in sequence. */
570-
if (poller.PollingReadable()) {
571-
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
572-
return Env().Undefined();
566+
/* We can read from the socket immediately. This is a fast path.
567+
Also see the related comments in Send(). */
568+
if (sync_operations++ < max_sync_operations) {
569+
auto res = Napi::Promise::Deferred::New(Env());
570+
Receive(res);
571+
poller.Trigger();
572+
return res.Promise();
573573
}
574574

575-
return poller.ReadPromise(receive_timeout);
575+
SetImmediate(Env(), [&]() { poller.Trigger(); });
576576
}
577+
578+
/* Check if we are already polling for reads. Only one promise may receive
579+
the next message, so we must ensure that receive operations are in
580+
sequence. */
581+
if (poller.PollingReadable()) {
582+
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
583+
return Env().Undefined();
584+
}
585+
586+
return poller.ReadPromise(receive_timeout);
577587
}
578588

579589
void Socket::Join(const Napi::CallbackInfo& info) {
@@ -848,11 +858,13 @@ void Socket::Initialize(Module& module, Napi::Object& exports) {
848858
}
849859

850860
void Socket::Poller::ReadableCallback() {
861+
socket.sync_operations = 0;
851862
AsyncScope scope(read_deferred.Env(), socket.async_context);
852863
socket.Receive(read_deferred);
853864
}
854865

855866
void Socket::Poller::WritableCallback() {
867+
socket.sync_operations = 0;
856868
AsyncScope scope(write_deferred.Env(), socket.async_context);
857869
socket.Send(write_deferred, write_value);
858870
write_value.Clear();
@@ -865,7 +877,7 @@ Napi::Value Socket::Poller::ReadPromise(int64_t timeout) {
865877
}
866878

867879
Napi::Value Socket::Poller::WritePromise(int64_t timeout, OutgoingMsg::Parts&& value) {
868-
write_deferred = Napi::Promise::Deferred(read_deferred.Env());
880+
write_deferred = Napi::Promise::Deferred(write_deferred.Env());
869881
write_value = std::move(value);
870882
zmq::Poller<Poller>::PollWritable(timeout);
871883
return write_deferred.Promise();

src/socket.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ class Socket : public Napi::ObjectWrap<Socket>, public Closable {
9898

9999
int64_t send_timeout = -1;
100100
int64_t receive_timeout = -1;
101+
uint32_t sync_operations = 0;
101102
uint32_t endpoints = 0;
102103

103104
State state = State::Open;

src/util/uvimmediate.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/* Copyright (c) 2017-2019 Rolf Timmermans */
2+
#pragma once
3+
4+
#include "uvhandle.h"
5+
#include "uvloop.h"
6+
7+
namespace zmq {
8+
template <typename C>
9+
class UvImmediate {
10+
UvHandle<uv_check_t> check;
11+
UvHandle<uv_idle_t> idle;
12+
C delayed_callback;
13+
14+
public:
15+
UvImmediate(uv_loop_t* loop, C&& callback) : delayed_callback(std::move(callback)) {
16+
int32_t err;
17+
18+
check->data = this;
19+
err = uv_check_init(loop, check);
20+
assert(err == 0);
21+
22+
idle->data = this;
23+
err = uv_idle_init(loop, idle);
24+
assert(err == 0);
25+
}
26+
27+
inline void Schedule() {
28+
int32_t err;
29+
30+
/* Idle handle is needed to stop the event loop from blocking in poll. */
31+
err = uv_idle_start(idle, [](uv_idle_t* idle) {});
32+
assert(err == 0);
33+
34+
err = uv_check_start(check, [](uv_check_t* check) {
35+
auto& immediate = *reinterpret_cast<UvImmediate*>(check->data);
36+
immediate.delayed_callback();
37+
delete &immediate;
38+
});
39+
40+
assert(err == 0);
41+
}
42+
};
43+
44+
template <typename C>
45+
static inline void SetImmediate(const Napi::Env& env, C callback) {
46+
auto immediate = new UvImmediate<C>(UvLoop(env), std::move(callback));
47+
return immediate->Schedule();
48+
}
49+
}

src/util/uvloop.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#pragma once
33

44
namespace zmq {
5-
inline uv_loop_t* UvLoop(Napi::Env env) {
5+
inline uv_loop_t* UvLoop(const Napi::Env& env) {
66
uv_loop_t* loop = nullptr;
77
auto status = napi_get_uv_event_loop(env, &loop);
88
assert(status == napi_ok);

src/util/uvwork.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class UvWork {
2020
work->data = this;
2121
}
2222

23-
inline int32_t Exec(uv_loop_t* loop) {
23+
inline int32_t Schedule(uv_loop_t* loop) {
2424
auto err = uv_queue_work(loop, work.get(),
2525
[](uv_work_t* req) {
2626
auto& work = *reinterpret_cast<UvWork*>(req->data);
@@ -39,9 +39,9 @@ class UvWork {
3939
};
4040

4141
template <typename E, typename C>
42-
static inline int32_t UvQueue(Napi::Env env, E execute, C complete) {
42+
static inline int32_t UvQueue(const Napi::Env& env, E execute, C complete) {
4343
auto loop = UvLoop(env);
4444
auto work = new UvWork<E, C>(std::move(execute), std::move(complete));
45-
return work->Exec(loop);
45+
return work->Schedule(loop);
4646
}
4747
}

test/bench/create-socket.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ if (zmq.cur) {
1313
}
1414

1515
if (zmq.ng) {
16-
zmq.ng.global.maxSockets = n
16+
zmq.ng.context.maxSockets = n
1717
suite.add(`create socket n=${n} zmq=ng`, Object.assign({
1818
fn: deferred => {
1919
const sockets = []

0 commit comments

Comments
 (0)