From 599fc94c1b6b8d281791e991df1d62a04bf60edd Mon Sep 17 00:00:00 2001 From: David Leach Date: Thu, 19 Nov 2015 21:23:49 +1300 Subject: [PATCH 01/12] Add ability to remove overflow workers after a delay When workers are expensive to start and transactions are quick killing all workers that are checked in when in overflow is very expensive. This change allows delaying the termination of overflow workers when there is peak load and alleviates worker churn. --- README.md | 39 ++++++++++++++--- src/poolboy.erl | 97 +++++++++++++++++++++++++++++++++++++----- test/poolboy_tests.erl | 37 ++++++++++++++++ 3 files changed, 155 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e603768..563c288 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +18,16 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished, the caller is gone, the call has exited or +the timeout set has been reached +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +159,29 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out ## Authors diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..fd7d982 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -32,11 +32,13 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), + workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer() }). -spec checkout(Pool :: pool()) -> pid(). @@ -126,7 +128,9 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + WorkersToReap = ets:new(workers_to_reap, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, + workers_to_reap = WorkersToReap}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -139,6 +143,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, Milliseconds} | Rest], WorkerArgs, State) when is_integer(Milliseconds) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = Milliseconds}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -182,8 +188,21 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, + max_overflow = MaxOverflow, + workers_to_reap = WorkersToReap} = State, case Workers of + [Pid | Left] when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + ok = case ets:lookup(WorkersToReap, Pid) of + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + true = ets:delete(State#state.workers_to_reap, Pid), + ok; + [] -> + ok + end, + {reply, Pid, State#state{workers = Left}}; [Pid | Left] -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -204,8 +223,9 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -235,7 +255,10 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, + monitors = Monitors, + workers_to_reap = WorkersToReap, + workers = Workers} = State, + true = ets:delete(WorkersToReap, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -243,15 +266,28 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:member(Pid, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + W = lists:filter(fun (P) -> P =/= Pid end, Workers), {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> {noreply, State} end end; - +handle_info({reap_worker, Pid}, State)-> + #state{monitors = Monitors, + workers_to_reap = WorkersToReap} = State, + true = ets:delete(WorkersToReap, Pid), + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = purge_worker(Pid, State), + {noreply, NewState}; + [] -> + NewState = purge_worker(Pid, State), + {noreply, NewState} + end; handle_info(_Info, State) -> {noreply, State}. @@ -285,6 +321,19 @@ dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +purge_worker(Pid, State) -> + #state{supervisor = Sup, + workers = Workers, + overflow = Overflow} = State, + case Overflow > 0 of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, Workers), + ok = dismiss_worker(Sup, Pid), + State#state{workers = W, overflow = State#state.overflow -1}; + false -> + State + end. + prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -300,12 +349,32 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl, + workers_to_reap = WorkersToReap} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + case ets:lookup(WorkersToReap, Pid) of + [] -> + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}), + NewOverflow = Overflow; + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + NewOverflow = Overflow -1, + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}) + end, + true = ets:insert(WorkersToReap, {Pid, Tref}), + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty, overflow = NewOverflow}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; @@ -343,7 +412,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..b41c6f8 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,9 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -386,6 +389,34 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + poolboy:checkin(Pid, Worker), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + Worker2 = poolboy:checkout(Pid), + % Ensure checked in worker is in fact being reused + ?assertEqual(Worker, Worker2), + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + Worker3 = poolboy:checkout(Pid), + ?assertEqual(Worker1, Worker3), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(550), + % Test overflow worker is reaped in the correct time period + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -546,5 +577,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From 7190178be986250bd1230dae1adbc7af55ec859c Mon Sep 17 00:00:00 2001 From: David Leach Date: Thu, 19 Nov 2015 21:23:49 +1300 Subject: [PATCH 02/12] Add ability to remove overflow workers after a delay When workers are expensive to start and transactions are quick killing all workers that are checked in when in overflow is very expensive. This change allows delaying the termination of overflow workers when there is peak load and alleviates worker churn. --- README.md | 38 ++++++++++++++--- src/poolboy.erl | 97 +++++++++++++++++++++++++++++++++++++----- test/poolboy_tests.erl | 37 ++++++++++++++++ 3 files changed, 154 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e603768..2e5388e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,8 @@ Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +18,15 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished. +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,14 +158,29 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: the module that represents the workers - mandatory +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out ## Authors diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..fd7d982 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -32,11 +32,13 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), + workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_ttl = 0 :: non_neg_integer() }). -spec checkout(Pool :: pool()) -> pid(). @@ -126,7 +128,9 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + WorkersToReap = ets:new(workers_to_reap, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, + workers_to_reap = WorkersToReap}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -139,6 +143,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, Milliseconds} | Rest], WorkerArgs, State) when is_integer(Milliseconds) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = Milliseconds}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -182,8 +188,21 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, + max_overflow = MaxOverflow, + workers_to_reap = WorkersToReap} = State, case Workers of + [Pid | Left] when State#state.overflow_ttl > 0 -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + ok = case ets:lookup(WorkersToReap, Pid) of + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + true = ets:delete(State#state.workers_to_reap, Pid), + ok; + [] -> + ok + end, + {reply, Pid, State#state{workers = Left}}; [Pid | Left] -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -204,8 +223,9 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -235,7 +255,10 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, + monitors = Monitors, + workers_to_reap = WorkersToReap, + workers = Workers} = State, + true = ets:delete(WorkersToReap, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), @@ -243,15 +266,28 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:member(Pid, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + W = lists:filter(fun (P) -> P =/= Pid end, Workers), {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> {noreply, State} end end; - +handle_info({reap_worker, Pid}, State)-> + #state{monitors = Monitors, + workers_to_reap = WorkersToReap} = State, + true = ets:delete(WorkersToReap, Pid), + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = purge_worker(Pid, State), + {noreply, NewState}; + [] -> + NewState = purge_worker(Pid, State), + {noreply, NewState} + end; handle_info(_Info, State) -> {noreply, State}. @@ -285,6 +321,19 @@ dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +purge_worker(Pid, State) -> + #state{supervisor = Sup, + workers = Workers, + overflow = Overflow} = State, + case Overflow > 0 of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, Workers), + ok = dismiss_worker(Sup, Pid), + State#state{workers = W, overflow = State#state.overflow -1}; + false -> + State + end. + prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -300,12 +349,32 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl, + workers_to_reap = WorkersToReap} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + case ets:lookup(WorkersToReap, Pid) of + [] -> + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}), + NewOverflow = Overflow; + [{Pid, Tref}] -> + {ok, cancel} = timer:cancel(Tref), + NewOverflow = Overflow -1, + {ok, Tref} = + timer:send_after(OverflowTtl, self(), {reap_worker, Pid}) + end, + true = ets:insert(WorkersToReap, {Pid, Tref}), + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty, overflow = NewOverflow}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; @@ -343,7 +412,13 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..b41c6f8 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,9 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -386,6 +389,34 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + poolboy:checkin(Pid, Worker), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + Worker2 = poolboy:checkout(Pid), + % Ensure checked in worker is in fact being reused + ?assertEqual(Worker, Worker2), + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + Worker3 = poolboy:checkout(Pid), + ?assertEqual(Worker1, Worker3), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + % Test overflow worker is returned to list of available workers + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + timer:sleep(550), + % Test overflow worker is reaped in the correct time period + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -546,5 +577,11 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From eb35c27e1262b1d5615c2a7d74916478dfdaee34 Mon Sep 17 00:00:00 2001 From: David Leach Date: Mon, 18 Jan 2016 22:27:15 +1300 Subject: [PATCH 03/12] Removing debug config --- src/poolboy.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index fe6f427..39749c7 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,5 +1,4 @@ %% Poolboy - A hunky Erlang worker pool factory --include_lib("eunit/include/eunit.hrl"). -module(poolboy). -behaviour(gen_server). @@ -248,10 +247,8 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors, - workers_to_reap = WorkersToReap, workers = Workers} = State, - - true = ets:delete(WorkersToReap, Pid), + ok = cancel_worker_reap(State, Pid), case ets:lookup(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), From 8f8f5fe67802acf0c8f1e80524b49d270cdbd12c Mon Sep 17 00:00:00 2001 From: Tihon Date: Tue, 7 Jun 2016 17:54:48 +0300 Subject: [PATCH 04/12] change overflow workers rip logics --- .gitignore | 3 + src/poolboy.erl | 145 +++++++++++++++++------------------------ test/poolboy_tests.erl | 4 +- 3 files changed, 64 insertions(+), 88 deletions(-) diff --git a/.gitignore b/.gitignore index 02ebb9e..bad35ac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +/rebar.lock +/poolboy.iml +/.idea .eunit .rebar _build diff --git a/src/poolboy.erl b/src/poolboy.erl index 39749c7..da6febd 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -10,6 +10,7 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). +-define(RIP_PERIOD, 1000). %1 sec period. If overflow_ttl is less, than a second - rip_period will be same as overflow_ttl -ifdef(pre17). -type pid_queue() :: queue(). @@ -31,13 +32,12 @@ supervisor :: pid(), workers :: [pid()], waiting :: pid_queue(), - workers_to_reap :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo, - overflow_ttl = 0 :: non_neg_integer() + overflow_ttl = 0 :: non_neg_integer() %microseconds }). -spec checkout(Pool :: pool()) -> pid(). @@ -127,9 +127,7 @@ init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - WorkersToReap = ets:new(workers_to_reap, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors, - workers_to_reap = WorkersToReap}). + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), @@ -143,7 +141,9 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> - init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl}); + TimerTTL = min(OverflowTtl, ?RIP_PERIOD), + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), + init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl * 1000}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> @@ -151,10 +151,9 @@ init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> {ok, State#state{workers = Workers}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> - case ets:lookup(Monitors, Pid) of + case ets:take(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), NewState = handle_checkin(Pid, State), {noreply, NewState}; [] -> @@ -189,18 +188,15 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> overflow = Overflow, max_overflow = MaxOverflow} = State, case Workers of - [Pid | Left] when State#state.overflow_ttl > 0 -> - MRef = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), - ok = cancel_worker_reap(State, Pid), + [{Pid, _} | Left] when State#state.overflow_ttl > 0 -> + add_worker_execute(Pid, CRef, FromPid, Monitors), {reply, Pid, State#state{workers = Left}}; - [Pid | Left] -> - MRef = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + [{Pid, _} | Left] -> + add_worker_execute(Pid, CRef, FromPid, Monitors), {reply, Pid, State#state{workers = Left}}; [] when MaxOverflow > 0, Overflow < MaxOverflow -> - {Pid, MRef} = new_worker(Sup, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + Pid = new_worker(Sup), + add_worker_execute(Pid, CRef, FromPid, Monitors), {reply, Pid, State#state{overflow = Overflow + 1}}; [] when Block =:= false -> {reply, full, State}; @@ -217,9 +213,9 @@ handle_call(status, _From, State) -> CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; -handle_call(get_avail_workers, _From, State) -> - Workers = State#state.workers, - {reply, Workers, State}; +handle_call(get_avail_workers, _From, State = #state{workers = Workers}) -> + Pids = lists:map(fun({Pid, _TS}) -> Pid end, Workers), + {reply, Pids, State}; handle_call(get_all_workers, _From, State) -> Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), @@ -248,42 +244,35 @@ handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors, workers = Workers} = State, - ok = cancel_worker_reap(State, Pid), - case ets:lookup(Monitors, Pid) of + case ets:take(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, Workers) of + case lists:keymember(Pid, 1, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, Workers), - {noreply, State#state{workers = [new_worker(Sup) | W]}}; + W = lists:keydelete(Pid, 1, Workers), + {noreply, State#state{workers = [{new_worker(Sup), os:timestamp()} | W]}}; false -> {noreply, State} end end; -handle_info({reap_worker, Pid}, State)-> - #state{monitors = Monitors, - workers_to_reap = WorkersToReap} = State, - true = ets:delete(WorkersToReap, Pid), - case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> - true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), - NewState = purge_worker(Pid, State), - {noreply, NewState}; - [] -> - NewState = purge_worker(Pid, State), - {noreply, NewState} - end; +handle_info({rip_workers, TimerTTL}, State) -> + #state{overflow = Overflow, + overflow_ttl = OverTTL, + workers = Workers, + supervisor = Sup} = State, + Now = os:timestamp(), + {UOverflow, UWorkers} = do_reap_workers(Workers, Now, OverTTL, Overflow, Sup, []), + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), + {noreply, State#state{overflow = UOverflow, workers = UWorkers}}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> - ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), - true = exit(State#state.supervisor, shutdown), +terminate(_Reason, #state{workers = Workers, supervisor = Sup}) -> + ok = lists:foreach(fun ({W, _}) -> unlink(W) end, Workers), + true = exit(Sup, shutdown), ok. code_change(_OldVsn, State, _Extra) -> @@ -302,38 +291,10 @@ new_worker(Sup) -> true = link(Pid), Pid. -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. - dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). -cancel_worker_reap(State, Pid) -> - case ets:lookup(State#state.workers_to_reap, Pid) of - [{Pid, Tref}] -> - erlang:cancel_timer(Tref), - true = ets:delete(State#state.workers_to_reap, Pid), - ok; - [] -> - ok - end. - -purge_worker(Pid, State) -> - #state{supervisor = Sup, - workers = Workers, - overflow = Overflow} = State, - case Overflow > 0 of - true -> - W = lists:filter(fun (P) -> P =/= Pid end, Workers), - ok = dismiss_worker(Sup, Pid), - State#state{workers = W, overflow = Overflow -1}; - false -> - State - end. - prepopulate(N, _Sup) when N < 1 -> []; prepopulate(N, Sup) -> @@ -342,7 +303,7 @@ prepopulate(N, Sup) -> prepopulate(0, _Sup, Workers) -> Workers; prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). + prepopulate(N-1, Sup, [{new_worker(Sup), os:timestamp()} | Workers]). handle_checkin(Pid, State) -> #state{supervisor = Sup, @@ -350,30 +311,22 @@ handle_checkin(Pid, State) -> monitors = Monitors, overflow = Overflow, strategy = Strategy, - overflow_ttl = OverflowTtl, - workers_to_reap = WorkersToReap} = State, + overflow_ttl = OverflowTtl} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> - Tref = - erlang:send_after(OverflowTtl, self(), {reap_worker, Pid}), - true = ets:insert(WorkersToReap, {Pid, Tref}), - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + LastUseTime = os:timestamp(), + Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), State#state{workers = Workers, waiting = Empty}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + LastUseTime = os:timestamp(), + Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} end. @@ -391,7 +344,7 @@ handle_worker_exit(Pid, State) -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> Workers = - [new_worker(Sup) + [{new_worker(Sup), os:timestamp()} | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], State#state{workers = Workers, waiting = Empty} end. @@ -413,3 +366,23 @@ state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> end; state_name(_State) -> overflow. + +add_worker_execute(WorkerPid, CRef, FromPid, Monitors) -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {WorkerPid, CRef, MRef}). + +return_worker(lifo, Pid, Workers) -> [Pid | Workers]; +return_worker(fifo, Pid, Workers) -> Workers ++ [Pid]. + +do_reap_workers(UnScanned, _, _, 0, _, Workers) -> + {0, lists:reverse(Workers) ++ UnScanned}; +do_reap_workers([], _, _, Overflow, _, Workers) -> + {Overflow, lists:reverse(Workers)}; +do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers) -> + case timer:now_diff(Now, LTime) > TTL of + true -> + ok = dismiss_worker(Sup, Pid), + do_reap_workers(Rest, Now, TTL, Overflow - 1, Sup, Workers); + false -> + do_reap_workers(Rest, Now, TTL, Overflow, Sup, [Worker | Workers]) + end. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 6314472..a3e4270 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -390,7 +390,7 @@ pool_full_nonblocking() -> ok = pool_call(Pid, stop). pool_overflow_ttl_workers() -> - {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 1000), + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 800), Worker = poolboy:checkout(Pid), Worker1 = poolboy:checkout(Pid), % Test pool behaves normally when full @@ -428,7 +428,7 @@ pool_overflow_ttl_workers() -> ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), ?assertEqual(2, length(pool_call(Pid, get_all_workers))), % Test overflow worker is reaped in the correct time period - timer:sleep(850), + timer:sleep(1500), % Test overflow worker is reaped in the correct time period ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), % Test worker death behaviour From b8f8bf018bb41f72c4b5eea496969ce1b40dbc69 Mon Sep 17 00:00:00 2001 From: tihon Date: Wed, 8 Jun 2016 12:53:17 +0300 Subject: [PATCH 05/12] made overflow_check_period configurable, update readme --- README.md | 2 ++ src/poolboy.erl | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2e5388e..25820b0 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,8 @@ code_change(_OldVsn, State, _Extra) -> placed first or last in the line of available workers. Default is `lifo`. - `overflow_ttl`: time in milliseconds you want to wait before removing overflow workers. Useful when it's expensive to start workers. Default is 0. +- `overflow_check_period`: time in milliseconds for checking overflow workers to rip. + Default is min(1 sec, overflow_ttl). Cheking job will not be started, if overflow_ttl is 0. ## Pool Status Returns : {Status, Workers, Overflow, InUse} diff --git a/src/poolboy.erl b/src/poolboy.erl index da6febd..7fbf967 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -10,7 +10,7 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). --define(RIP_PERIOD, 1000). %1 sec period. If overflow_ttl is less, than a second - rip_period will be same as overflow_ttl +-define(CHECK_OVERFLOW_DEFAULT_PERIOD, 1000000). %microseconds -ifdef(pre17). -type pid_queue() :: queue(). @@ -37,6 +37,7 @@ overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo, + overflow_check_period = ?CHECK_OVERFLOW_DEFAULT_PERIOD :: non_neg_integer(), %milliseconds overflow_ttl = 0 :: non_neg_integer() %microseconds }). @@ -141,13 +142,14 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> - TimerTTL = min(OverflowTtl, ?RIP_PERIOD), - erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl * 1000}); +init([{overflow_check_period, OverflowCheckPeriod} | Rest], WorkerArgs, State) when is_integer(OverflowCheckPeriod) -> + init(Rest, WorkerArgs, State#state{overflow_check_period = OverflowCheckPeriod * 1000}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> Workers = prepopulate(Size, Sup), + start_timer(State), {ok, State#state{workers = Workers}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> @@ -386,3 +388,12 @@ do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers false -> do_reap_workers(Rest, Now, TTL, Overflow, Sup, [Worker | Workers]) end. + +start_timer(#state{overflow_check_period = undefined, overflow_ttl = 0}) -> + ok; +start_timer(#state{overflow_check_period = undefined, overflow_ttl = TTL}) -> + TimerTTL = min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD), %If overflow_ttl is less, than a second - period will be same as overflow_ttl + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}); +start_timer(#state{overflow_ttl = TTL}) -> + TimerTTL = min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD), + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}). From 8e898e5ebe7f7e957b0448574abc3b27eedbd2d3 Mon Sep 17 00:00:00 2001 From: tihon Date: Wed, 15 Jun 2016 16:04:35 +0300 Subject: [PATCH 06/12] add algoryhm improvement for ripping lifo workers + test --- src/poolboy.erl | 28 +++++++++++++++------------- test/poolboy_tests.erl | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 7fbf967..94070a7 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -10,7 +10,7 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). --define(CHECK_OVERFLOW_DEFAULT_PERIOD, 1000000). %microseconds +-define(CHECK_OVERFLOW_DEFAULT_PERIOD, 1000000). %microseconds (1 sec) -ifdef(pre17). -type pid_queue() :: queue(). @@ -37,7 +37,7 @@ overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo, - overflow_check_period = ?CHECK_OVERFLOW_DEFAULT_PERIOD :: non_neg_integer(), %milliseconds + overflow_check_period :: non_neg_integer(), overflow_ttl = 0 :: non_neg_integer() %microseconds }). @@ -264,9 +264,10 @@ handle_info({rip_workers, TimerTTL}, State) -> #state{overflow = Overflow, overflow_ttl = OverTTL, workers = Workers, + strategy = Strategy, supervisor = Sup} = State, Now = os:timestamp(), - {UOverflow, UWorkers} = do_reap_workers(Workers, Now, OverTTL, Overflow, Sup, []), + {UOverflow, UWorkers} = do_reap_workers(Workers, Now, OverTTL, Overflow, Sup, [], Strategy), erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), {noreply, State#state{overflow = UOverflow, workers = UWorkers}}; handle_info(_Info, State) -> @@ -376,24 +377,25 @@ add_worker_execute(WorkerPid, CRef, FromPid, Monitors) -> return_worker(lifo, Pid, Workers) -> [Pid | Workers]; return_worker(fifo, Pid, Workers) -> Workers ++ [Pid]. -do_reap_workers(UnScanned, _, _, 0, _, Workers) -> +do_reap_workers(UnScanned, _, _, 0, _, Workers, _) -> {0, lists:reverse(Workers) ++ UnScanned}; -do_reap_workers([], _, _, Overflow, _, Workers) -> +do_reap_workers([], _, _, Overflow, _, Workers, _) -> {Overflow, lists:reverse(Workers)}; -do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers) -> +do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers, Strategy) -> case timer:now_diff(Now, LTime) > TTL of true -> ok = dismiss_worker(Sup, Pid), - do_reap_workers(Rest, Now, TTL, Overflow - 1, Sup, Workers); + do_reap_workers(Rest, Now, TTL, Overflow - 1, Sup, Workers, Strategy); + false when Strategy =:= lifo -> + {Overflow, lists:reverse([Worker | Workers]) ++ Rest}; false -> - do_reap_workers(Rest, Now, TTL, Overflow, Sup, [Worker | Workers]) + do_reap_workers(Rest, Now, TTL, Overflow, Sup, [Worker | Workers], Strategy) end. -start_timer(#state{overflow_check_period = undefined, overflow_ttl = 0}) -> +start_timer(#state{overflow_ttl = 0}) -> %overflow turned off ok; -start_timer(#state{overflow_check_period = undefined, overflow_ttl = TTL}) -> - TimerTTL = min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD), %If overflow_ttl is less, than a second - period will be same as overflow_ttl +start_timer(#state{overflow_check_period = TimerTTL}) when is_number(TimerTTL) -> erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}); -start_timer(#state{overflow_ttl = TTL}) -> - TimerTTL = min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD), +start_timer(#state{overflow_check_period = undefined, overflow_ttl = TTL}) -> + TimerTTL = round(min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD) / 1000), %If overflow_ttl is less, than a second - period will be same as overflow_ttl erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}). diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index a3e4270..2dcd761 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -45,6 +45,12 @@ pool_test_() -> {<<"Pool with overflow_ttl behaves as expected">>, fun pool_overflow_ttl_workers/0 }, + {<<"Lifo pool with overflow_ttl rips workers">>, + fun lifo_overflow_rip_test/0 + }, + {<<"Fifo pool with overflow_ttl rips workers">>, + fun fifo_overflow_rip_test/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -442,6 +448,31 @@ pool_overflow_ttl_workers() -> ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), ok = pool_call(Pid, stop). +lifo_overflow_rip_test() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300, lifo), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + poolboy:checkin(Pid, Worker1), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +fifo_overflow_rip_test() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300, fifo), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + poolboy:checkin(Pid, Worker1), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) @@ -604,10 +635,13 @@ new_pool(Size, MaxOverflow, Strategy) -> {strategy, Strategy}]). new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl, lifo). + +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl, Strategy) -> poolboy:start_link([{name, {local, poolboy_test}}, {worker_module, poolboy_test_worker}, {size, Size}, {max_overflow, MaxOverflow}, - {overflow_ttl, OverflowTtl}]). + {overflow_ttl, OverflowTtl}, {strategy, Strategy}]). pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request). From be3663437b6056f549505c46b0606f0a28aaa2a1 Mon Sep 17 00:00:00 2001 From: tihon Date: Wed, 15 Jun 2016 17:51:32 +0300 Subject: [PATCH 07/12] change os timestamp to monotonic time --- src/poolboy.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 94070a7..56f08fd 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -255,7 +255,7 @@ handle_info({'EXIT', Pid, _Reason}, State) -> case lists:keymember(Pid, 1, Workers) of true -> W = lists:keydelete(Pid, 1, Workers), - {noreply, State#state{workers = [{new_worker(Sup), os:timestamp()} | W]}}; + {noreply, State#state{workers = [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | W]}}; false -> {noreply, State} end @@ -266,7 +266,7 @@ handle_info({rip_workers, TimerTTL}, State) -> workers = Workers, strategy = Strategy, supervisor = Sup} = State, - Now = os:timestamp(), + Now = erlang:monotonic_time(micro_seconds), {UOverflow, UWorkers} = do_reap_workers(Workers, Now, OverTTL, Overflow, Sup, [], Strategy), erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), {noreply, State#state{overflow = UOverflow, workers = UWorkers}}; @@ -306,7 +306,7 @@ prepopulate(N, Sup) -> prepopulate(0, _Sup, Workers) -> Workers; prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, [{new_worker(Sup), os:timestamp()} | Workers]). + prepopulate(N-1, Sup, [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | Workers]). handle_checkin(Pid, State) -> #state{supervisor = Sup, @@ -321,14 +321,14 @@ handle_checkin(Pid, State) -> gen_server:reply(From, Pid), State#state{waiting = Left}; {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> - LastUseTime = os:timestamp(), + LastUseTime = erlang:monotonic_time(micro_seconds), Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), State#state{workers = Workers, waiting = Empty}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - LastUseTime = os:timestamp(), + LastUseTime = erlang:monotonic_time(micro_seconds), Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} end. @@ -347,7 +347,7 @@ handle_worker_exit(Pid, State) -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> Workers = - [{new_worker(Sup), os:timestamp()} + [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], State#state{workers = Workers, waiting = Empty} end. @@ -382,7 +382,7 @@ do_reap_workers(UnScanned, _, _, 0, _, Workers, _) -> do_reap_workers([], _, _, Overflow, _, Workers, _) -> {Overflow, lists:reverse(Workers)}; do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers, Strategy) -> - case timer:now_diff(Now, LTime) > TTL of + case Now - LTime > TTL of true -> ok = dismiss_worker(Sup, Pid), do_reap_workers(Rest, Now, TTL, Overflow - 1, Sup, Workers, Strategy); From 04ea8a5479b273a052942755ebdc42c9e318d907 Mon Sep 17 00:00:00 2001 From: Tihon Date: Fri, 8 Jul 2016 14:03:26 +0300 Subject: [PATCH 08/12] set overflow check period to milliseconds --- src/poolboy.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 56f08fd..83dbef5 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -37,7 +37,7 @@ overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), strategy = lifo :: lifo | fifo, - overflow_check_period :: non_neg_integer(), + overflow_check_period :: non_neg_integer(), %milliseconds overflow_ttl = 0 :: non_neg_integer() %microseconds }). @@ -144,7 +144,7 @@ init([{strategy, fifo} | Rest], WorkerArgs, State) -> init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl * 1000}); init([{overflow_check_period, OverflowCheckPeriod} | Rest], WorkerArgs, State) when is_integer(OverflowCheckPeriod) -> - init(Rest, WorkerArgs, State#state{overflow_check_period = OverflowCheckPeriod * 1000}); + init(Rest, WorkerArgs, State#state{overflow_check_period = OverflowCheckPeriod}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> From 9c06a9ad4e700e0724a2e3c4b44c5f5d51380a0d Mon Sep 17 00:00:00 2001 From: Valerii Tikhonov Date: Sun, 26 Aug 2018 14:52:13 +0200 Subject: [PATCH 09/12] Add custom run functions --- README.md | 24 +++++++++--------------- src/poolboy.app.src | 2 +- src/poolboy.erl | 2 +- src/poolboy_sup.erl | 6 +++++- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 25820b0..84b6c7a 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,7 @@ # Poolboy - A hunky Erlang worker pool factory -[![Build Status](https://api.travis-ci.org/devinus/poolboy.svg?branch=master)](https://travis-ci.org/devinus/poolboy) +[![Build Status](https://api.travis-ci.org/comtihon/poolboy.svg?branch=master)](https://travis-ci.org/comtihon/poolboy) -[![Support via Gratipay](https://cdn.rawgit.com/gratipay/gratipay-badge/2.3.0/dist/gratipay.png)](https://gratipay.com/devinus/) Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. @@ -161,7 +160,14 @@ code_change(_OldVsn, State, _Extra) -> ## Pool Options - `name`: the pool name - optional -- `worker_module`: the module that represents the workers - mandatory +- `worker_module`: worker module. Can be either a `module` or `{module, function}`. +Example: +``` +start_pool(SizeArgs, WorkerArgs) -> + PoolArgs = [{worker_module, {mc_worker_api, connect}}] ++ SizeArgs, + supervisor:start_child(?MODULE, [PoolArgs, WorkerArgs]). +``` +In case of just atom `start_link` will be called. - `size`: maximum pool size - optional - `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be @@ -183,15 +189,3 @@ Returns : {Status, Workers, Overflow, InUse} - `Overflow`: Number of overflow workers started, should never exceed number specified by MaxOverflow when starting pool - `InUse`: Number of workers currently busy/checked out - -## Authors - -- Devin Torres (devinus) -- Andrew Thompson (Vagabond) -- Kurt Williams (onkel-dirtus) - -## License - -Poolboy is available in the public domain (see `UNLICENSE`). -Poolboy is also optionally available under the ISC license (see `LICENSE`), -meant especially for jurisdictions that do not recognize public domain works. diff --git a/src/poolboy.app.src b/src/poolboy.app.src index 0f85255..1af02ec 100644 --- a/src/poolboy.app.src +++ b/src/poolboy.app.src @@ -1,6 +1,6 @@ {application, poolboy, [ {description, "A hunky Erlang worker pool factory"}, - {vsn, "1.5.1"}, + {vsn, "1.6.0"}, {applications, [kernel, stdlib]}, {registered, [poolboy]}, diff --git a/src/poolboy.erl b/src/poolboy.erl index 83dbef5..53be01b 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -130,7 +130,7 @@ init({PoolArgs, WorkerArgs}) -> Monitors = ets:new(monitors, [private]), init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> +init([{worker_module, Mod} | Rest], WorkerArgs, State) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), init(Rest, WorkerArgs, State#state{supervisor = Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index e6485a6..5e28727 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -8,7 +8,11 @@ start_link(Mod, Args) -> supervisor:start_link(?MODULE, {Mod, Args}). -init({Mod, Args}) -> +init({Mod, Args}) when is_atom(Mod)-> {ok, {{simple_one_for_one, 0, 1}, [{Mod, {Mod, start_link, [Args]}, + temporary, 5000, worker, [Mod]}]}}; +init({{Mod, Fun}, Args}) -> + {ok, {{simple_one_for_one, 0, 1}, + [{Mod, {Mod, Fun, [Args]}, temporary, 5000, worker, [Mod]}]}}. From 6b5212c4cb20b85e57c9627bb0940c126f4e0766 Mon Sep 17 00:00:00 2001 From: Val Date: Wed, 1 Jun 2022 15:05:40 +0200 Subject: [PATCH 10/12] fix stacktrace for otp24 --- src/poolboy.app.src | 2 +- src/poolboy.erl | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/poolboy.app.src b/src/poolboy.app.src index 1af02ec..4755445 100644 --- a/src/poolboy.app.src +++ b/src/poolboy.app.src @@ -1,6 +1,6 @@ {application, poolboy, [ {description, "A hunky Erlang worker pool factory"}, - {vsn, "1.6.0"}, + {vsn, "1.6.1"}, {applications, [kernel, stdlib]}, {registered, [poolboy]}, diff --git a/src/poolboy.erl b/src/poolboy.erl index 53be01b..494d507 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -51,15 +51,27 @@ checkout(Pool, Block) -> -spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout()) -> pid() | full. +-if(?OTP_RELEASE >= 23). checkout(Pool, Block, Timeout) -> CRef = make_ref(), try gen_server:call(Pool, {checkout, CRef, Block}, Timeout) catch - Class:Reason -> + Class:Reason:Stacktrace -> % new way of trycatch + gen_server:cast(Pool, {cancel_waiting, CRef}), + erlang:raise(Class, Reason, Stacktrace) + end. +-else. +checkout(Pool, Block, Timeout) -> + CRef = make_ref(), + try + gen_server:call(Pool, {checkout, CRef, Block}, Timeout) + catch + Class:Reason -> %legacy way gen_server:cast(Pool, {cancel_waiting, CRef}), erlang:raise(Class, Reason, erlang:get_stacktrace()) end. +-endif. -spec checkin(Pool :: pool(), Worker :: pid()) -> ok. checkin(Pool, Worker) when is_pid(Worker) -> From 961ec4609df853441f33f1f996fb89c5e0843d45 Mon Sep 17 00:00:00 2001 From: Val Date: Wed, 1 Jun 2022 15:24:08 +0200 Subject: [PATCH 11/12] Create erlang.yml --- .github/workflows/erlang.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .github/workflows/erlang.yml diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml new file mode 100644 index 0000000..a57b2d6 --- /dev/null +++ b/.github/workflows/erlang.yml @@ -0,0 +1,25 @@ +name: Erlang CI + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +permissions: + contents: read + +jobs: + + build: + + runs-on: ubuntu-latest + + container: + image: erlang:22.0.7 + + steps: + - uses: actions/checkout@v3 + - name: Compile + run: rebar3 compile + From 4866eb681f84489e30af038809946df39c794b89 Mon Sep 17 00:00:00 2001 From: Val Date: Wed, 1 Jun 2022 15:28:08 +0200 Subject: [PATCH 12/12] update badge --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 84b6c7a..2d1281e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # Poolboy - A hunky Erlang worker pool factory -[![Build Status](https://api.travis-ci.org/comtihon/poolboy.svg?branch=master)](https://travis-ci.org/comtihon/poolboy) - +[![Erlang CI](https://github.com/comtihon/poolboy/actions/workflows/erlang.yml/badge.svg)](https://github.com/comtihon/poolboy/actions/workflows/erlang.yml) Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery.