From 698daa4a70d0fe7232eb52b088cabd7c805d5127 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Thu, 27 Sep 2018 00:54:27 +0200 Subject: [PATCH 01/13] ability to define worker supervisor in pool args, register worker supervisor as poolboy_sup --- src/poolboy.erl | 35 ++++++++++++++++++++++++++++++++--- src/poolboy_sup.erl | 2 +- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index db20541..505b175 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -36,8 +36,16 @@ % Copied from gen:start_ret/0 -type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. +% Copied from supervisor:sup_ref/0 +-type sup_ref() :: + (Name :: atom()) | + {Name :: atom(), Node :: node()} | + {global, Name :: atom()} | + {via, Module :: module(), Name :: any()} | + pid(). + -record(state, { - supervisor :: undefined | pid(), + supervisor :: undefined | sup_ref(), workers :: undefined | pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), @@ -151,9 +159,28 @@ init({PoolArgs, WorkerArgs}) -> Monitors = ets:new(monitors, [private]), init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). +init([{worker_supervisor, Sup = {Scope, _Name}} | Rest], WorkerArgs, State) + when Scope =:= local orelse Scope =:= global -> + init(Rest, WorkerArgs, State#state{supervisor=Sup}); +init([{worker_supervisor, Sup = {_Name, Node}} | Rest], WorkerArgs, State) -> + (catch erlang:monitor_node(Node, true)), + init(Rest, WorkerArgs, State#state{supervisor=Sup}); +init([{worker_supervisor, Sup} | Rest], WorkerArgs, State) + when is_pid(Sup) orelse is_atom(Sup) orelse is_tuple(Sup) -> + init(Rest, WorkerArgs, State#state{supervisor=Sup}); init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> - {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), - init(Rest, WorkerArgs, State#state{supervisor = Sup}); + {ok, Sup} = + case poolboy_sup:start_link(Mod, WorkerArgs) of + {ok, _Pid} = Ok -> Ok; + {error, {already_started, Pid}} -> + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, _, _, _} -> ok + after ?TIMEOUT -> ok + end, + poolboy_sup:start_link(Mod, WorkerArgs) + end, + init(Rest, WorkerArgs, State#state{supervisor=Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> init(Rest, WorkerArgs, State#state{size = Size}); init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> @@ -275,6 +302,8 @@ handle_info({'EXIT', Pid, _Reason}, State) -> {noreply, State} end end; +handle_info({nodedown, Node}, State = #state{supervisor = {_, Node}}) -> + {stop, nodedown, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index e6485a6..cabe86f 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -6,7 +6,7 @@ -export([start_link/2, init/1]). start_link(Mod, Args) -> - supervisor:start_link(?MODULE, {Mod, Args}). + supervisor:start_link({local, ?MODULE}, ?MODULE, {Mod, Args}). init({Mod, Args}) -> {ok, {{simple_one_for_one, 0, 1}, From adac9c2ce05774ea968ae98e97611823f1ba53fb Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Thu, 27 Sep 2018 10:36:27 +0200 Subject: [PATCH 02/13] register worker supervisor under its module name --- src/poolboy_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index cabe86f..2c50b6f 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -6,7 +6,7 @@ -export([start_link/2, init/1]). start_link(Mod, Args) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, {Mod, Args}). + supervisor:start_link({local, Mod}, ?MODULE, {Mod, Args}). init({Mod, Args}) -> {ok, {{simple_one_for_one, 0, 1}, From dfe97bda55095331f3e48cebb1857a1a2221c228 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Fri, 28 Sep 2018 16:41:23 +0200 Subject: [PATCH 03/13] fix for stopping a remote supervisor --- src/poolboy.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 505b175..63ceb55 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -308,10 +308,12 @@ handle_info({nodedown, Node}, State = #state{supervisor = {_, Node}}) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> +terminate(_Reason, State = #state{supervisor = Sup}) -> Workers = queue:to_list(State#state.workers), ok = lists:foreach(fun (W) -> unlink(W) end, Workers), - true = exit(State#state.supervisor, shutdown), + if is_pid(Sup) -> true = exit(Sup, shutdown); + true -> ok = gen_server:stop(Sup) + end, ok. code_change(_OldVsn, State, _Extra) -> From bb86353e18c15340c220779eecda3dbd3587f75e Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Mon, 8 Oct 2018 11:17:45 +0200 Subject: [PATCH 04/13] stop on supervisor exit --- src/poolboy.erl | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 63ceb55..aa5d72c 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -284,9 +284,10 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; -handle_info({'EXIT', Pid, _Reason}, State) -> +handle_info({'EXIT', Pid, Reason}, State) -> #state{supervisor = Sup, monitors = Monitors} = State, + Next = case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -301,19 +302,21 @@ handle_info({'EXIT', Pid, _Reason}, State) -> false -> {noreply, State} end + end, + case {Sup, erlang:node(Pid)} of + {{_, Node}, Node} -> {stop, Reason, State#state{supervisor = undefined}}; + {Pid, _} -> {stop, Reason, State#state{supervisor = undefined}}; + _ -> Next end; handle_info({nodedown, Node}, State = #state{supervisor = {_, Node}}) -> - {stop, nodedown, State}; - + {stop, nodedown, State#state{supervisor = undefined}}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State = #state{supervisor = Sup}) -> Workers = queue:to_list(State#state.workers), ok = lists:foreach(fun (W) -> unlink(W) end, Workers), - if is_pid(Sup) -> true = exit(Sup, shutdown); - true -> ok = gen_server:stop(Sup) - end, + stop_supervisor(Sup), ok. code_change(_OldVsn, State, _Extra) -> @@ -406,3 +409,12 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> full; state_name(_State) -> overflow. + +stop_supervisor(undefined) -> ok; +stop_supervisor(Pid) when is_pid(Pid) -> + case erlang:node(Pid) of + N when N == node() -> exit(Pid, shutdown); + _ -> gen_server:stop(Pid) + end; +stop_supervisor(Tuple) when is_tuple(Tuple) -> + gen_server:stop(Tuple). From a4bc02a7504396849db85da8083c4ac9182e3c03 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Wed, 10 Oct 2018 14:41:17 +0200 Subject: [PATCH 05/13] do not try to terminate remote superevisor on nodedown --- src/poolboy.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index aa5d72c..2bd9ed4 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -313,10 +313,10 @@ handle_info({nodedown, Node}, State = #state{supervisor = {_, Node}}) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State = #state{supervisor = Sup}) -> +terminate(Reason, State = #state{supervisor = Sup}) -> Workers = queue:to_list(State#state.workers), ok = lists:foreach(fun (W) -> unlink(W) end, Workers), - stop_supervisor(Sup), + stop_supervisor(Reason, Sup), ok. code_change(_OldVsn, State, _Extra) -> @@ -410,11 +410,13 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> state_name(_State) -> overflow. -stop_supervisor(undefined) -> ok; -stop_supervisor(Pid) when is_pid(Pid) -> +stop_supervisor(_, undefined) -> ok; +stop_supervisor(Reason, Pid) when is_pid(Pid) -> case erlang:node(Pid) of - N when N == node() -> exit(Pid, shutdown); - _ -> gen_server:stop(Pid) + N when N == node() -> exit(Pid, Reason); + _ when Reason =/= nodedown -> catch gen_server:stop(Pid, Reason, ?TIMEOUT); + _ -> ok end; -stop_supervisor(Tuple) when is_tuple(Tuple) -> - gen_server:stop(Tuple). +stop_supervisor(nodedown, Tuple) when is_tuple(Tuple) -> ok; +stop_supervisor(Reason, Tuple) when is_tuple(Tuple) -> + catch gen_server:stop(Tuple, Reason, ?TIMEOUT). From b92d5b9861aabe331415cdadba289e8ebef735ae Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Fri, 13 Sep 2019 00:03:40 +0200 Subject: [PATCH 06/13] fix dialyzer warnings * Factor out supervisor and worker creation from init/3, so that state record can be initialized without the default:undefined --- src/poolboy.erl | 87 +++++++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 32 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 2bd9ed4..db1430a 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -11,6 +11,8 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). +-define(DEFAULT_SIZE, 5). + -ifdef(pre17). -type pid_queue() :: queue(). @@ -45,11 +47,11 @@ pid(). -record(state, { - supervisor :: undefined | sup_ref(), - workers :: undefined | pid_queue(), + supervisor :: sup_ref(), + workers :: pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), - size = 5 :: non_neg_integer(), + size = ?DEFAULT_SIZE :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo @@ -157,43 +159,64 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). - -init([{worker_supervisor, Sup = {Scope, _Name}} | Rest], WorkerArgs, State) - when Scope =:= local orelse Scope =:= global -> - init(Rest, WorkerArgs, State#state{supervisor=Sup}); -init([{worker_supervisor, Sup = {_Name, Node}} | Rest], WorkerArgs, State) -> - (catch erlang:monitor_node(Node, true)), - init(Rest, WorkerArgs, State#state{supervisor=Sup}); -init([{worker_supervisor, Sup} | Rest], WorkerArgs, State) - when is_pid(Sup) orelse is_atom(Sup) orelse is_tuple(Sup) -> - init(Rest, WorkerArgs, State#state{supervisor=Sup}); -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> - {ok, Sup} = - case poolboy_sup:start_link(Mod, WorkerArgs) of - {ok, _Pid} = Ok -> Ok; - {error, {already_started, Pid}} -> + Supervisor = ensure_worker_supervisor(PoolArgs, WorkerArgs), + Size = proplists:get_value(size, PoolArgs, ?DEFAULT_SIZE), + Workers = prepopulate(Size, Supervisor), + init(PoolArgs, WorkerArgs, + #state{supervisor = Supervisor, + workers = Workers, + waiting = Waiting, + monitors = Monitors}). + +ensure_worker_supervisor(PoolArgs, WorkerArgs) -> + case proplists:get_value(worker_supervisor, PoolArgs) of + undefined -> + start_supervisor( + proplists:get_value(worker_module, PoolArgs), + WorkerArgs); + Sup = {Name, Node} when Name =/= local orelse + Name =/= global -> + (catch erlang:monitor_node(Node, true)), + Sup; + Sup when is_pid(Sup) orelse + is_atom(Sup) orelse + is_tuple(Sup) -> + Sup + end. + +start_supervisor(WorkerModule, WorkerArgs) -> + start_supervisor(WorkerModule, WorkerArgs, 1). + +start_supervisor(undefined, _WorkerArgs, _Retries) -> + exit({no_worker_supervisor, {worker_module, undefined}}); +start_supervisor(WorkerModule, WorkerArgs, Retries) -> + case poolboy_sup:start_link(WorkerModule, WorkerArgs) of + {ok, NewPid} -> + NewPid; + {error, {already_started, Pid}} when Retries > 0 -> MRef = erlang:monitor(process, Pid), - receive - {'DOWN', MRef, _, _, _} -> ok + receive {'DOWN', MRef, _, _, _} -> ok after ?TIMEOUT -> ok end, - poolboy_sup:start_link(Mod, WorkerArgs) - end, - init(Rest, WorkerArgs, State#state{supervisor=Sup}); + start_supervisor(WorkerModule, WorkerArgs, Retries - 1); + {error, Error} -> + exit({no_worker_supervisor, Error}) + end. + + init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> init(Rest, WorkerArgs, State#state{size = Size}); -init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> +init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) + when is_integer(MaxOverflow) -> init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); -init([{strategy, lifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = lifo}); -init([{strategy, fifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{strategy, Strategy} | Rest], WorkerArgs, State) + when Strategy == lifo orelse + Strategy == fifo -> + init(Rest, WorkerArgs, State#state{strategy = Strategy}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> - Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. +init([], _WorkerArgs, State) -> + {ok, State}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of From 8d9d7543804ccd4cf0600aa43d89256e33946f79 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Thu, 13 Feb 2020 12:07:54 +0100 Subject: [PATCH 07/13] Add worker supervisor behaviour with a start_child callback --- src/poolboy.erl | 6 +++++- src/poolboy_worker_supervisor.erl | 6 ++++++ 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 src/poolboy_worker_supervisor.erl diff --git a/src/poolboy.erl b/src/poolboy.erl index db1430a..baf4819 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -354,7 +354,11 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> end. new_worker(Sup) -> - {ok, Pid} = supervisor:start_child(Sup, []), + {ok, Pid} = + case is_atom(Sup) andalso erlang:function_exported(Sup, start_child, 0) of + true -> Sup:start_child(); + false -> supervisor:start_child(Sup, []) + end, true = link(Pid), Pid. diff --git a/src/poolboy_worker_supervisor.erl b/src/poolboy_worker_supervisor.erl new file mode 100644 index 0000000..2243cab --- /dev/null +++ b/src/poolboy_worker_supervisor.erl @@ -0,0 +1,6 @@ +-module(poolboy_worker_supervisor). + +-callback start_child() -> {ok, Pid} | + {error, Reason} when + Pid :: pid(), + Reason :: term(). From a6c4767e6a4cd76346061ff41554628eb79b0bdf Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Mon, 17 Feb 2020 01:49:35 +0100 Subject: [PATCH 08/13] Stop supervisor given by name --- src/poolboy.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/poolboy.erl b/src/poolboy.erl index baf4819..8c39d73 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -438,6 +438,8 @@ state_name(_State) -> overflow. stop_supervisor(_, undefined) -> ok; +stop_supervisor(Reason, Atom) when is_atom(Atom) -> + stop_supervisor(Reason, whereis(Atom)); stop_supervisor(Reason, Pid) when is_pid(Pid) -> case erlang:node(Pid) of N when N == node() -> exit(Pid, Reason); From d5282c8e2b068fb060065553d66141db8d938d27 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Sun, 24 May 2020 00:15:01 +0200 Subject: [PATCH 09/13] Add worker_module to state; Refactor init --- src/poolboy.erl | 107 +++++++++++++++++++++++++++++------------------- 1 file changed, 65 insertions(+), 42 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 8c39d73..9586c90 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -12,7 +12,9 @@ -define(TIMEOUT, 5000). -define(DEFAULT_SIZE, 5). - +-define(DEFAULT_TYPE, list). +-define(DEFAULT_STRATEGY, lifo). +-define(DEFAULT_OVERFLOW, 10). -ifdef(pre17). -type pid_queue() :: queue(). @@ -48,13 +50,14 @@ -record(state, { supervisor :: sup_ref(), + worker_module :: atom(), workers :: pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), size = ?DEFAULT_SIZE :: non_neg_integer(), overflow = 0 :: non_neg_integer(), - max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + max_overflow = ?DEFAULT_OVERFLOW :: non_neg_integer(), + strategy = ?DEFAULT_STRATEGY :: lifo | fifo }). -spec checkout(Pool :: pool()) -> pid(). @@ -157,38 +160,46 @@ status(Pool) -> init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), + + WorkerModule = worker_module(PoolArgs), + WorkerSup = worker_supervisor(PoolArgs), + (undefined == WorkerModule) andalso (undefined == WorkerSup) + andalso error({badarg, "worker_module or worker_supervisor is required"}), + Supervisor = ensure_supervisor(WorkerSup, WorkerModule, WorkerArgs), + Size = pool_size(PoolArgs), + Workers = init_workers(Supervisor, WorkerModule, Size), + + MaxOverflow = max_overflow(PoolArgs), + Overflow = init_overflow(Size, MaxOverflow), + Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - Supervisor = ensure_worker_supervisor(PoolArgs, WorkerArgs), - Size = proplists:get_value(size, PoolArgs, ?DEFAULT_SIZE), - Workers = prepopulate(Size, Supervisor), - init(PoolArgs, WorkerArgs, - #state{supervisor = Supervisor, - workers = Workers, - waiting = Waiting, - monitors = Monitors}). - -ensure_worker_supervisor(PoolArgs, WorkerArgs) -> - case proplists:get_value(worker_supervisor, PoolArgs) of - undefined -> - start_supervisor( - proplists:get_value(worker_module, PoolArgs), - WorkerArgs); - Sup = {Name, Node} when Name =/= local orelse - Name =/= global -> - (catch erlang:monitor_node(Node, true)), - Sup; - Sup when is_pid(Sup) orelse - is_atom(Sup) orelse - is_tuple(Sup) -> - Sup - end. + {ok, #state{ + supervisor = Supervisor, + worker_module = WorkerModule, + workers = Workers, + waiting = Waiting, + monitors = Monitors, + size = Size, + overflow = Overflow, + max_overflow = MaxOverflow, + strategy = strategy(PoolArgs) + }}. + +ensure_supervisor(undefined, WorkerModule, WorkerArgs) -> + start_supervisor(WorkerModule, WorkerArgs); +ensure_supervisor(Sup = {Name, Node}, _, _) when Name =/= local orelse + Name =/= global -> + (catch erlang:monitor_node(Node, true)), + Sup; +ensure_supervisor(Sup, _, _) when is_pid(Sup) orelse + is_atom(Sup) orelse + is_tuple(Sup) -> + Sup. start_supervisor(WorkerModule, WorkerArgs) -> start_supervisor(WorkerModule, WorkerArgs, 1). -start_supervisor(undefined, _WorkerArgs, _Retries) -> - exit({no_worker_supervisor, {worker_module, undefined}}); start_supervisor(WorkerModule, WorkerArgs, Retries) -> case poolboy_sup:start_link(WorkerModule, WorkerArgs) of {ok, NewPid} -> @@ -203,20 +214,32 @@ start_supervisor(WorkerModule, WorkerArgs, Retries) -> exit({no_worker_supervisor, Error}) end. +init_workers(Sup, _Mod, Size) -> + prepopulate(Size, Sup). -init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size = Size}); -init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) - when is_integer(MaxOverflow) -> - init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); -init([{strategy, Strategy} | Rest], WorkerArgs, State) - when Strategy == lifo orelse - Strategy == fifo -> - init(Rest, WorkerArgs, State#state{strategy = Strategy}); -init([_ | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State); -init([], _WorkerArgs, State) -> - {ok, State}. +init_overflow(_Size, _MaxOverflow) -> + 0. + +worker_module(PoolArgs) -> + Is = is_atom(V = proplists:get_value(worker_module, PoolArgs)), + if not Is -> undefined; true -> V end. + +worker_supervisor(PoolArgs) -> + Is = is_atom(V = proplists:get_value(worker_supervisor, PoolArgs)), + if not Is -> undefined; true -> V end. + +pool_size(PoolArgs) -> + Is = is_integer(V = proplists:get_value(size, PoolArgs)), + if not Is -> ?DEFAULT_SIZE; true -> V end. + +max_overflow(PoolArgs) -> + Is = is_integer(V = proplists:get_value(max_overflow, PoolArgs)), + if not Is -> ?DEFAULT_OVERFLOW; true -> V end. + +-define(IS_STRATEGY(S), lists:member(S, [lifo, fifo])). +strategy(PoolArgs) -> + Is = ?IS_STRATEGY(V = proplists:get_value(strategy, PoolArgs)), + if not Is -> ?DEFAULT_STRATEGY; true -> V end. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of From 515febc65afb87336ec03f9d01331ee84d52d31a Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Sat, 30 May 2020 15:11:33 +0200 Subject: [PATCH 10/13] Terminate on supervisor EXIT; Use PID of supervisor --- src/poolboy.erl | 149 +++++++++++++++++++++++++++--------------------- 1 file changed, 84 insertions(+), 65 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 9586c90..7306b78 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -40,16 +40,8 @@ % Copied from gen:start_ret/0 -type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. -% Copied from supervisor:sup_ref/0 --type sup_ref() :: - (Name :: atom()) | - {Name :: atom(), Node :: node()} | - {global, Name :: atom()} | - {via, Module :: module(), Name :: any()} | - pid(). - -record(state, { - supervisor :: sup_ref(), + supervisor :: pid(), worker_module :: atom(), workers :: pid_queue(), waiting :: pid_queue(), @@ -162,10 +154,14 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), WorkerModule = worker_module(PoolArgs), - WorkerSup = worker_supervisor(PoolArgs), - (undefined == WorkerModule) andalso (undefined == WorkerSup) - andalso error({badarg, "worker_module or worker_supervisor is required"}), - Supervisor = ensure_supervisor(WorkerSup, WorkerModule, WorkerArgs), + Supervisor = + case worker_supervisor(PoolArgs) of + undefined -> + start_supervisor(WorkerModule, WorkerArgs); + Sup when is_pid(Sup) -> + true = link(Sup), + Sup + end, Size = pool_size(PoolArgs), Workers = init_workers(Supervisor, WorkerModule, Size), @@ -186,17 +182,8 @@ init({PoolArgs, WorkerArgs}) -> strategy = strategy(PoolArgs) }}. -ensure_supervisor(undefined, WorkerModule, WorkerArgs) -> - start_supervisor(WorkerModule, WorkerArgs); -ensure_supervisor(Sup = {Name, Node}, _, _) when Name =/= local orelse - Name =/= global -> - (catch erlang:monitor_node(Node, true)), - Sup; -ensure_supervisor(Sup, _, _) when is_pid(Sup) orelse - is_atom(Sup) orelse - is_tuple(Sup) -> - Sup. - +start_supervisor(undefined, _WorkerArgs) -> + error({badarg, "worker_module or worker_supervisor is required"}); start_supervisor(WorkerModule, WorkerArgs) -> start_supervisor(WorkerModule, WorkerArgs, 1). @@ -214,8 +201,8 @@ start_supervisor(WorkerModule, WorkerArgs, Retries) -> exit({no_worker_supervisor, Error}) end. -init_workers(Sup, _Mod, Size) -> - prepopulate(Size, Sup). +init_workers(Sup, Mod, Size) -> + prepopulate(Size, Sup, Mod). init_overflow(_Size, _MaxOverflow) -> 0. @@ -225,8 +212,22 @@ worker_module(PoolArgs) -> if not Is -> undefined; true -> V end. worker_supervisor(PoolArgs) -> - Is = is_atom(V = proplists:get_value(worker_supervisor, PoolArgs)), - if not Is -> undefined; true -> V end. + Is = is_pid(Res = find_pid(V = proplists:get_value(worker_supervisor, PoolArgs))), + if not Is andalso Res =/= V -> exit({not_found, V, Res}); true -> Res end. + +find_pid(undefined) -> + undefined; +find_pid(Name) when is_atom(Name) -> + find_pid({local, Name}); +find_pid({local, Name}) -> + whereis(Name); +find_pid({global, Name}) -> + find_pid({via, global, Name}); +find_pid({via, Registry, Name}) -> + Registry:whereis_name(Name); +find_pid({Name, Node}) -> + (catch erlang:monitor_node(Node, true)), + rpc:call(Node, erlang, whereis, [Name], ?TIMEOUT). pool_size(PoolArgs) -> Is = is_integer(V = proplists:get_value(size, PoolArgs)), @@ -275,6 +276,7 @@ handle_cast(_Msg, State) -> handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> #state{supervisor = Sup, + worker_module = Mod, workers = Workers, monitors = Monitors, overflow = Overflow, @@ -286,7 +288,8 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{workers = Left}}; {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> - {Pid, MRef} = new_worker(Sup, FromPid), + Pid = new_worker(Sup, Mod), + MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{overflow = Overflow + 1}}; {empty, _Left} when Block =:= false -> @@ -330,10 +333,12 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; -handle_info({'EXIT', Pid, Reason}, State) -> +handle_info({'EXIT', Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; +handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, + worker_module = Mod, monitors = Monitors} = State, - Next = case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -344,18 +349,14 @@ handle_info({'EXIT', Pid, Reason}, State) -> case queue:member(Pid, State#state.workers) of true -> W = filter_worker_by_pid(Pid, State#state.workers), - {noreply, State#state{workers = queue:in(new_worker(Sup), W)}}; + {noreply, State#state{workers = queue:in(new_worker(Sup, Mod), W)}}; false -> {noreply, State} end - end, - case {Sup, erlang:node(Pid)} of - {{_, Node}, Node} -> {stop, Reason, State#state{supervisor = undefined}}; - {Pid, _} -> {stop, Reason, State#state{supervisor = undefined}}; - _ -> Next end; -handle_info({nodedown, Node}, State = #state{supervisor = {_, Node}}) -> - {stop, nodedown, State#state{supervisor = undefined}}; +handle_info({nodedown, Node}, State = #state{supervisor = Sup}) + when Node == erlang:node(Sup) -> + {stop, nodedown, State}; handle_info(_Info, State) -> {noreply, State}. @@ -376,25 +377,46 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) end. -new_worker(Sup) -> +new_worker(Sup, Mod) -> + Node = erlang:node(Sup), {ok, Pid} = - case is_atom(Sup) andalso erlang:function_exported(Sup, start_child, 0) of - true -> Sup:start_child(); - false -> supervisor:start_child(Sup, []) + case rpc:pinfo(Sup, registered_name) of + {registered_name, Name} -> + case function_exported(Node, Name, start_child, 0) of + true -> rpc:call(Node, Name, start_child, []); + false -> + Args = child_args(Sup, Mod), + supervisor:start_child(Sup, Args) + end; + R when R == undefined; R == [] -> + Args = child_args(Sup, Mod), + supervisor:start_child(Sup, Args) end, true = link(Pid), Pid. -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. - get_worker_with_strategy(Workers, fifo) -> queue:out(Workers); get_worker_with_strategy(Workers, lifo) -> queue:out_r(Workers). +child_args(Sup, Mod) -> + Node = erlang:node(Sup), + case supervisor:get_childspec(Sup, Mod) of + {ok, #{start := {M,F,A}}} -> + case function_exported(Node, M, F, length(A)) of + true -> [] + end; + {ok, {_Id, {M,F,A}, _R, _SD, _T, _M}} -> + case function_exported(Node, M, F, length(A)) of + true -> [] + end; + _ -> [] + end. + +function_exported(Node, Module, Name, Arity) -> + rpc:call(Node, erlang, function_exported, [Module, Name, Arity]). + dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). @@ -402,15 +424,15 @@ dismiss_worker(Sup, Pid) -> filter_worker_by_pid(Pid, Workers) -> queue:filter(fun (WPid) -> WPid =/= Pid end, Workers). -prepopulate(N, _Sup) when N < 1 -> +prepopulate(N, _Sup, _Mod) when N < 1 -> queue:new(); -prepopulate(N, Sup) -> - prepopulate(N, Sup, queue:new()). +prepopulate(N, Sup, Mod) -> + prepopulate(N, Sup, Mod, queue:new()). -prepopulate(0, _Sup, Workers) -> +prepopulate(0, _Sup, _Mod, Workers) -> Workers; -prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). +prepopulate(N, Sup, Mod, Workers) -> + prepopulate(N-1, Sup, Mod, queue:in(new_worker(Sup, Mod), Workers)). handle_checkin(Pid, State) -> #state{supervisor = Sup, @@ -432,11 +454,12 @@ handle_checkin(Pid, State) -> handle_worker_exit(Pid, State) -> #state{supervisor = Sup, + worker_module = Mod, monitors = Monitors, overflow = Overflow} = State, case queue:out(State#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), + NewWorker = new_worker(Sup, Mod), true = ets:insert(Monitors, {NewWorker, CRef, MRef}), gen_server:reply(From, NewWorker), State#state{waiting = LeftWaiting}; @@ -444,7 +467,7 @@ handle_worker_exit(Pid, State) -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> W = filter_worker_by_pid(Pid, State#state.workers), - Workers = queue:in(new_worker(Sup), W), + Workers = queue:in(new_worker(Sup, Mod), W), State#state{workers = Workers, waiting = Empty} end. @@ -460,15 +483,11 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> state_name(_State) -> overflow. -stop_supervisor(_, undefined) -> ok; -stop_supervisor(Reason, Atom) when is_atom(Atom) -> - stop_supervisor(Reason, whereis(Atom)); stop_supervisor(Reason, Pid) when is_pid(Pid) -> case erlang:node(Pid) of - N when N == node() -> exit(Pid, Reason); - _ when Reason =/= nodedown -> catch gen_server:stop(Pid, Reason, ?TIMEOUT); + N when N == node() -> + exit(Pid, Reason); + _ when Reason =/= nodedown -> + catch gen_server:stop(Pid, Reason, ?TIMEOUT); _ -> ok - end; -stop_supervisor(nodedown, Tuple) when is_tuple(Tuple) -> ok; -stop_supervisor(Reason, Tuple) when is_tuple(Tuple) -> - catch gen_server:stop(Tuple, Reason, ?TIMEOUT). + end. From 81adda7a12260d0407487dc75dfb90124fe7fdcd Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Mon, 1 Jun 2020 14:22:08 +0200 Subject: [PATCH 11/13] Monitor external supervisor instead of linking it --- src/poolboy.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 7306b78..44a977c 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -159,7 +159,7 @@ init({PoolArgs, WorkerArgs}) -> undefined -> start_supervisor(WorkerModule, WorkerArgs); Sup when is_pid(Sup) -> - true = link(Sup), + monitor(process, Sup), Sup end, Size = pool_size(PoolArgs), @@ -323,6 +323,8 @@ handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. +handle_info({'DOWN', _, process, Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; handle_info({'DOWN', MRef, _, _, _}, State) -> case ets:match(State#state.monitors, {'$1', '_', MRef}) of [[Pid]] -> From ac3f228bbb1e8be59e9c4993be899c0135263574 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Wed, 19 Aug 2020 11:19:39 +0200 Subject: [PATCH 12/13] Handle badrpc when looking for Supervisor Pid, Turn undefined into noproc if Pid is not found --- src/poolboy.erl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 44a977c..ea3dbd7 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -212,8 +212,12 @@ worker_module(PoolArgs) -> if not Is -> undefined; true -> V end. worker_supervisor(PoolArgs) -> - Is = is_pid(Res = find_pid(V = proplists:get_value(worker_supervisor, PoolArgs))), - if not Is andalso Res =/= V -> exit({not_found, V, Res}); true -> Res end. + case find_pid(V = proplists:get_value(worker_supervisor, PoolArgs)) of + Res = undefined when Res =:= V -> Res; + Res when is_pid(Res) -> Res; + Res = undefined when Res =/= V -> exit({noproc, V}); + Res -> exit({Res, V}) + end. find_pid(undefined) -> undefined; @@ -227,7 +231,13 @@ find_pid({via, Registry, Name}) -> Registry:whereis_name(Name); find_pid({Name, Node}) -> (catch erlang:monitor_node(Node, true)), - rpc:call(Node, erlang, whereis, [Name], ?TIMEOUT). + try rpc:call(Node, erlang, whereis, [Name], ?TIMEOUT) of + {badrpc, Reason} -> Reason; + Result -> Result + catch + _:Reason -> Reason + end. + pool_size(PoolArgs) -> Is = is_integer(V = proplists:get_value(size, PoolArgs)), From 6091f60ec1fc2f01e4d18027593591398afbbac6 Mon Sep 17 00:00:00 2001 From: Matyas Markovics Date: Tue, 25 Aug 2020 16:38:42 +0200 Subject: [PATCH 13/13] Unify rpc error-handling --- src/poolboy.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index ea3dbd7..1e7447e 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -231,13 +231,15 @@ find_pid({via, Registry, Name}) -> Registry:whereis_name(Name); find_pid({Name, Node}) -> (catch erlang:monitor_node(Node, true)), - try rpc:call(Node, erlang, whereis, [Name], ?TIMEOUT) of - {badrpc, Reason} -> Reason; - Result -> Result - catch - _:Reason -> Reason + try rpc_call(Node, erlang, whereis, [Name], ?TIMEOUT) + catch _:Reason -> Reason end. +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, Mod, Fun, Args, Timeout) of + {badrpc, Reason} -> exit({Reason, {Node, {Mod, Fun, Args}}}); + Result -> Result + end. pool_size(PoolArgs) -> Is = is_integer(V = proplists:get_value(size, PoolArgs)), @@ -392,10 +394,10 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> new_worker(Sup, Mod) -> Node = erlang:node(Sup), {ok, Pid} = - case rpc:pinfo(Sup, registered_name) of + case rpc_call(Node, erlang, process_info, [Sup, registered_name], ?TIMEOUT) of {registered_name, Name} -> case function_exported(Node, Name, start_child, 0) of - true -> rpc:call(Node, Name, start_child, []); + true -> rpc_call(Node, Name, start_child, [], ?TIMEOUT); false -> Args = child_args(Sup, Mod), supervisor:start_child(Sup, Args) @@ -427,7 +429,7 @@ child_args(Sup, Mod) -> end. function_exported(Node, Module, Name, Arity) -> - rpc:call(Node, erlang, function_exported, [Module, Name, Arity]). + rpc_call(Node, erlang, function_exported, [Module, Name, Arity], ?TIMEOUT). dismiss_worker(Sup, Pid) -> true = unlink(Pid),