From 4148f546bc9851e52effbfdf75a6190566434533 Mon Sep 17 00:00:00 2001 From: William Voong Date: Tue, 21 Oct 2025 14:33:41 +0200 Subject: [PATCH 1/2] Remove the send loop process The send loop process relies on incoming messages in its message queue to batch properly. However, there is a race condition: the send loop process may send its data before handling incoming messages that have already arrived in its message queue, causing unnecessary overhead on the client side and making the batching mechanism in ered unreliable. This commit removes the send loop and moves the gen_tcp:send call into ered_client.erl. Change-Id: Ic3aedb38e3caa86d3fa6862a4c976963393cec2f --- src/ered_client.erl | 71 ++++++++++++++++++++++------------ src/ered_cluster.erl | 8 ++-- src/ered_connection.erl | 64 +++++------------------------- test/ered_client_tests.erl | 10 +++++ test/ered_connection_tests.erl | 63 ------------------------------ 5 files changed, 69 insertions(+), 147 deletions(-) delete mode 100644 test/ered_connection_tests.erl diff --git a/src/ered_client.erl b/src/ered_client.erl index cfdce31..706cbae 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -64,6 +64,10 @@ queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level node_status = up :: up | node_down | node_deactivated, + transport_socket = none :: gen_tcp:socket() | ssl:sslsocket(), + recv_pid = none :: pid(), + + node_down_timer = none :: none | reference(), opts = #opts{} @@ -276,8 +280,7 @@ handle_cast(reactivate, #st{connection_pid = none} = State) -> handle_cast(reactivate, State) -> {noreply, State#st{node_status = up}}. - -handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) -> +handle_info({{command_reply, _}, Reply}, State = #st{pending = Pending, connection_pid = _Pid}) -> case q_out(Pending) of empty -> {noreply, State}; @@ -299,15 +302,15 @@ handle_info(Reason = {init_error, _Errors}, State) -> handle_info(Reason = {socket_closed, _CloseReason}, State) -> {noreply, connection_down(Reason, State)}; -handle_info({connected, Pid, ClusterId}, State) -> +handle_info({connected, ConnectionPid, Socket, RecvPid, ClusterId}, State) -> State1 = cancel_node_down_timer(State), - State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId, + State2 = State1#st{connection_pid = ConnectionPid, cluster_id = ClusterId, node_status = case State1#st.node_status of node_down -> up; OldStatus -> OldStatus end}, State3 = report_connection_status(connection_up, State2), - {noreply, process_commands(State3)}; + {noreply, process_commands(State3#st{transport_socket = Socket, recv_pid = RecvPid})}; handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer -> State1 = report_connection_status({connection_down, node_down_timeout}, State), @@ -358,7 +361,8 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) -> connection_down(Reason, State) -> State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting), pending = q_new(), - connection_pid = none}, + connection_pid = none, + transport_socket = none}, State2 = process_commands(State1), State3 = report_connection_status({connection_down, Reason}, State2), start_node_down_timer(State3). 
@@ -372,7 +376,7 @@ process_commands(State) -> (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> {Command, NewWaiting} = q_out(State#st.waiting), Data = get_command_payload(Command), - ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}), + send(State#st.recv_pid, State#st.transport_socket, Data, {command_reply, State#st.connection_pid}), process_commands(State#st{pending = q_in(Command, State#st.pending), waiting = NewWaiting}); @@ -478,28 +482,25 @@ send_info(_Msg, _State) -> connect(Pid, Opts) -> Result = ered_connection:connect(Opts#opts.host, Opts#opts.port, Opts#opts.connection_opts), case Result of - {error, Reason} -> - Pid ! {connect_error, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - - {ok, ConnectionPid} -> - case init(Pid, ConnectionPid, Opts) of - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason}, - timer:sleep(Opts#opts.reconnect_wait); + {ok, ConnectionPid, RecvPid, Socket} -> + case init(Pid, RecvPid, Socket, Opts) of {ok, ClusterId} -> - Pid ! {connected, ConnectionPid, ClusterId}, + Pid ! {connected, ConnectionPid, Socket, RecvPid, ClusterId}, receive {socket_closed, ConnectionPid, Reason} -> Pid ! {socket_closed, Reason} - end - end - + end; + {socket_closed, ConnectionPid, Reason} -> + Pid ! {socket_closed, Reason}, + timer:sleep(Opts#opts.reconnect_wait) + end; + {error, Reason} -> + Pid ! {connect_error, Reason}, + timer:sleep(Opts#opts.reconnect_wait) end, connect(Pid, Opts). - -init(MainPid, ConnectionPid, Opts) -> +init(MainPid, RecvPid, Socket, Opts) -> Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id], Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of {3, {Username, Password}} -> @@ -517,7 +518,7 @@ init(MainPid, ConnectionPid, Opts) -> [] -> {ok, undefined}; Commands -> - ered_connection:command_async(ConnectionPid, Commands, init_command_reply), + send(RecvPid, Socket, Commands, init_command_reply), receive {init_command_reply, Reply} -> case [Reason || {error, Reason} <- Reply] of @@ -528,9 +529,31 @@ init(MainPid, ConnectionPid, Opts) -> Errors -> MainPid ! {init_error, Errors}, timer:sleep(Opts#opts.reconnect_wait), - init(MainPid, ConnectionPid, Opts) + init(MainPid, RecvPid, Socket, Opts) end; Other -> Other end end. + +send(RecvPid, {Socket, Transport}, Commands, Ref) -> + Time = erlang:monotonic_time(millisecond), + Commands2 = ered_command:convert_to(Commands), + Data = ered_command:get_data(Commands2), + Class = ered_command:get_response_class(Commands2), + RefInfo = {Class, self(), Ref, []}, + case Transport:send(Socket, Data) of + ok -> + RecvPid ! {requests, [RefInfo], Time}; + {error, Reason} -> + %% Give recv_loop time to finish processing + %% This will shut down recv_loop if it is waiting on socket + Transport:shutdown(Socket, read_write), + %% This will shut down recv_loop if it is waiting for a reference + RecvPid ! close_down, + %% Ok, recv done, time to die + receive {recv_exit, _Reason} -> ok end, + self() ! {socket_closed, Reason} + end. + + diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index da55a9e..fa42fd0 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -190,8 +190,7 @@ command(ClusterRef, Command, Key) -> command(ClusterRef, Command, Key, infinity). command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> - C = ered_command:convert_to(Command), - gen_server:call(ClusterRef, {command, C, Key}, Timeout). 
+ gen_server:call(ClusterRef, {command, Command, Key}, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(cluster_ref(), command(), key(), fun((reply()) -> any())) -> ok. @@ -201,8 +200,7 @@ command(ClusterRef, Command, Key, Timeout) when is_binary(Key) -> %% runs in an unspecified process. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> - C = ered_command:convert_to(Command), - gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + gen_server:cast(ServerRef, {command_async, Command, Key, ReplyFun}). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_all(cluster_ref(), command()) -> [reply()]. @@ -663,7 +661,7 @@ create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> update_slots(Pid, SlotMapVersion, Client), gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); {ask, Addr} -> - gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); + gen_server:cast(Pid, {forward_command_asking, ered_command:convert_to(Command), Slot, From, Addr, AttemptsLeft-1, Reply}); try_again -> erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); cluster_down -> diff --git a/src/ered_connection.erl b/src/ered_connection.erl index 7efb9a9..3905d9c 100644 --- a/src/ered_connection.erl +++ b/src/ered_connection.erl @@ -83,8 +83,8 @@ connect(Host, Port) -> connect(Host, Port, Opts) -> Pid = connect_async(Host, Port, Opts), receive - {connected, Pid} -> - {ok, Pid}; + {connected, Pid, RecvPid, Socket} -> + {ok, Pid, RecvPid, Socket}; {connect_error, Pid, Reason} -> {error, Reason} end. @@ -106,7 +106,7 @@ connect_async(Addr, Port, Opts) -> [error({badarg, BadOpt}) || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - BatchSize = proplists:get_value(batch_size, Opts, 16), + _BatchSize = proplists:get_value(batch_size, Opts, 16), ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), TcpOptions = proplists:get_value(tcp_options, Opts, []), @@ -126,18 +126,21 @@ connect_async(Addr, Port, Opts) -> SendPid = self(), case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of {ok, Socket} -> - Master ! {connected, SendPid}, Pid = spawn_link(fun() -> ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout), %% Inform sending process about exit SendPid ! ExitReason end), - ExitReason = send_loop(Transport, Socket, Pid, BatchSize), - Master ! {socket_closed, SendPid, ExitReason}; + Master ! {connected, SendPid, Pid, {Socket, Transport}}; {error, Reason} -> Master ! {connect_error, SendPid, Reason}; Other -> % {'EXIT',_} Master ! {connect_error, SendPid, Other} + end, + %% Handle the exit of the receive_pid + receive + {recv_exit, R} -> + Master ! {socket_closed, SendPid, {recv_exit, R}} end end). @@ -333,52 +336,3 @@ update_waiting(Timeout, State) when State#recv_st.waiting == [] -> end; update_waiting(_Timeout, State) -> State. 
- -%% -%% Send logic -%% - -send_loop(Transport, Socket, RecvPid, BatchSize) -> - case receive_data(BatchSize) of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {data, {Refs, Data}} -> - Time = erlang:monotonic_time(millisecond), - case Transport:send(Socket, Data) of - ok -> - %% send to recv proc to fetch the response - RecvPid ! {requests, Refs, Time}, - send_loop(Transport, Socket, RecvPid, BatchSize); - {error, Reason} -> - %% Give recv_loop time to finish processing - %% This will shut down recv_loop if it is waiting on socket - Transport:shutdown(Socket, read_write), - %% This will shut down recv_loop if it is waiting for a reference - RecvPid ! close_down, - %% Ok, recv done, time to die - receive {recv_exit, _Reason} -> ok end, - {send_exit, Reason} - end - end. - -receive_data(N) -> - receive_data(N, infinity, []). - -receive_data(0, _Time, Acc) -> - {data, lists:unzip(lists:reverse(Acc))}; -receive_data(N, Time, Acc) -> - receive - Msg -> - case Msg of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {send, Pid, Ref, Commands} -> - Data = ered_command:get_data(Commands), - Class = ered_command:get_response_class(Commands), - RefInfo = {Class, Pid, Ref, []}, - Acc1 = [{RefInfo, Data} | Acc], - receive_data(N - 1, 0, Acc1) - end - after Time -> - receive_data(0, 0, Acc) - end. diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 516c0cd..67b59a4 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -4,6 +4,7 @@ run_test_() -> [ + {spawn, fun split_data_t/0}, {spawn, fun request_t/0}, {spawn, fun fail_connect_t/0}, {spawn, fun fail_parse_t/0}, @@ -24,6 +25,15 @@ run_test_() -> {spawn, fun bad_host_t/0} ]. +split_data_t() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + Client = start_client(Port), + Data = iolist_to_binary([<<"A">> || _ <- lists:seq(0,3000)]), + ered_client:command(Client, [<<"hello">>, <<"3">>]), + <<"OK">> = ered_client:command(Client, [<<"set">>, <<"key1">>, Data]), + Data = ered_client:command(Client, [<<"get">>, <<"key1">>]). + request_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl deleted file mode 100644 index c28da6e..0000000 --- a/test/ered_connection_tests.erl +++ /dev/null @@ -1,63 +0,0 @@ --module(ered_connection_tests). - --include_lib("eunit/include/eunit.hrl"). - -split_data_test() -> - Data = iolist_to_binary([<<"A">> || _ <- lists:seq(0,3000)]), - {ok, Conn1} = ered_connection:connect("127.0.0.1", 6379), - ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), - <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), - Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). - -%% Suppress warnings due to expected failures from MalformedCommand. --dialyzer({[no_fail_call, no_return], trailing_reply_test/0}). -trailing_reply_test() -> - Pid = self(), - %% 277124 byte nested array, it takes a non-trivial time to parse - BigNastyData = iolist_to_binary(nested_list(8)), - ?debugFmt("~w", [size(BigNastyData)]), - - spawn_link(fun() -> - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - Pid ! {port, Port}, - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, BigNastyData), - ok = gen_tcp:shutdown(Sock, write), - Pid ! 
sent_big_nasty, - receive ok -> ok end - end), - {port, Port} = receive_msg(), - %% increase receive buffer to fit the whole nasty data package - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port, [{batch_size, 1}, - {tcp_options, [{recbuf, 524288}]}]), - ?debugFmt("~w", [Conn1]), - ered_connection:command_async(Conn1, [<<"ping">>], ping1), - receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, pipeline, [undefined]}, - ered_connection:command_async(Conn1, MalformedCommand, no_ref), - - %% make sure the ping is received before the connection is shut down - - ?debugMsg("waiting for ping"), - - receive {ping1, _} -> ok after 2000 -> exit(waiting_for_ping) end, - ?debugMsg("got ping"), - {socket_closed, Conn1, {send_exit, einval}} = receive Msg -> Msg end, - ensure_empty(). - - -receive_msg() -> - receive Msg -> Msg end. - -%% This function is used from trailing_reply_test() --dialyzer({no_unused, ensure_empty/0}). -ensure_empty() -> - empty = receive Msg -> Msg after 0 -> empty end. - - -nested_list(1) -> - <<"+A\r\n">>; -nested_list(N) -> - ["*", integer_to_list(N), "\r\n", [nested_list(N-1) || _ <- lists:seq(1, N)]]. From 687bb3158f87cd9ef458833894a538802b101b28 Mon Sep 17 00:00:00 2001 From: William Voong Date: Thu, 30 Oct 2025 16:17:55 +0100 Subject: [PATCH 2/2] Batch waiting commands. This commit implements batching in ered_client.erl. The strategy follows this pattern: as long as the number of pending commands stays below max_pending, commands are not batched. Once max_pending is reached, no new commands are sent to the Valkey/Redis server and incoming commands queue up in the waiting queue. When the number of pending commands drops below max_pending again, the waiting commands are batched and sent together. Change-Id: I2fe7e29be829106d9355b105cc190cac0ba0850c --- src/ered_client.erl | 57 ++++++++++++++++++++++++++++++----- src/ered_connection.erl | 6 +--- test/ered_connection_tests.erl | 62 ++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 13 deletions(-) create mode 100644 test/ered_connection_tests.erl diff --git a/src/ered_client.erl b/src/ered_client.erl index 706cbae..0b1547b 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -46,7 +46,8 @@ queue_ok_level = 2000 :: non_neg_integer(), max_waiting = 5000 :: non_neg_integer(), - max_pending = 128 :: non_neg_integer() + max_pending = 128 :: non_neg_integer(), + batch_size = 16 :: non_neg_integer() }). -record(st, @@ -119,6 +120,8 @@ %% If the queue has been full then it is considered ok %% again when it reaches this level {queue_ok_level, non_neg_integer()} | + %% Automatic batching when the queue is full; turn off batching by setting the size to 1. 
+ {batch_size, non_neg_integer()} | %% How long to wait to reconnect after a failed connect attempt {reconnect_wait, non_neg_integer()} | %% Pid to send status messages to @@ -234,6 +237,7 @@ init({Host, Port, OptsList, User}) -> fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; ({max_pending, Val}, S) -> S#opts{max_pending = Val}; + ({batch_size, Val}, S) -> S#opts{batch_size = Val}; ({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val}; ({reconnect_wait, Val}, S) -> S#opts{reconnect_wait = Val}; ({info_pid, Val}, S) -> S#opts{info_pid = Val}; @@ -374,10 +378,10 @@ process_commands(State) -> NumPending = q_len(State#st.pending), if (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> - {Command, NewWaiting} = q_out(State#st.waiting), - Data = get_command_payload(Command), - send(State#st.recv_pid, State#st.transport_socket, Data, {command_reply, State#st.connection_pid}), - process_commands(State#st{pending = q_in(Command, State#st.pending), + {Commands, NewWaiting} = q_multi_out(State#st.opts#opts.batch_size, State#st.waiting), + Data = [get_command_payload(X) || X <- Commands], + batch_send(State#st.recv_pid, State#st.transport_socket, Data, {command_reply, State#st.connection_pid}), + process_commands(State#st{pending = q_in_multiple(Commands, State#st.pending), waiting = NewWaiting}); (NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) -> @@ -409,7 +413,11 @@ q_new() -> {0, queue:new()}. q_in(Item, {Size, Q}) -> - {Size+1, queue:in(Item, Q)}. + {Size + 1, queue:in(Item, Q)}. +q_in_multiple([], Q) -> + Q; +q_in_multiple([Item | Items] , {Size, Q}) -> + q_in_multiple(Items, {Size + 1, queue:in(Item, Q)}). q_join({Size1, Q1}, {Size2, Q2}) -> {Size1 + Size2, queue:join(Q1, Q2)}. @@ -417,7 +425,18 @@ q_join({Size1, Q1}, {Size2, Q2}) -> q_out({Size, Q}) -> case queue:out(Q) of {empty, _Q} -> empty; - {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} + {{value, Val}, NewQ} -> {Val, {Size - 1, NewQ}} + end. +q_multi_out(Nu, Queue) -> + q_multi_out(Nu, Queue, []). +q_multi_out(0, Q, Acc) -> + {lists:reverse(Acc), Q}; +q_multi_out(Nu, {Size, Q}, Acc) -> + case queue:out(Q) of + {empty, _Q} -> + {lists:reverse(Acc), {Size,Q}}; + {{value, Val}, NewQ} -> + q_multi_out(Nu - 1, {Size - 1, NewQ}, [Val | Acc]) end. q_to_list({_Size, Q}) -> @@ -426,7 +445,6 @@ q_to_list({_Size, Q}) -> q_len({Size, _Q}) -> Size. - reply_command({command, _, Fun}, Reply) -> Fun(Reply). @@ -556,4 +574,27 @@ send(RecvPid, {Socket, Transport}, Commands, Ref) -> self() ! {socket_closed, Reason} end. +batch_send(RecvPid, {Socket, Transport}, Commands, Ref) -> + To_Data = fun(Command) -> + Command2 = ered_command:convert_to(Command), + Data = ered_command:get_data(Command2), + Class = ered_command:get_response_class(Command2), + RefInfo = {Class, self(), Ref, []}, + {Data, RefInfo} + end, + {Data, Refs} = lists:unzip([To_Data(Command) || Command <- Commands]), + Time = erlang:monotonic_time(millisecond), + case Transport:send(Socket, Data) of + ok -> + RecvPid ! {requests, Refs, Time}; + {error, Reason} -> + %% Give recv_loop time to finish processing + %% This will shut down recv_loop if it is waiting on socket + Transport:shutdown(Socket, read_write), + %% This will shut down recv_loop if it is waiting for a reference + RecvPid ! close_down, + %% Ok, recv done, time to die + receive {recv_exit, _Reason} -> ok end, + self() ! {socket_closed, Reason} + end. 
diff --git a/src/ered_connection.erl b/src/ered_connection.erl index 3905d9c..b40a6ac 100644 --- a/src/ered_connection.erl +++ b/src/ered_connection.erl @@ -31,9 +31,6 @@ }). -type opt() :: - %% If commands are queued up in the process message queue this is the max - %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. {connect_timeout, timeout()} | %% Options passed to gen_tcp:connect/4. @@ -104,9 +101,8 @@ connect(Host, Port, Opts) -> %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - connect_async(Addr, Port, Opts) -> [error({badarg, BadOpt}) - || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, + || BadOpt <- proplists:get_keys(Opts) -- [tcp_options, tls_options, push_cb, response_timeout, tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - _BatchSize = proplists:get_value(batch_size, Opts, 16), ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), TcpOptions = proplists:get_value(tcp_options, Opts, []), diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl new file mode 100644 index 0000000..427b2ed --- /dev/null +++ b/test/ered_connection_tests.erl @@ -0,0 +1,62 @@ +-module(ered_connection_tests). + +-include_lib("eunit/include/eunit.hrl"). + +split_data_test() -> + Data = iolist_to_binary([<<"A">> || _ <- lists:seq(0,3000)]), + {ok, Conn1,_,_} = ered_connection:connect("127.0.0.1", 6379), + ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), + <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), + Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). + +%% Suppress warnings due to expected failures from MalformedCommand. +-dialyzer({[no_fail_call, no_return], trailing_reply_test/0}). +trailing_reply_test() -> + Pid = self(), + %% 277124 byte nested array, it takes a non-trivial time to parse + BigNastyData = iolist_to_binary(nested_list(8)), + ?debugFmt("~w", [size(BigNastyData)]), + + spawn_link(fun() -> + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), + {ok, Port} = inet:port(ListenSock), + Pid ! {port, Port}, + {ok, Sock} = gen_tcp:accept(ListenSock), + {ok, <<"*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, BigNastyData), + ok = gen_tcp:shutdown(Sock, write), + Pid ! sent_big_nasty, + receive ok -> ok end + end), + {port, Port} = receive_msg(), + %% increase receive buffer to fit the whole nasty data package + {ok, Conn1,_,_} = ered_connection:connect("127.0.0.1", Port, [{tcp_options, [{recbuf, 524288}]}]), + ?debugFmt("~w", [Conn1]), + ered_connection:command_async(Conn1, [<<"ping">>], ping1), + receive sent_big_nasty -> ok end, + MalformedCommand = {redis_command, pipeline, [undefined]}, + ered_connection:command_async(Conn1, MalformedCommand, no_ref), + + %% make sure the ping is received before the connection is shut down + + ?debugMsg("waiting for ping"), + + receive {ping1, _} -> ok after 2000 -> exit(waiting_for_ping) end, + ?debugMsg("got ping"), + {socket_closed, Conn1, {send_exit, einval}} = receive Msg -> Msg end, + ensure_empty(). + + +receive_msg() -> + receive Msg -> Msg end. + +%% This function is used from trailing_reply_test() +-dialyzer({no_unused, ensure_empty/0}). +ensure_empty() -> + empty = receive Msg -> Msg after 0 -> empty end. 
+ + +nested_list(1) -> + <<"+A\r\n">>; +nested_list(N) -> + ["*", integer_to_list(N), "\r\n", [nested_list(N-1) || _ <- lists:seq(1, N)]].
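
Illustrative appendix (not part of the patch series above). The batching strategy described in the second commit message comes down to draining at most batch_size commands from the waiting queue and writing them to the socket with a single send call, so one write carries several pipelined commands. The standalone sketch below shows that pattern under the assumption of the {Size, Queue} pair representation used in ered_client.erl; the module name batch_sketch and the function take_batch/2 are hypothetical names for this note and only mirror q_multi_out/2 and batch_send/4, they do not reproduce the patch.

%% batch_sketch.erl -- illustrative only; not part of the patches.
-module(batch_sketch).
-export([take_batch/2, demo/0]).

%% Dequeue at most N items from a {Size, queue:queue()} pair, preserving order.
%% Like q_multi_out/2 in the patch, it stops early if the queue runs dry.
take_batch(N, SizedQueue) ->
    take_batch(N, SizedQueue, []).

take_batch(0, SizedQueue, Acc) ->
    {lists:reverse(Acc), SizedQueue};
take_batch(N, {Size, Q}, Acc) ->
    case queue:out(Q) of
        {empty, _} ->
            {lists:reverse(Acc), {Size, Q}};
        {{value, Item}, Q1} ->
            take_batch(N - 1, {Size - 1, Q1}, [Item | Acc])
    end.

demo() ->
    %% Five waiting "commands", already RESP-encoded for this sketch.
    Empty = {0, queue:new()},
    Waiting = lists:foldl(fun(Bin, {S, Q}) -> {S + 1, queue:in(Bin, Q)} end,
                          Empty,
                          lists:duplicate(5, <<"*1\r\n$4\r\nPING\r\n">>)),
    BatchSize = 3,
    {Batch, _Rest} = take_batch(BatchSize, Waiting),
    %% A batch is an iolist: one gen_tcp:send/2 (or ssl:send/2) call would write
    %% all three commands back to back, as batch_send/4 does in the patch.
    io:format("batch of ~p commands, ~p bytes total~n",
              [length(Batch), iolist_size(Batch)]).

Sending a whole batch as one iolist is also what removes the per-command send overhead mentioned in the first commit message: the number of writes on the socket drops from one per command to one per batch.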