diff --git a/src/poolboy.erl b/src/poolboy.erl index db20541..1e7447e 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -11,6 +11,10 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). +-define(DEFAULT_SIZE, 5). +-define(DEFAULT_TYPE, list). +-define(DEFAULT_STRATEGY, lifo). +-define(DEFAULT_OVERFLOW, 10). -ifdef(pre17). -type pid_queue() :: queue(). @@ -37,14 +41,15 @@ -type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. -record(state, { - supervisor :: undefined | pid(), - workers :: undefined | pid_queue(), + supervisor :: pid(), + worker_module :: atom(), + workers :: pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), - size = 5 :: non_neg_integer(), + size = ?DEFAULT_SIZE :: non_neg_integer(), overflow = 0 :: non_neg_integer(), - max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + max_overflow = ?DEFAULT_OVERFLOW :: non_neg_integer(), + strategy = ?DEFAULT_STRATEGY :: lifo | fifo }). -spec checkout(Pool :: pool()) -> pid(). @@ -147,26 +152,107 @@ status(Pool) -> init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), + + WorkerModule = worker_module(PoolArgs), + Supervisor = + case worker_supervisor(PoolArgs) of + undefined -> + start_supervisor(WorkerModule, WorkerArgs); + Sup when is_pid(Sup) -> + monitor(process, Sup), + Sup + end, + Size = pool_size(PoolArgs), + Workers = init_workers(Supervisor, WorkerModule, Size), + + MaxOverflow = max_overflow(PoolArgs), + Overflow = init_overflow(Size, MaxOverflow), + Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). - -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> - {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), - init(Rest, WorkerArgs, State#state{supervisor = Sup}); -init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size = Size}); -init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> - init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); -init([{strategy, lifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = lifo}); -init([{strategy, fifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = fifo}); -init([_ | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> - Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. + {ok, #state{ + supervisor = Supervisor, + worker_module = WorkerModule, + workers = Workers, + waiting = Waiting, + monitors = Monitors, + size = Size, + overflow = Overflow, + max_overflow = MaxOverflow, + strategy = strategy(PoolArgs) + }}. + +start_supervisor(undefined, _WorkerArgs) -> + error({badarg, "worker_module or worker_supervisor is required"}); +start_supervisor(WorkerModule, WorkerArgs) -> + start_supervisor(WorkerModule, WorkerArgs, 1). + +start_supervisor(WorkerModule, WorkerArgs, Retries) -> + case poolboy_sup:start_link(WorkerModule, WorkerArgs) of + {ok, NewPid} -> + NewPid; + {error, {already_started, Pid}} when Retries > 0 -> + MRef = erlang:monitor(process, Pid), + receive {'DOWN', MRef, _, _, _} -> ok + after ?TIMEOUT -> ok + end, + start_supervisor(WorkerModule, WorkerArgs, Retries - 1); + {error, Error} -> + exit({no_worker_supervisor, Error}) + end. + +init_workers(Sup, Mod, Size) -> + prepopulate(Size, Sup, Mod). + +init_overflow(_Size, _MaxOverflow) -> + 0. + +worker_module(PoolArgs) -> + Is = is_atom(V = proplists:get_value(worker_module, PoolArgs)), + if not Is -> undefined; true -> V end. + +worker_supervisor(PoolArgs) -> + case find_pid(V = proplists:get_value(worker_supervisor, PoolArgs)) of + Res = undefined when Res =:= V -> Res; + Res when is_pid(Res) -> Res; + Res = undefined when Res =/= V -> exit({noproc, V}); + Res -> exit({Res, V}) + end. + +find_pid(undefined) -> + undefined; +find_pid(Name) when is_atom(Name) -> + find_pid({local, Name}); +find_pid({local, Name}) -> + whereis(Name); +find_pid({global, Name}) -> + find_pid({via, global, Name}); +find_pid({via, Registry, Name}) -> + Registry:whereis_name(Name); +find_pid({Name, Node}) -> + (catch erlang:monitor_node(Node, true)), + try rpc_call(Node, erlang, whereis, [Name], ?TIMEOUT) + catch _:Reason -> Reason + end. + +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, Mod, Fun, Args, Timeout) of + {badrpc, Reason} -> exit({Reason, {Node, {Mod, Fun, Args}}}); + Result -> Result + end. + +pool_size(PoolArgs) -> + Is = is_integer(V = proplists:get_value(size, PoolArgs)), + if not Is -> ?DEFAULT_SIZE; true -> V end. + +max_overflow(PoolArgs) -> + Is = is_integer(V = proplists:get_value(max_overflow, PoolArgs)), + if not Is -> ?DEFAULT_OVERFLOW; true -> V end. + +-define(IS_STRATEGY(S), lists:member(S, [lifo, fifo])). +strategy(PoolArgs) -> + Is = ?IS_STRATEGY(V = proplists:get_value(strategy, PoolArgs)), + if not Is -> ?DEFAULT_STRATEGY; true -> V end. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of @@ -202,6 +288,7 @@ handle_cast(_Msg, State) -> handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> #state{supervisor = Sup, + worker_module = Mod, workers = Workers, monitors = Monitors, overflow = Overflow, @@ -213,7 +300,8 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{workers = Left}}; {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> - {Pid, MRef} = new_worker(Sup, FromPid), + Pid = new_worker(Sup, Mod), + MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{overflow = Overflow + 1}}; {empty, _Left} when Block =:= false -> @@ -247,6 +335,8 @@ handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. +handle_info({'DOWN', _, process, Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; handle_info({'DOWN', MRef, _, _, _}, State) -> case ets:match(State#state.monitors, {'$1', '_', MRef}) of [[Pid]] -> @@ -257,8 +347,11 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; +handle_info({'EXIT', Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, + worker_module = Mod, monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> @@ -270,19 +363,21 @@ handle_info({'EXIT', Pid, _Reason}, State) -> case queue:member(Pid, State#state.workers) of true -> W = filter_worker_by_pid(Pid, State#state.workers), - {noreply, State#state{workers = queue:in(new_worker(Sup), W)}}; + {noreply, State#state{workers = queue:in(new_worker(Sup, Mod), W)}}; false -> {noreply, State} end end; - +handle_info({nodedown, Node}, State = #state{supervisor = Sup}) + when Node == erlang:node(Sup) -> + {stop, nodedown, State}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> +terminate(Reason, State = #state{supervisor = Sup}) -> Workers = queue:to_list(State#state.workers), ok = lists:foreach(fun (W) -> unlink(W) end, Workers), - true = exit(State#state.supervisor, shutdown), + stop_supervisor(Reason, Sup), ok. code_change(_OldVsn, State, _Extra) -> @@ -296,21 +391,46 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) end. -new_worker(Sup) -> - {ok, Pid} = supervisor:start_child(Sup, []), +new_worker(Sup, Mod) -> + Node = erlang:node(Sup), + {ok, Pid} = + case rpc_call(Node, erlang, process_info, [Sup, registered_name], ?TIMEOUT) of + {registered_name, Name} -> + case function_exported(Node, Name, start_child, 0) of + true -> rpc_call(Node, Name, start_child, [], ?TIMEOUT); + false -> + Args = child_args(Sup, Mod), + supervisor:start_child(Sup, Args) + end; + R when R == undefined; R == [] -> + Args = child_args(Sup, Mod), + supervisor:start_child(Sup, Args) + end, true = link(Pid), Pid. -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. - get_worker_with_strategy(Workers, fifo) -> queue:out(Workers); get_worker_with_strategy(Workers, lifo) -> queue:out_r(Workers). +child_args(Sup, Mod) -> + Node = erlang:node(Sup), + case supervisor:get_childspec(Sup, Mod) of + {ok, #{start := {M,F,A}}} -> + case function_exported(Node, M, F, length(A)) of + true -> [] + end; + {ok, {_Id, {M,F,A}, _R, _SD, _T, _M}} -> + case function_exported(Node, M, F, length(A)) of + true -> [] + end; + _ -> [] + end. + +function_exported(Node, Module, Name, Arity) -> + rpc_call(Node, erlang, function_exported, [Module, Name, Arity], ?TIMEOUT). + dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). @@ -318,15 +438,15 @@ dismiss_worker(Sup, Pid) -> filter_worker_by_pid(Pid, Workers) -> queue:filter(fun (WPid) -> WPid =/= Pid end, Workers). -prepopulate(N, _Sup) when N < 1 -> +prepopulate(N, _Sup, _Mod) when N < 1 -> queue:new(); -prepopulate(N, Sup) -> - prepopulate(N, Sup, queue:new()). +prepopulate(N, Sup, Mod) -> + prepopulate(N, Sup, Mod, queue:new()). -prepopulate(0, _Sup, Workers) -> +prepopulate(0, _Sup, _Mod, Workers) -> Workers; -prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). +prepopulate(N, Sup, Mod, Workers) -> + prepopulate(N-1, Sup, Mod, queue:in(new_worker(Sup, Mod), Workers)). handle_checkin(Pid, State) -> #state{supervisor = Sup, @@ -348,11 +468,12 @@ handle_checkin(Pid, State) -> handle_worker_exit(Pid, State) -> #state{supervisor = Sup, + worker_module = Mod, monitors = Monitors, overflow = Overflow} = State, case queue:out(State#state.waiting) of {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), + NewWorker = new_worker(Sup, Mod), true = ets:insert(Monitors, {NewWorker, CRef, MRef}), gen_server:reply(From, NewWorker), State#state{waiting = LeftWaiting}; @@ -360,7 +481,7 @@ handle_worker_exit(Pid, State) -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> W = filter_worker_by_pid(Pid, State#state.workers), - Workers = queue:in(new_worker(Sup), W), + Workers = queue:in(new_worker(Sup, Mod), W), State#state{workers = Workers, waiting = Empty} end. @@ -375,3 +496,12 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> full; state_name(_State) -> overflow. + +stop_supervisor(Reason, Pid) when is_pid(Pid) -> + case erlang:node(Pid) of + N when N == node() -> + exit(Pid, Reason); + _ when Reason =/= nodedown -> + catch gen_server:stop(Pid, Reason, ?TIMEOUT); + _ -> ok + end. diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index e6485a6..2c50b6f 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -6,7 +6,7 @@ -export([start_link/2, init/1]). start_link(Mod, Args) -> - supervisor:start_link(?MODULE, {Mod, Args}). + supervisor:start_link({local, Mod}, ?MODULE, {Mod, Args}). init({Mod, Args}) -> {ok, {{simple_one_for_one, 0, 1}, diff --git a/src/poolboy_worker_supervisor.erl b/src/poolboy_worker_supervisor.erl new file mode 100644 index 0000000..2243cab --- /dev/null +++ b/src/poolboy_worker_supervisor.erl @@ -0,0 +1,6 @@ +-module(poolboy_worker_supervisor). + +-callback start_child() -> {ok, Pid} | + {error, Reason} when + Pid :: pid(), + Reason :: term().