diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml new file mode 100644 index 0000000..a57b2d6 --- /dev/null +++ b/.github/workflows/erlang.yml @@ -0,0 +1,25 @@ +name: Erlang CI + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +permissions: + contents: read + +jobs: + + build: + + runs-on: ubuntu-latest + + container: + image: erlang:22.0.7 + + steps: + - uses: actions/checkout@v3 + - name: Compile + run: rebar3 compile + diff --git a/.gitignore b/.gitignore index 02ebb9e..bad35ac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +/rebar.lock +/poolboy.iml +/.idea .eunit .rebar _build diff --git a/README.md b/README.md index e603768..2d1281e 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,13 @@ # Poolboy - A hunky Erlang worker pool factory -[![Build Status](https://api.travis-ci.org/devinus/poolboy.svg?branch=master)](https://travis-ci.org/devinus/poolboy) - -[![Support via Gratipay](https://cdn.rawgit.com/gratipay/gratipay-badge/2.3.0/dist/gratipay.png)](https://gratipay.com/devinus/) +[![Erlang CI](https://github.com/comtihon/poolboy/actions/workflows/erlang.yml/badge.svg)](https://github.com/comtihon/poolboy/actions/workflows/erlang.yml) Poolboy is a **lightweight**, **generic** pooling library for Erlang with a focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. ## Usage - +The most basic use case is to check out a worker, make a call and manually +return it to the pool when done ```erl-sh 1> Worker = poolboy:checkout(PoolName). <0.9001.0> @@ -17,7 +16,15 @@ ok 3> poolboy:checkin(PoolName, Worker). ok ``` - +Alternatively you can use a transaction which will return the worker to the +pool when the call is finished. +```erl-sh +poolboy:transaction( + PoolName, + fun(Worker) -> gen_server:call(Worker, Request) end, + TransactionTimeout +) +``` ## Example This is an example application showcasing database connection pools using @@ -149,23 +156,35 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. ``` -## Options +## Pool Options -- `name`: the pool name -- `worker_module`: the module that represents the workers -- `size`: maximum pool size -- `max_overflow`: maximum number of workers created if pool is empty +- `name`: the pool name - optional +- `worker_module`: worker module. Can be either a `module` or `{module, function}`. +Example: +``` +start_pool(SizeArgs, WorkerArgs) -> + PoolArgs = [{worker_module, {mc_worker_api, connect}}] ++ SizeArgs, + supervisor:start_child(?MODULE, [PoolArgs, WorkerArgs]). +``` +In case of just atom `start_link` will be called. +- `size`: maximum pool size - optional +- `max_overflow`: maximum number of workers created if pool is empty - optional - `strategy`: `lifo` or `fifo`, determines whether checked in workers should be placed first or last in the line of available workers. Default is `lifo`. - -## Authors - -- Devin Torres (devinus) -- Andrew Thompson (Vagabond) -- Kurt Williams (onkel-dirtus) - -## License - -Poolboy is available in the public domain (see `UNLICENSE`). -Poolboy is also optionally available under the ISC license (see `LICENSE`), -meant especially for jurisdictions that do not recognize public domain works. +- `overflow_ttl`: time in milliseconds you want to wait before removing overflow + workers. Useful when it's expensive to start workers. Default is 0. +- `overflow_check_period`: time in milliseconds for checking overflow workers to rip. + Default is min(1 sec, overflow_ttl). Cheking job will not be started, if overflow_ttl is 0. + +## Pool Status +Returns : {Status, Workers, Overflow, InUse} +- `Status`: ready | full | overflow + The ready atom indicates there are workers that are not checked out + ready. The full atom indicates all workers including overflow are + checked out. The overflow atom is used to describe the condition + when all permanent workers are in use but there is overflow capacity + available. +- `Workers`: Number of workers ready for use. +- `Overflow`: Number of overflow workers started, should never exceed number + specified by MaxOverflow when starting pool +- `InUse`: Number of workers currently busy/checked out diff --git a/src/poolboy.app.src b/src/poolboy.app.src index 0f85255..4755445 100644 --- a/src/poolboy.app.src +++ b/src/poolboy.app.src @@ -1,6 +1,6 @@ {application, poolboy, [ {description, "A hunky Erlang worker pool factory"}, - {vsn, "1.5.1"}, + {vsn, "1.6.1"}, {applications, [kernel, stdlib]}, {registered, [poolboy]}, diff --git a/src/poolboy.erl b/src/poolboy.erl index db4973b..494d507 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,5 +1,4 @@ %% Poolboy - A hunky Erlang worker pool factory - -module(poolboy). -behaviour(gen_server). @@ -11,6 +10,7 @@ -export_type([pool/0]). -define(TIMEOUT, 5000). +-define(CHECK_OVERFLOW_DEFAULT_PERIOD, 1000000). %microseconds (1 sec) -ifdef(pre17). -type pid_queue() :: queue(). @@ -36,7 +36,9 @@ size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + strategy = lifo :: lifo | fifo, + overflow_check_period :: non_neg_integer(), %milliseconds + overflow_ttl = 0 :: non_neg_integer() %microseconds }). -spec checkout(Pool :: pool()) -> pid(). @@ -49,15 +51,27 @@ checkout(Pool, Block) -> -spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout()) -> pid() | full. +-if(?OTP_RELEASE >= 23). +checkout(Pool, Block, Timeout) -> + CRef = make_ref(), + try + gen_server:call(Pool, {checkout, CRef, Block}, Timeout) + catch + Class:Reason:Stacktrace -> % new way of trycatch + gen_server:cast(Pool, {cancel_waiting, CRef}), + erlang:raise(Class, Reason, Stacktrace) + end. +-else. checkout(Pool, Block, Timeout) -> CRef = make_ref(), try gen_server:call(Pool, {checkout, CRef, Block}, Timeout) catch - Class:Reason -> + Class:Reason -> %legacy way gen_server:cast(Pool, {cancel_waiting, CRef}), erlang:raise(Class, Reason, erlang:get_stacktrace()) end. +-endif. -spec checkin(Pool :: pool(), Worker :: pid()) -> ok. checkin(Pool, Worker) when is_pid(Worker) -> @@ -128,7 +142,7 @@ init({PoolArgs, WorkerArgs}) -> Monitors = ets:new(monitors, [private]), init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> +init([{worker_module, Mod} | Rest], WorkerArgs, State) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), init(Rest, WorkerArgs, State#state{supervisor = Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> @@ -139,17 +153,21 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = lifo}); init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([{overflow_ttl, OverflowTtl} | Rest], WorkerArgs, State) when is_integer(OverflowTtl) -> + init(Rest, WorkerArgs, State#state{overflow_ttl = OverflowTtl * 1000}); +init([{overflow_check_period, OverflowCheckPeriod} | Rest], WorkerArgs, State) when is_integer(OverflowCheckPeriod) -> + init(Rest, WorkerArgs, State#state{overflow_check_period = OverflowCheckPeriod}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> Workers = prepopulate(Size, Sup), + start_timer(State), {ok, State#state{workers = Workers}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> - case ets:lookup(Monitors, Pid) of + case ets:take(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), NewState = handle_checkin(Pid, State), {noreply, NewState}; [] -> @@ -184,13 +202,15 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> overflow = Overflow, max_overflow = MaxOverflow} = State, case Workers of - [Pid | Left] -> - MRef = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + [{Pid, _} | Left] when State#state.overflow_ttl > 0 -> + add_worker_execute(Pid, CRef, FromPid, Monitors), + {reply, Pid, State#state{workers = Left}}; + [{Pid, _} | Left] -> + add_worker_execute(Pid, CRef, FromPid, Monitors), {reply, Pid, State#state{workers = Left}}; [] when MaxOverflow > 0, Overflow < MaxOverflow -> - {Pid, MRef} = new_worker(Sup, FromPid), - true = ets:insert(Monitors, {Pid, CRef, MRef}), + Pid = new_worker(Sup), + add_worker_execute(Pid, CRef, FromPid, Monitors), {reply, Pid, State#state{overflow = Overflow + 1}}; [] when Block =:= false -> {reply, full, State}; @@ -204,11 +224,12 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, + CheckedOutWorkers = ets:info(Monitors, size), StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; -handle_call(get_avail_workers, _From, State) -> - Workers = State#state.workers, - {reply, Workers, State}; + {reply, {StateName, length(Workers), Overflow, CheckedOutWorkers}, State}; +handle_call(get_avail_workers, _From, State = #state{workers = Workers}) -> + Pids = lists:map(fun({Pid, _TS}) -> Pid end, Workers), + {reply, Pids, State}; handle_call(get_all_workers, _From, State) -> Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), @@ -235,29 +256,38 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, - monitors = Monitors} = State, - case ets:lookup(Monitors, Pid) of + monitors = Monitors, + workers = Workers} = State, + case ets:take(Monitors, Pid) of [{Pid, _, MRef}] -> true = erlang:demonitor(MRef), - true = ets:delete(Monitors, Pid), NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case lists:keymember(Pid, 1, Workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), - {noreply, State#state{workers = [new_worker(Sup) | W]}}; + W = lists:keydelete(Pid, 1, Workers), + {noreply, State#state{workers = [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | W]}}; false -> {noreply, State} end end; - +handle_info({rip_workers, TimerTTL}, State) -> + #state{overflow = Overflow, + overflow_ttl = OverTTL, + workers = Workers, + strategy = Strategy, + supervisor = Sup} = State, + Now = erlang:monotonic_time(micro_seconds), + {UOverflow, UWorkers} = do_reap_workers(Workers, Now, OverTTL, Overflow, Sup, [], Strategy), + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}), + {noreply, State#state{overflow = UOverflow, workers = UWorkers}}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> - ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), - true = exit(State#state.supervisor, shutdown), +terminate(_Reason, #state{workers = Workers, supervisor = Sup}) -> + ok = lists:foreach(fun ({W, _}) -> unlink(W) end, Workers), + true = exit(Sup, shutdown), ok. code_change(_OldVsn, State, _Extra) -> @@ -276,11 +306,6 @@ new_worker(Sup) -> true = link(Pid), Pid. -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. - dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). @@ -293,27 +318,30 @@ prepopulate(N, Sup) -> prepopulate(0, _Sup, Workers) -> Workers; prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). + prepopulate(N-1, Sup, [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | Workers]). handle_checkin(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, overflow = Overflow, - strategy = Strategy} = State, + strategy = Strategy, + overflow_ttl = OverflowTtl} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), gen_server:reply(From, Pid), State#state{waiting = Left}; + {empty, Empty} when Overflow > 0, OverflowTtl > 0 -> + LastUseTime = erlang:monotonic_time(micro_seconds), + Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), + State#state{workers = Workers, waiting = Empty}; {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + LastUseTime = erlang:monotonic_time(micro_seconds), + Workers = return_worker(Strategy, {Pid, LastUseTime}, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} end. @@ -331,7 +359,7 @@ handle_worker_exit(Pid, State) -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> Workers = - [new_worker(Sup) + [{new_worker(Sup), erlang:monotonic_time(micro_seconds)} | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], State#state{workers = Workers, waiting = Empty} end. @@ -343,7 +371,43 @@ state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> true -> overflow; false -> ready end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; +state_name(State = #state{overflow = Overflow}) when Overflow > 0 -> + #state{max_overflow = MaxOverflow, workers = Workers, overflow = Overflow} = State, + NumberOfWorkers = length(Workers), + case MaxOverflow == Overflow of + true when NumberOfWorkers > 0 -> ready; + true -> full; + false -> overflow + end; state_name(_State) -> overflow. + +add_worker_execute(WorkerPid, CRef, FromPid, Monitors) -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {WorkerPid, CRef, MRef}). + +return_worker(lifo, Pid, Workers) -> [Pid | Workers]; +return_worker(fifo, Pid, Workers) -> Workers ++ [Pid]. + +do_reap_workers(UnScanned, _, _, 0, _, Workers, _) -> + {0, lists:reverse(Workers) ++ UnScanned}; +do_reap_workers([], _, _, Overflow, _, Workers, _) -> + {Overflow, lists:reverse(Workers)}; +do_reap_workers([Worker = {Pid, LTime} | Rest], Now, TTL, Overflow, Sup, Workers, Strategy) -> + case Now - LTime > TTL of + true -> + ok = dismiss_worker(Sup, Pid), + do_reap_workers(Rest, Now, TTL, Overflow - 1, Sup, Workers, Strategy); + false when Strategy =:= lifo -> + {Overflow, lists:reverse([Worker | Workers]) ++ Rest}; + false -> + do_reap_workers(Rest, Now, TTL, Overflow, Sup, [Worker | Workers], Strategy) + end. + +start_timer(#state{overflow_ttl = 0}) -> %overflow turned off + ok; +start_timer(#state{overflow_check_period = TimerTTL}) when is_number(TimerTTL) -> + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}); +start_timer(#state{overflow_check_period = undefined, overflow_ttl = TTL}) -> + TimerTTL = round(min(TTL, ?CHECK_OVERFLOW_DEFAULT_PERIOD) / 1000), %If overflow_ttl is less, than a second - period will be same as overflow_ttl + erlang:send_after(TimerTTL, self(), {rip_workers, TimerTTL}). diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index e6485a6..5e28727 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -8,7 +8,11 @@ start_link(Mod, Args) -> supervisor:start_link(?MODULE, {Mod, Args}). -init({Mod, Args}) -> +init({Mod, Args}) when is_atom(Mod)-> {ok, {{simple_one_for_one, 0, 1}, [{Mod, {Mod, start_link, [Args]}, + temporary, 5000, worker, [Mod]}]}}; +init({{Mod, Fun}, Args}) -> + {ok, {{simple_one_for_one, 0, 1}, + [{Mod, {Mod, Fun, [Args]}, temporary, 5000, worker, [Mod]}]}}. diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index b0f3b39..2dcd761 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -42,6 +42,15 @@ pool_test_() -> {<<"Non-blocking pool behaves when full">>, fun pool_full_nonblocking/0 }, + {<<"Pool with overflow_ttl behaves as expected">>, + fun pool_overflow_ttl_workers/0 + }, + {<<"Lifo pool with overflow_ttl rips workers">>, + fun lifo_overflow_rip_test/0 + }, + {<<"Fifo pool with overflow_ttl rips workers">>, + fun fifo_overflow_rip_test/0 + }, {<<"Pool behaves on owner death">>, fun owner_death/0 }, @@ -386,6 +395,85 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). +pool_overflow_ttl_workers() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 1, 800), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + % Test pool behaves normally when full + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(full, poolboy:checkout(Pid, false)), + % Test first worker is returned to list of available workers + poolboy:checkin(Pid, Worker), + timer:sleep(500), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + % Ensure first worker is in fact being reused + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(Worker, Worker2), + % Test second worker is returned to list of available workers + poolboy:checkin(Pid, Worker1), + timer:sleep(500), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + % Ensure second worker is in fact being reused + Worker3 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pid)), + ?assertEqual(Worker1, Worker3), + % Test we've got two workers ready when two are checked in in quick + % succession + poolboy:checkin(Pid, Worker2), + timer:sleep(100), + ?assertEqual({ready, 1, 1, 1}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker3), + timer:sleep(100), + ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), + % Test an owner death + spawn(fun() -> + poolboy:checkout(Pid), + receive after 100 -> exit(normal) end + end), + ?assertEqual({ready, 2, 1, 0}, poolboy:status(Pid)), + ?assertEqual(2, length(pool_call(Pid, get_all_workers))), + % Test overflow worker is reaped in the correct time period + timer:sleep(1500), + % Test overflow worker is reaped in the correct time period + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + % Test worker death behaviour + Worker4 = poolboy:checkout(Pid), + Worker5 = poolboy:checkout(Pid), + exit(Worker5, kill), + timer:sleep(100), + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pid)), + exit(Worker4, kill), + timer:sleep(100), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +lifo_overflow_rip_test() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300, lifo), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + poolboy:checkin(Pid, Worker1), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + +fifo_overflow_rip_test() -> + {ok, Pid} = new_pool_with_overflow_ttl(1, 2, 300, fifo), + Worker = poolboy:checkout(Pid), + Worker1 = poolboy:checkout(Pid), + Worker2 = poolboy:checkout(Pid), + ?assertEqual({full, 0, 2, 3}, poolboy:status(Pid)), + poolboy:checkin(Pid, Worker), + poolboy:checkin(Pid, Worker1), + poolboy:checkin(Pid, Worker2), + timer:sleep(500), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pid)), + ok = pool_call(Pid, stop). + owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. @@ -516,7 +604,7 @@ reuses_waiting_monitor_on_worker_exit() -> receive ok -> ok end end), - Worker = receive {worker, Worker} -> Worker end, + Worker = receive {worker, W} -> W end, Ref = monitor(process, Worker), exit(Worker, kill), receive @@ -546,5 +634,14 @@ new_pool(Size, MaxOverflow, Strategy) -> {size, Size}, {max_overflow, MaxOverflow}, {strategy, Strategy}]). +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl) -> + new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl, lifo). + +new_pool_with_overflow_ttl(Size, MaxOverflow, OverflowTtl, Strategy) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, + {overflow_ttl, OverflowTtl}, {strategy, Strategy}]). + pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request).