Skip to content

Commit 22d507d

Browse files
nshylocker
authored andcommitted
iproto: don't hang on uncancellable iproto request
Currently if there is uncancellable iproto request Tarantool shutdown will hang. Let's instead give it some time and then panic. On this way it is good to make iproto_drop_connections() fail on timeout. It is used in `box.ctl.iproto_lockdown` which is better to fail on timeout than to hang indefinitely too. In Tarantool CI which is run with TEST_BUILD set, we set the timeout to the infinity. This is on par with current fiber_shutdown() behaviour. We will not change the latter for a while because there is already several tests that count on that. Also it is currently easier to test that there is no hang than to test exit status. Part of tarantool#8423 NO_CHANGELOG=internal NO_DOC=internal
1 parent 885a3d7 commit 22d507d

File tree

5 files changed

+159
-22
lines changed

5 files changed

+159
-22
lines changed

src/box/box.cc

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
#include "tt_sort.h"
101101
#include "event.h"
102102
#include "func_adapter.h"
103+
#include "tweaks.h"
103104

104105
static char status[64] = "unconfigured";
105106

@@ -246,6 +247,22 @@ static char box_feedback_host[BOX_FEEDBACK_HOST_MAX];
246247
/** Whether sending crash info to feedback URL is enabled. */
247248
static bool box_feedback_crash_enabled;
248249

250+
#ifdef TEST_BUILD
251+
/**
252+
* Set timeout to infinity in test build because first not all CI tests treat
253+
* non zero exit code of Tarantool instance as failure currently. Also in
254+
* luatest currently it is easier to test for no hanging rather then for
255+
* Tarantool instance exit code.
256+
*/
257+
#define BOX_SHUTDOWN_TIMEOUT_DEFAULT TIMEOUT_INFINITY
258+
#else
259+
#define BOX_SHUTDOWN_TIMEOUT_DEFAULT 3.0
260+
#endif
261+
262+
/** Timeout on waiting client related fibers to finish. */
263+
static double box_shutdown_timeout = BOX_SHUTDOWN_TIMEOUT_DEFAULT;
264+
TWEAK_DOUBLE(box_shutdown_timeout);
265+
249266
static int
250267
box_run_on_recovery_state(enum box_recovery_state state)
251268
{
@@ -5976,7 +5993,10 @@ box_storage_shutdown()
59765993
if (!is_storage_initialized)
59775994
return;
59785995
is_storage_shutdown = true;
5979-
iproto_shutdown();
5996+
if (iproto_shutdown(box_shutdown_timeout) != 0) {
5997+
diag_log();
5998+
panic("cannot gracefully shutdown iproto");
5999+
}
59806000
box_watcher_shutdown();
59816001
/*
59826002
* Finish client fibers after iproto_shutdown otherwise new fibers

src/box/iproto.cc

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,25 @@ iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
134134
wpos->svp = obuf_create_svp(out);
135135
}
136136

137+
/**
138+
* Message sent when iproto thread dropped all connections that requested
139+
* to be dropped.
140+
*/
141+
struct iproto_drop_finished {
142+
/** Base structure. */
143+
struct cmsg base;
144+
/**
145+
* Generation a is a sequence number of iproto_drop_connections()
146+
* invocation.
147+
*
148+
* Generation is used to handle racy situation when previous invocation
149+
* of iproto_drop_connections() was failed and there is new invocation.
150+
* Message from previous invocation may be delivired and account
151+
* iproto thread as finished dropping connection which is not true.
152+
*/
153+
unsigned generation;
154+
};
155+
137156
struct iproto_thread {
138157
/**
139158
* Slab cache used for allocating memory for output network buffers
@@ -213,7 +232,7 @@ struct iproto_thread {
213232
* Message used to notify TX thread that all connections marked
214233
* to de dropped are dropped.
215234
*/
216-
struct cmsg drop_finished_msg;
235+
struct iproto_drop_finished drop_finished_msg;
217236
/**
218237
* If set then iproto thread shutdown is started and we should not
219238
* accept new connections.
@@ -236,6 +255,12 @@ struct iproto_thread {
236255
static struct fiber_cond drop_finished_cond;
237256
/** Count of iproto threads that are not finished connections drop yet. */
238257
static size_t drop_pending_thread_count;
258+
/**
259+
* Generation is a sequence number of dropping connection invocation.
260+
*
261+
* See also `struct iproto_drop_finished`.
262+
*/
263+
static unsigned drop_generation;
239264

240265
/**
241266
* IPROTO listen URIs. Set by box.cfg.listen.
@@ -411,6 +436,13 @@ struct iproto_cfg_msg: public cbus_call_msg
411436
* NULL if the function is called not from connection.
412437
*/
413438
struct iproto_connection *owner;
439+
/**
440+
* Generation is sequence number of dropping
441+
* connection invocation.
442+
*
443+
* See also `struct iproto_drop_finished`.
444+
*/
445+
unsigned generation;
414446
} drop_connections;
415447
};
416448
struct iproto_thread *iproto_thread;
@@ -875,6 +907,12 @@ struct iproto_connection
875907
struct rlist in_connections;
876908
/** Set if connection is being dropped. */
877909
bool is_drop_pending;
910+
/**
911+
* Generation is sequence number of dropping connection invocation.
912+
*
913+
* See also `struct iproto_drop_finished`.
914+
*/
915+
unsigned drop_generation;
878916
/**
879917
* Messaged sent to TX to cancel all inprogress requests of the
880918
* connection.
@@ -1642,21 +1680,25 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
16421680
static void
16431681
tx_process_drop_finished(struct cmsg *m)
16441682
{
1645-
(void)m;
1646-
assert(drop_pending_thread_count > 0);
1647-
if (--drop_pending_thread_count == 0)
1683+
struct iproto_drop_finished *drop_finished =
1684+
(struct iproto_drop_finished *)m;
1685+
if (drop_finished->generation == drop_generation &&
1686+
--drop_pending_thread_count == 0)
16481687
fiber_cond_signal(&drop_finished_cond);
16491688
}
16501689

16511690
/** Send message to TX thread to notify that connections drop is finished. */
16521691
static void
1653-
iproto_send_drop_finished(struct iproto_thread *iproto_thread)
1692+
iproto_send_drop_finished(struct iproto_thread *iproto_thread,
1693+
unsigned generation)
16541694
{
16551695
static const struct cmsg_hop drop_finished_route[1] =
16561696
{{ tx_process_drop_finished, NULL }};
16571697

1658-
cmsg_init(&iproto_thread->drop_finished_msg, drop_finished_route);
1659-
cpipe_push(&iproto_thread->tx_pipe, &iproto_thread->drop_finished_msg);
1698+
cmsg_init(&iproto_thread->drop_finished_msg.base, drop_finished_route);
1699+
iproto_thread->drop_finished_msg.generation = generation;
1700+
cpipe_push(&iproto_thread->tx_pipe,
1701+
&iproto_thread->drop_finished_msg.base);
16601702
}
16611703

16621704
/** Recycle a connection. */
@@ -1684,7 +1726,8 @@ iproto_connection_delete(struct iproto_connection *con)
16841726

16851727
assert(iproto_thread->drop_pending_connection_count > 0);
16861728
if (--iproto_thread->drop_pending_connection_count == 0)
1687-
iproto_send_drop_finished(iproto_thread);
1729+
iproto_send_drop_finished(iproto_thread,
1730+
con->drop_generation);
16881731
}
16891732
mempool_free(&con->iproto_thread->iproto_connection_pool, con);
16901733
}
@@ -3937,6 +3980,7 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
39373980
struct iproto_connection *con;
39383981
static const struct cmsg_hop cancel_route[1] =
39393982
{{ tx_process_cancel_inprogress, NULL }};
3983+
iproto_thread->drop_pending_connection_count = 0;
39403984
rlist_foreach_entry(con, &iproto_thread->connections,
39413985
in_connections) {
39423986
/*
@@ -3954,6 +3998,8 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
39543998
*/
39553999
if (con != cfg_msg->drop_connections.owner) {
39564000
con->is_drop_pending = true;
4001+
con->drop_generation =
4002+
cfg_msg->drop_connections.generation;
39574003
iproto_thread->drop_pending_connection_count++;
39584004
}
39594005
if (con->state != IPROTO_CONNECTION_DESTROYED) {
@@ -3963,7 +4009,9 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
39634009
}
39644010
}
39654011
if (iproto_thread->drop_pending_connection_count == 0)
3966-
iproto_send_drop_finished(iproto_thread);
4012+
iproto_send_drop_finished(
4013+
iproto_thread,
4014+
cfg_msg->drop_connections.generation);
39674015
break;
39684016
}
39694017
default:
@@ -4024,27 +4072,34 @@ iproto_send_start_msg(void)
40244072
iproto_do_cfg(&iproto_threads[i], &cfg_msg);
40254073
}
40264074

4027-
void
4028-
iproto_drop_connections(void)
4075+
int
4076+
iproto_drop_connections(double timeout)
40294077
{
40304078
static struct latch latch = LATCH_INITIALIZER(latch);
40314079
latch_lock(&latch);
40324080
struct iproto_connection *owner = NULL;
40334081
struct session *session = fiber_get_session(fiber());
40344082
if (session != NULL && session->type == SESSION_TYPE_BINARY)
40354083
owner = (struct iproto_connection *)session->meta.connection;
4084+
drop_generation++;
40364085
drop_pending_thread_count = iproto_threads_count;
40374086
for (int i = 0; i < iproto_threads_count; i++) {
40384087
struct iproto_cfg_msg *cfg_msg =
40394088
(struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
40404089
iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_DROP_CONNECTIONS);
40414090
cfg_msg->drop_connections.owner = owner;
4091+
cfg_msg->drop_connections.generation = drop_generation;
40424092
iproto_do_cfg_async(&iproto_threads[i], cfg_msg);
40434093
}
40444094

4045-
while (drop_pending_thread_count != 0)
4046-
fiber_cond_wait(&drop_finished_cond);
4095+
double deadline = ev_monotonic_now(loop()) + timeout;
4096+
while (drop_pending_thread_count != 0) {
4097+
if (fiber_cond_wait_deadline(&drop_finished_cond,
4098+
deadline) != 0)
4099+
break;
4100+
}
40474101
latch_unlock(&latch);
4102+
return drop_pending_thread_count == 0 ? 0 : -1;
40484103
}
40494104

40504105
/** Send IPROTO_CFG_RESTART to all threads. */
@@ -4233,11 +4288,11 @@ iproto_session_send(struct session *session,
42334288
return 0;
42344289
}
42354290

4236-
void
4237-
iproto_shutdown(void)
4291+
int
4292+
iproto_shutdown(double timeout)
42384293
{
42394294
assert(iproto_is_shutting_down);
4240-
iproto_drop_connections();
4295+
return iproto_drop_connections(timeout);
42414296
}
42424297

42434298
void

src/box/iproto.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,30 @@ iproto_free(void);
193193
* Drop all current connections. That is stop IO and cancel all inprogress
194194
* requests. Return when the requests are finished and connection is freed.
195195
* Concurrent calls are serialized.
196+
*
197+
* Drop can be interrupted by cancelling fiber or on timeout. In this case
198+
* failure result code is returned. The function can be called again after
199+
* failure.
200+
*
201+
* Return:
202+
* 0 - success
203+
* -1 - failure (diag is set)
196204
*/
197-
void
198-
iproto_drop_connections(void);
205+
int
206+
iproto_drop_connections(double timeout);
199207

200208
/**
201209
* Prepare for freeing resources in iproto_free while TX event loop is still
202210
* running.
211+
*
212+
* If not finished in given timeout then failure result code is returned.
213+
*
214+
* Return:
215+
* 0 - success
216+
* -1 - failure (diag is set)
203217
*/
204-
void
205-
iproto_shutdown(void);
218+
int
219+
iproto_shutdown(double timeout);
206220

207221
#if defined(__cplusplus)
208222
} /* extern "C" */

src/box/lua/ctl.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ lbox_ctl_set_iproto_lockdown(struct lua_State *L)
164164
lua_error(L);
165165
}
166166
bool new_val = lua_toboolean(L, 1);
167-
security_set_iproto_lockdown(new_val);
167+
if (security_set_iproto_lockdown(new_val) != 0)
168+
return luaT_error(L);
168169
#else
169170
lua_pushstring(L, "box.ctl.iproto_lockdown() is available only in "
170171
"Enterprise Edition builds.");

test/box-luatest/shutdown_test.lua

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ local server = require('luatest.server')
22
local utils = require('luatest.utils')
33
local fio = require('fio')
44
local popen = require('popen')
5+
local fiber = require('fiber')
56
local t = require('luatest')
67

78
-- Luatest server currently does not allow to check process exit code.
@@ -72,3 +73,49 @@ g_crash.test_shutdown_during_snapshot_on_signal = function(cg)
7273
t.assert_equals(status.state, 'exited')
7374
t.assert_equals(status.exit_code, 0)
7475
end
76+
77+
local g = t.group()
78+
79+
g.before_each(function(cg)
80+
cg.server = server:new()
81+
cg.server:start()
82+
end)
83+
84+
g.after_each(function(cg)
85+
if cg.server ~= nil then
86+
cg.server:drop()
87+
end
88+
end)
89+
90+
local function test_no_hang_on_shutdown(server)
91+
local channel = fiber.channel()
92+
fiber.create(function()
93+
server:stop()
94+
channel:put('finished')
95+
end)
96+
t.assert(channel:get(60) ~= nil)
97+
end
98+
99+
-- Test shutdown does not hang if there is iproto request that
100+
-- cannot be cancelled.
101+
g.test_shutdown_of_hanging_iproto_request = function(cg)
102+
fiber.new(function()
103+
cg.server:exec(function()
104+
local log = require('log')
105+
local fiber = require('fiber')
106+
local tweaks = require('internal.tweaks')
107+
tweaks.box_shutdown_timeout = 1.0
108+
log.info('going to sleep for test')
109+
while true do
110+
pcall(fiber.sleep, 1000)
111+
end
112+
end)
113+
end)
114+
t.helpers.retrying({}, function()
115+
t.assert(cg.server:grep_log('going to sleep for test'))
116+
end)
117+
local log = fio.pathjoin(cg.server.workdir, cg.server.alias .. '.log')
118+
test_no_hang_on_shutdown(cg.server)
119+
t.assert(cg.server:grep_log('cannot gracefully shutdown iproto', nil,
120+
{filename = log}))
121+
end

0 commit comments

Comments
 (0)