From 912f04be2c0339879998ecc4e65cc60eed36d5cf Mon Sep 17 00:00:00 2001 From: Devin Torres Date: Wed, 29 Oct 2014 04:29:07 -0500 Subject: [PATCH] Use an ETS table for the waiting queue --- src/poolboy.erl | 77 +++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index f861219..fa82546 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -11,12 +11,6 @@ -define(TIMEOUT, 5000). --ifdef(pre17). --type pid_queue() :: queue(). --else. --type pid_queue() :: queue:queue(). --endif. - -type pool() :: Name :: (atom() | pid()) | {Name :: atom(), node()} | @@ -30,7 +24,7 @@ -record(state, { supervisor :: pid(), workers :: [pid()], - waiting :: pid_queue(), + waiting :: ets:tid(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), @@ -121,7 +115,7 @@ status(Pool) -> init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), - Waiting = queue:new(), + Waiting = ets:new(waiting, [private]), Monitors = ets:new(monitors, [private]), init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). @@ -149,11 +143,14 @@ handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> {noreply, State} end; -handle_cast({cancel_waiting, Pid}, State) -> - Waiting = queue:filter(fun ({{P, _}, Ref}) -> - P =/= Pid orelse not(erlang:demonitor(Ref)) - end, State#state.waiting), - {noreply, State#state{waiting = Waiting}}; +handle_cast({cancel_waiting, Pid}, State = #state{waiting = Waiting}) -> + case ets:match_object(Waiting, {{Pid, '_'}, '_'}, 1) of + {[{From, Ref}], _} -> + true = erlang:demonitor(Ref), + true = ets:delete(Waiting, From); + {[], _} -> true + end, + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -177,8 +174,8 @@ handle_call({checkout, Block}, {FromPid, _} = From, State) -> {reply, full, State}; [] -> Ref = erlang:monitor(process, FromPid), - Waiting = queue:in({From, Ref}, State#state.waiting), - {noreply, State#state{waiting = Waiting}} + true = ets:insert(State#state.waiting, {From, Ref}), + {noreply, State} end; handle_call(status, _From, State) -> @@ -203,15 +200,15 @@ handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. -handle_info({'DOWN', Ref, _, _, _}, State) -> - case ets:match(State#state.monitors, {'$1', Ref}) of - [[Pid]] -> - true = ets:delete(State#state.monitors, Pid), +handle_info({'DOWN', Ref, _, _, _}, #state{monitors = Monitors} = State) -> + case ets:match(Monitors, {'$1', Ref}, 1) of + {[[Pid]], _} -> + true = ets:delete(Monitors, Pid), NewState = handle_checkin(Pid, State), {noreply, NewState}; - [] -> - Waiting = queue:filter(fun ({_, R}) -> R =/= Ref end, State#state.waiting), - {noreply, State#state{waiting = Waiting}} + {[], _} -> + true = ets:match_delete(State#state.waiting, {'_', Ref}), + {noreply, State} end; handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, @@ -280,37 +277,41 @@ handle_checkin(Pid, State) -> waiting = Waiting, monitors = Monitors, overflow = Overflow} = State, - case queue:out(Waiting) of - {{value, {From, Ref}}, Left} -> - true = ets:insert(Monitors, {Pid, Ref}), - gen_server:reply(From, Pid), - State#state{waiting = Left}; - {empty, Empty} when Overflow > 0 -> + case ets:first(Waiting) of + '$end_of_table' when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), - State#state{waiting = Empty, overflow = Overflow - 1}; - {empty, Empty} -> + State#state{overflow = Overflow - 1}; + '$end_of_table' -> Workers = [Pid | State#state.workers], - State#state{workers = Workers, waiting = Empty, overflow = 0} + State#state{workers = Workers, overflow = 0}; + From -> + Ref = ets:lookup_element(Waiting, From, 2), + true = ets:delete(Waiting, From), + true = ets:insert(Monitors, {Pid, Ref}), + gen_server:reply(From, Pid), + State end. handle_worker_exit(Pid, State) -> #state{supervisor = Sup, + waiting = Waiting, monitors = Monitors, overflow = Overflow} = State, - case queue:out(State#state.waiting) of - {{value, {{FromPid, _} = From, _}}, LeftWaiting} -> + case ets:first(Waiting) of + {FromPid, _} = From -> + true = ets:delete(Waiting, From), MonitorRef = erlang:monitor(process, FromPid), NewWorker = new_worker(State#state.supervisor), true = ets:insert(Monitors, {NewWorker, MonitorRef}), gen_server:reply(From, NewWorker), - State#state{waiting = LeftWaiting}; - {empty, Empty} when Overflow > 0 -> - State#state{overflow = Overflow - 1, waiting = Empty}; - {empty, Empty} -> + State; + '$end_of_table' when Overflow > 0 -> + State#state{overflow = Overflow - 1}; + '$end_of_table' -> Workers = [new_worker(Sup) | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], - State#state{workers = Workers, waiting = Empty} + State#state{workers = Workers} end. state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->