From 23d3209c81c1938e0deaad5a8700818dae483038 Mon Sep 17 00:00:00 2001 From: hughxing Date: Mon, 3 Sep 2018 20:07:08 +0800 Subject: [PATCH] workers (a list) replace by queue when strategy equal fifo --- src/poolboy.erl | 114 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 27 deletions(-) diff --git a/src/poolboy.erl b/src/poolboy.erl index 7c5605e..0ab2890 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -38,7 +38,7 @@ -record(state, { supervisor :: undefined | pid(), - workers = [] :: [pid()], + workers = [] :: [pid()] | pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), @@ -164,9 +164,14 @@ init([{strategy, fifo} | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State#state{strategy = fifo}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> - Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. +init([], _WorkerArgs, #state{size = Size, supervisor = Sup, strategy = Strategy} = State) -> + Workers = prepopulate(Size, Sup), + NewWorkers = + case Strategy of + fifo -> queue:from_list(Workers); + lifo -> Workers + end, + {ok, State#state{workers = NewWorkers}}. handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of @@ -206,11 +211,11 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> monitors = Monitors, overflow = Overflow, max_overflow = MaxOverflow} = State, - case Workers of - [Pid | Left] -> + case out_worker(Workers) of + {Pid, WorkersRemain} -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), - {reply, Pid, State#state{workers = Left}}; + {reply, Pid, State#state{workers = WorkersRemain}}; [] when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -227,11 +232,17 @@ handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, overflow = Overflow} = State, - StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + StateName = state_name(State), + LenWorkers = len_workers(Workers), + {reply, {StateName, LenWorkers, Overflow, ets:info(Monitors, size)}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, - {reply, Workers, State}; + NewWorkers = + case queue:is_queue(Workers) of + true -> queue:to_list(Workers); + false -> Workers + end, + {reply, NewWorkers, State}; handle_call(get_all_workers, _From, State) -> Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), @@ -256,7 +267,7 @@ handle_info({'DOWN', MRef, _, _, _}, State) -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; -handle_info({'EXIT', Pid, _Reason}, State) -> +handle_info({'EXIT', Pid, _Reason}, State = #state{workers = Workers}) -> #state{supervisor = Sup, monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of @@ -266,10 +277,11 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of - true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), - {noreply, State#state{workers = [new_worker(Sup) | W]}}; + case member_workers(Pid, Workers) of + true -> + WorkersRemain = filter_workers(fun (P) -> P =/= Pid end, Workers), + NewWorkers = in_worker(new_worker(Sup), WorkersRemain), + {noreply, State#state{workers = NewWorkers}}; false -> {noreply, State} end @@ -278,8 +290,13 @@ handle_info({'EXIT', Pid, _Reason}, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> - ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), +terminate(_Reason, State = #state{workers = Workers}) -> + WorkersList = + case queue:is_queue(Workers) of + true -> queue:to_list(Workers); + false -> Workers + end, + ok = lists:foreach(fun (W) -> unlink(W) end, WorkersList), true = exit(State#state.supervisor, shutdown), ok. @@ -322,8 +339,7 @@ handle_checkin(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, - overflow = Overflow, - strategy = Strategy} = State, + overflow = Overflow} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -333,10 +349,7 @@ handle_checkin(Pid, State) -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + Workers = in_worker(Pid, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} end. @@ -353,15 +366,16 @@ handle_worker_exit(Pid, State) -> {empty, Empty} when Overflow > 0 -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> - Workers = - [new_worker(Sup) - | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], + Workers = in_worker( + new_worker(Sup), + filter_workers(fun (P) -> P =/= Pid end, State#state.workers) + ), State#state{workers = Workers, waiting = Empty} end. state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> #state{max_overflow = MaxOverflow, workers = Workers} = State, - case length(Workers) == 0 of + case len_workers(Workers) == 0 of true when MaxOverflow < 1 -> full; true -> overflow; false -> ready @@ -370,3 +384,49 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> full; state_name(_State) -> overflow. + +%% workers is list when strategy equal lifo, otherwise workers is queue. + +%% out a worker from workers (list or queue) +%% return [] when empty, otherwise return {Pid, WorkersRemain} +out_worker([Pid | Left]) -> + {Pid, Left}; +out_worker([]) -> + []; +out_worker(WorkerQueue) -> + case queue:out(WorkerQueue) of + {{value, Item}, RemainQueue} -> + {Item, RemainQueue}; + {empty, _} -> + [] + end. + +%% put a worker in workers (list or queue) +in_worker(Pid, []) -> + [Pid]; +in_worker(Pid, PidList = [_|_]) -> + [Pid | PidList]; +in_worker(Pid, PidQueue) -> + queue:in(Pid, PidQueue). + +%% check is member of workers (list or queue) +member_workers(_Item, []) -> + false; +member_workers(Item, Workers = [_|_]) -> + lists:member(Item, Workers); +member_workers(Item, Workers) -> + queue:member(Item, Workers). + +filter_workers(_Fun, []) -> + ok; +filter_workers(Fun, Workers = [_|_]) -> + lists:filter(Fun, Workers); +filter_workers(Fun, Workers) -> + queue:filter(Fun, Workers). + +len_workers([]) -> 0; +len_workers(Workers = [_|_]) -> + length(Workers); +len_workers(Workers) -> + queue:len(Workers). +