diff --git a/src/ered_client.erl b/src/ered_client.erl index cfdce31..0b1547b 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -46,7 +46,8 @@ queue_ok_level = 2000 :: non_neg_integer(), max_waiting = 5000 :: non_neg_integer(), - max_pending = 128 :: non_neg_integer() + max_pending = 128 :: non_neg_integer(), + batch_size = 16 :: non_neg_integer() }). -record(st, @@ -64,6 +65,10 @@ queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level node_status = up :: up | node_down | node_deactivated, + transport_socket = none :: gen_tcp:socket() | ssl:sslsocket(), + recv_pid = none :: pid(), + + node_down_timer = none :: none | reference(), opts = #opts{} @@ -115,6 +120,8 @@ %% If the queue has been full then it is considered ok %% again when it reaches this level {queue_ok_level, non_neg_integer()} | + %% Automatic batching when queue is full, turn off batching by setting size to 1. + {batch_size, non_neg_integer()} | %% How long to wait to reconnect after a failed connect attempt {reconnect_wait, non_neg_integer()} | %% Pid to send status messages to @@ -230,6 +237,7 @@ init({Host, Port, OptsList, User}) -> fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; ({max_pending, Val}, S) -> S#opts{max_pending = Val}; + ({batch_size, Val}, S) -> S#opts{batch_size = Val}; ({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val}; ({reconnect_wait, Val}, S) -> S#opts{reconnect_wait = Val}; ({info_pid, Val}, S) -> S#opts{info_pid = Val}; @@ -276,8 +284,7 @@ handle_cast(reactivate, #st{connection_pid = none} = State) -> handle_cast(reactivate, State) -> {noreply, State#st{node_status = up}}. - -handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) -> +handle_info({{command_reply, _}, Reply}, State = #st{pending = Pending, connection_pid = _Pid}) -> case q_out(Pending) of empty -> {noreply, State}; @@ -299,15 +306,15 @@ handle_info(Reason = {init_error, _Errors}, State) -> handle_info(Reason = {socket_closed, _CloseReason}, State) -> {noreply, connection_down(Reason, State)}; -handle_info({connected, Pid, ClusterId}, State) -> +handle_info({connected, ConnectionPid, Socket, RecvPid, ClusterId}, State) -> State1 = cancel_node_down_timer(State), - State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId, + State2 = State1#st{connection_pid = ConnectionPid, cluster_id = ClusterId, node_status = case State1#st.node_status of node_down -> up; OldStatus -> OldStatus end}, State3 = report_connection_status(connection_up, State2), - {noreply, process_commands(State3)}; + {noreply, process_commands(State3#st{transport_socket = Socket, recv_pid = RecvPid})}; handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer -> State1 = report_connection_status({connection_down, node_down_timeout}, State), @@ -358,7 +365,8 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) -> connection_down(Reason, State) -> State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting), pending = q_new(), - connection_pid = none}, + connection_pid = none, + transport_socket = none}, State2 = process_commands(State1), State3 = report_connection_status({connection_down, Reason}, State2), start_node_down_timer(State3). @@ -370,10 +378,10 @@ process_commands(State) -> NumPending = q_len(State#st.pending), if (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> - {Command, NewWaiting} = q_out(State#st.waiting), - Data = get_command_payload(Command), - ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}), - process_commands(State#st{pending = q_in(Command, State#st.pending), + {Commands, NewWaiting} = q_multi_out(State#st.opts#opts.batch_size, State#st.waiting), + Data = [get_command_payload(X) || X <- Commands], + batch_send(State#st.recv_pid, State#st.transport_socket, Data, {command_reply, State#st.connection_pid}), + process_commands(State#st{pending = q_in_multiple(Commands, State#st.pending), waiting = NewWaiting}); (NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) -> @@ -405,7 +413,11 @@ q_new() -> {0, queue:new()}. q_in(Item, {Size, Q}) -> - {Size+1, queue:in(Item, Q)}. + {Size + 1, queue:in(Item, Q)}. +q_in_multiple([], Q) -> + Q; +q_in_multiple([Item | Items] , {Size, Q}) -> + q_in_multiple(Items, {Size + 1, queue:in(Item, Q)}). q_join({Size1, Q1}, {Size2, Q2}) -> {Size1 + Size2, queue:join(Q1, Q2)}. @@ -413,7 +425,18 @@ q_join({Size1, Q1}, {Size2, Q2}) -> q_out({Size, Q}) -> case queue:out(Q) of {empty, _Q} -> empty; - {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} + {{value, Val}, NewQ} -> {Val, {Size - 1, NewQ}} + end. +q_multi_out(Nu, Queue) -> + q_multi_out(Nu, Queue, []). +q_multi_out(0, Q, Acc) -> + {lists:reverse(Acc), Q}; +q_multi_out(Nu, {Size, Q}, Acc) -> + case queue:out(Q) of + {empty, _Q} -> + {lists:reverse(Acc), {Size,Q}}; + {{value, Val}, NewQ} -> + q_multi_out(Nu - 1, {Size - 1, NewQ}, [Val | Acc]) end. q_to_list({_Size, Q}) -> @@ -422,7 +445,6 @@ q_to_list({_Size, Q}) -> q_len({Size, _Q}) -> Size. - reply_command({command, _, Fun}, Reply) -> Fun(Reply). @@ -478,28 +500,25 @@ send_info(_Msg, _State) -> connect(Pid, Opts) -> Result = ered_connection:connect(Opts#opts.host, Opts#opts.port, Opts#opts.connection_opts), case Result of - {error, Reason} -> - Pid ! {connect_error, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - - {ok, ConnectionPid} -> - case init(Pid, ConnectionPid, Opts) of - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason}, - timer:sleep(Opts#opts.reconnect_wait); + {ok, ConnectionPid, RecvPid, Socket} -> + case init(Pid, RecvPid, Socket, Opts) of {ok, ClusterId} -> - Pid ! {connected, ConnectionPid, ClusterId}, + Pid ! {connected, ConnectionPid, Socket, RecvPid, ClusterId}, receive {socket_closed, ConnectionPid, Reason} -> Pid ! {socket_closed, Reason} - end - end - + end; + {socket_closed, ConnectionPid, Reason} -> + Pid ! {socket_closed, Reason}, + timer:sleep(Opts#opts.reconnect_wait) + end; + {error, Reason} -> + Pid ! {connect_error, Reason}, + timer:sleep(Opts#opts.reconnect_wait) end, connect(Pid, Opts). - -init(MainPid, ConnectionPid, Opts) -> +init(MainPid, RecvPid, Socket, Opts) -> Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id], Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of {3, {Username, Password}} -> @@ -517,7 +536,7 @@ init(MainPid, ConnectionPid, Opts) -> [] -> {ok, undefined}; Commands -> - ered_connection:command_async(ConnectionPid, Commands, init_command_reply), + send(RecvPid, Socket, Commands, init_command_reply), receive {init_command_reply, Reply} -> case [Reason || {error, Reason} <- Reply] of @@ -528,9 +547,54 @@ init(MainPid, ConnectionPid, Opts) -> Errors -> MainPid ! {init_error, Errors}, timer:sleep(Opts#opts.reconnect_wait), - init(MainPid, ConnectionPid, Opts) + init(MainPid, RecvPid, Socket, Opts) end; Other -> Other end end. + +send(RecvPid, {Socket, Transport}, Commands, Ref) -> + Time = erlang:monotonic_time(millisecond), + Commands2 = ered_command:convert_to(Commands), + Data = ered_command:get_data(Commands2), + Class = ered_command:get_response_class(Commands2), + RefInfo = {Class, self(), Ref, []}, + case Transport:send(Socket, Data) of + ok -> + RecvPid ! {requests, [RefInfo], Time}; + {error, Reason} -> + %% Give recv_loop time to finish processing + %% This will shut down recv_loop if it is waiting on socket + Transport:shutdown(Socket, read_write), + %% This will shut down recv_loop if it is waiting for a reference + RecvPid ! close_down, + %% Ok, recv done, time to die + receive {recv_exit, _Reason} -> ok end, + self() ! {socket_closed, Reason} + end. + +batch_send(RecvPid, {Socket, Transport}, Commands, Ref) -> + To_Data = fun(Command) -> + Command2 = ered_command:convert_to(Command), + Data = ered_command:get_data(Command2), + Class = ered_command:get_response_class(Command2), + RefInfo = {Class, self(), Ref, []}, + {Data, RefInfo} + end, + {Data, Refs} = lists:unzip([To_Data(Command) || Command <- Commands]), + Time = erlang:monotonic_time(millisecond), + case Transport:send(Socket, Data) of + ok -> + RecvPid ! {requests, Refs, Time}; + {error, Reason} -> + %% Give recv_loop time to finish processing + %% This will shut down recv_loop if it is waiting on socket + Transport:shutdown(Socket, read_write), + %% This will shut down recv_loop if it is waiting for a reference + RecvPid ! close_down, + %% Ok, recv done, time to die + receive {recv_exit, _Reason} -> ok end, + self() ! {socket_closed, Reason} + end. + diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index da55a9e..fa42fd0 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -190,8 +190,7 @@ command(ClusterRef, Command, Key) -> command(ClusterRef, Command, Key, infinity). command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> - C = ered_command:convert_to(Command), - gen_server:call(ClusterRef, {command, C, Key}, Timeout). + gen_server:call(ClusterRef, {command, Command, Key}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. @@ -201,8 +200,7 @@ command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> %% runs in an unspecified process. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> - C = ered_command:convert_to(Command), - gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + gen_server:cast(ServerRef, {command_async, Command, Key, ReplyFun}). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_all(cluster_ref(), command()) -> [reply()]. @@ -663,7 +661,7 @@ create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> update_slots(Pid, SlotMapVersion, Client), gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); {ask, Addr} -> - gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); + gen_server:cast(Pid, {forward_command_asking, ered_command:convert_to(Command), Slot, From, Addr, AttemptsLeft-1, Reply}); try_again -> erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); cluster_down -> diff --git a/src/ered_connection.erl b/src/ered_connection.erl index 7efb9a9..b40a6ac 100644 --- a/src/ered_connection.erl +++ b/src/ered_connection.erl @@ -31,9 +31,6 @@ }). -type opt() :: - %% If commands are queued up in the process message queue this is the max - %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. {connect_timeout, timeout()} | %% Options passed to gen_tcp:connect/4. @@ -83,8 +80,8 @@ connect(Host, Port) -> connect(Host, Port, Opts) -> Pid = connect_async(Host, Port, Opts), receive - {connected, Pid} -> - {ok, Pid}; + {connected, Pid, RecvPid, Socket} -> + {ok, Pid, RecvPid, Socket}; {connect_error, Pid, Reason} -> {error, Reason} end. @@ -104,9 +101,8 @@ connect(Host, Port, Opts) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - connect_async(Addr, Port, Opts) -> [error({badarg, BadOpt}) - || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, + || BadOpt <- proplists:get_keys(Opts) -- [tcp_options, tls_options, push_cb, response_timeout, tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - BatchSize = proplists:get_value(batch_size, Opts, 16), ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), TcpOptions = proplists:get_value(tcp_options, Opts, []), @@ -126,18 +122,21 @@ connect_async(Addr, Port, Opts) -> SendPid = self(), case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of {ok, Socket} -> - Master ! {connected, SendPid}, Pid = spawn_link(fun() -> ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout), %% Inform sending process about exit SendPid ! ExitReason end), - ExitReason = send_loop(Transport, Socket, Pid, BatchSize), - Master ! {socket_closed, SendPid, ExitReason}; + Master ! {connected, SendPid, Pid, {Socket, Transport}}; {error, Reason} -> Master ! {connect_error, SendPid, Reason}; Other -> % {'EXIT',_} Master ! {connect_error, SendPid, Other} + end, + %% Handle the exit of the receive_pid + receive + {recv_exit, R} -> + Master ! {socket_closed, SendPid, {recv_exit, R}} end end). @@ -333,52 +332,3 @@ update_waiting(Timeout, State) when State#recv_st.waiting == [] -> end; update_waiting(_Timeout, State) -> State. - -%% -%% Send logic -%% - -send_loop(Transport, Socket, RecvPid, BatchSize) -> - case receive_data(BatchSize) of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {data, {Refs, Data}} -> - Time = erlang:monotonic_time(millisecond), - case Transport:send(Socket, Data) of - ok -> - %% send to recv proc to fetch the response - RecvPid ! {requests, Refs, Time}, - send_loop(Transport, Socket, RecvPid, BatchSize); - {error, Reason} -> - %% Give recv_loop time to finish processing - %% This will shut down recv_loop if it is waiting on socket - Transport:shutdown(Socket, read_write), - %% This will shut down recv_loop if it is waiting for a reference - RecvPid ! close_down, - %% Ok, recv done, time to die - receive {recv_exit, _Reason} -> ok end, - {send_exit, Reason} - end - end. - -receive_data(N) -> - receive_data(N, infinity, []). - -receive_data(0, _Time, Acc) -> - {data, lists:unzip(lists:reverse(Acc))}; -receive_data(N, Time, Acc) -> - receive - Msg -> - case Msg of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {send, Pid, Ref, Commands} -> - Data = ered_command:get_data(Commands), - Class = ered_command:get_response_class(Commands), - RefInfo = {Class, Pid, Ref, []}, - Acc1 = [{RefInfo, Data} | Acc], - receive_data(N - 1, 0, Acc1) - end - after Time -> - receive_data(0, 0, Acc) - end. diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 516c0cd..67b59a4 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -4,6 +4,7 @@ run_test_() -> [ + {spawn, fun split_data_t/0}, {spawn, fun request_t/0}, {spawn, fun fail_connect_t/0}, {spawn, fun fail_parse_t/0}, @@ -24,6 +25,15 @@ run_test_() -> {spawn, fun bad_host_t/0} ]. +split_data_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + Client = start_client(Port), + Data = iolist_to_binary([<<"A">> || _ <- lists:seq(0,3000)]), + ered_client:command(Client, [<<"hello">>, <<"3">>]), + <<"OK">> = ered_client:command(Client, [<<"set">>, <<"key1">>, Data]), + Data = ered_client:command(Client, [<<"get">>, <<"key1">>]). + request_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl index c28da6e..427b2ed 100644 --- a/test/ered_connection_tests.erl +++ b/test/ered_connection_tests.erl @@ -4,7 +4,7 @@ split_data_test() -> Data = iolist_to_binary([<<"A">> || _ <- lists:seq(0,3000)]), - {ok, Conn1} = ered_connection:connect("127.0.0.1", 6379), + {ok, Conn1,_,_} = ered_connection:connect("127.0.0.1", 6379), ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). @@ -30,8 +30,7 @@ trailing_reply_test() -> end), {port, Port} = receive_msg(), %% increase receive buffer to fit the whole nasty data package - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port, [{batch_size, 1}, - {tcp_options, [{recbuf, 524288}]}]), + {ok, Conn1,_,_} = ered_connection:connect("127.0.0.1", Port, [{tcp_options, [{recbuf, 524288}]}]), ?debugFmt("~w", [Conn1]), ered_connection:command_async(Conn1, [<<"ping">>], ping1), receive sent_big_nasty -> ok end,