Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 87 additions & 27 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

-record(state, {
supervisor :: undefined | pid(),
workers = [] :: [pid()],
workers = [] :: [pid()] | pid_queue(),
waiting :: pid_queue(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
Expand Down Expand Up @@ -164,9 +164,14 @@ init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Workers = prepopulate(Size, Sup),
{ok, State#state{workers = Workers}}.
init([], _WorkerArgs, #state{size = Size, supervisor = Sup, strategy = Strategy} = State) ->
Workers = prepopulate(Size, Sup),
NewWorkers =
case Strategy of
fifo -> queue:from_list(Workers);
lifo -> Workers
end,
{ok, State#state{workers = NewWorkers}}.

handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
Expand Down Expand Up @@ -206,11 +211,11 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow} = State,
case Workers of
[Pid | Left] ->
case out_worker(Workers) of
{Pid, WorkersRemain} ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{workers = Left}};
{reply, Pid, State#state{workers = WorkersRemain}};
[] when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
Expand All @@ -227,11 +232,17 @@ handle_call(status, _From, State) ->
#state{workers = Workers,
monitors = Monitors,
overflow = Overflow} = State,
StateName = state_name(State),
{reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
StateName = state_name(State),
LenWorkers = len_workers(Workers),
{reply, {StateName, LenWorkers, Overflow, ets:info(Monitors, size)}, State};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
NewWorkers =
case queue:is_queue(Workers) of
true -> queue:to_list(Workers);
false -> Workers
end,
{reply, NewWorkers, State};
handle_call(get_all_workers, _From, State) ->
Sup = State#state.supervisor,
WorkerList = supervisor:which_children(Sup),
Expand All @@ -256,7 +267,7 @@ handle_info({'DOWN', MRef, _, _, _}, State) ->
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
handle_info({'EXIT', Pid, _Reason}, State = #state{workers = Workers}) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
Expand All @@ -266,10 +277,11 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
NewState = handle_worker_exit(Pid, State),
{noreply, NewState};
[] ->
case lists:member(Pid, State#state.workers) of
true ->
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
{noreply, State#state{workers = [new_worker(Sup) | W]}};
case member_workers(Pid, Workers) of
true ->
WorkersRemain = filter_workers(fun (P) -> P =/= Pid end, Workers),
NewWorkers = in_worker(new_worker(Sup), WorkersRemain),
{noreply, State#state{workers = NewWorkers}};
false ->
{noreply, State}
end
Expand All @@ -278,8 +290,13 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, State) ->
ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers),
terminate(_Reason, State = #state{workers = Workers}) ->
WorkersList =
case queue:is_queue(Workers) of
true -> queue:to_list(Workers);
false -> Workers
end,
ok = lists:foreach(fun (W) -> unlink(W) end, WorkersList),
true = exit(State#state.supervisor, shutdown),
ok.

Expand Down Expand Up @@ -322,8 +339,7 @@ handle_checkin(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow,
strategy = Strategy} = State,
overflow = Overflow} = State,
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
Expand All @@ -333,10 +349,7 @@ handle_checkin(Pid, State) ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
Workers = case Strategy of
lifo -> [Pid | State#state.workers];
fifo -> State#state.workers ++ [Pid]
end,
Workers = in_worker(Pid, State#state.workers),
State#state{workers = Workers, waiting = Empty, overflow = 0}
end.

Expand All @@ -353,15 +366,16 @@ handle_worker_exit(Pid, State) ->
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
Workers =
[new_worker(Sup)
| lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
Workers = in_worker(
new_worker(Sup),
filter_workers(fun (P) -> P =/= Pid end, State#state.workers)
),
State#state{workers = Workers, waiting = Empty}
end.

state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
#state{max_overflow = MaxOverflow, workers = Workers} = State,
case length(Workers) == 0 of
case len_workers(Workers) == 0 of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
Expand All @@ -370,3 +384,49 @@ state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) ->
full;
state_name(_State) ->
overflow.

%% workers is list when strategy equal lifo, otherwise workers is queue.

%% out a worker from workers (list or queue)
%% return [] when empty, otherwise return {Pid, WorkersRemain}
out_worker([Pid | Left]) ->
{Pid, Left};
out_worker([]) ->
[];
out_worker(WorkerQueue) ->
case queue:out(WorkerQueue) of
{{value, Item}, RemainQueue} ->
{Item, RemainQueue};
{empty, _} ->
[]
end.

%% put a worker in workers (list or queue)
in_worker(Pid, []) ->
[Pid];
in_worker(Pid, PidList = [_|_]) ->
[Pid | PidList];
in_worker(Pid, PidQueue) ->
queue:in(Pid, PidQueue).

%% check is member of workers (list or queue)
member_workers(_Item, []) ->
false;
member_workers(Item, Workers = [_|_]) ->
lists:member(Item, Workers);
member_workers(Item, Workers) ->
queue:member(Item, Workers).

filter_workers(_Fun, []) ->
ok;
filter_workers(Fun, Workers = [_|_]) ->
lists:filter(Fun, Workers);
filter_workers(Fun, Workers) ->
queue:filter(Fun, Workers).

len_workers([]) -> 0;
len_workers(Workers = [_|_]) ->
length(Workers);
len_workers(Workers) ->
queue:len(Workers).