Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 133 additions & 15 deletions src/dev_relay.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ call(M1, RawM2, Opts) ->
?event(debug_relay, {relay_call, {without_http_params, TargetMod4}}),
?event(debug_relay, {relay_call, {with_http_params, TargetMod5}}),
true = hb_message:verify(TargetMod5),

?event(debug_relay, {relay_call, {verified, true}}),
RequestMethod =
hb_maps:get(<<"method">>, TargetMod5, RelayMethod, Opts),
Client =
case hb_maps:get(<<"http-client">>, BaseTarget, not_found, Opts) of
not_found -> hb_opts:get(relay_http_client, Opts);
Expand All @@ -143,19 +144,29 @@ call(M1, RawM2, Opts) ->
% Let `hb_http:request/2' handle finding the peer and dispatching the
% request, unless the peer is explicitly given.
HTTPOpts = Opts#{ http_client => Client, http_only_result => false },
Res = case RelayPeer of
not_found ->
hb_http:request(TargetMod5, HTTPOpts);
_ ->
?event(debug_relay, {relaying_to_peer, RelayPeer}),
hb_http:request(
RelayMethod,
RelayPeer,
RelayPath,
TargetMod5,
HTTPOpts
)
end,
Res =
case RelayPeer of
not_found ->
hb_http:request(TargetMod5, HTTPOpts);
Peer when is_map(Peer) ->
Prepared = prepare_relay_peer(Peer, Opts),
hb_http:request(
RequestMethod,
Prepared,
RelayPath,
TargetMod5,
HTTPOpts
);
Peer ->
?event(debug_relay, {relaying_to_peer, Peer}),
hb_http:request(
RequestMethod,
Peer,
RelayPath,
TargetMod5,
HTTPOpts
)
end,
case Res of
{ok, R} ->
{ok, hb_maps:without([<<"set-cookie">>], R)};
Expand Down Expand Up @@ -186,6 +197,50 @@ request(_Base, Req, Opts) ->
}
}.

prepare_relay_peer(Peer, Opts) ->
case hb_ao:get(<<"nodes">>, Peer, not_found, Opts) of
Nodes when is_list(Nodes) ->
Peer#{ <<"nodes">> => prepare_relay_nodes(Nodes, Opts) };
_ ->
Peer
end.

prepare_relay_nodes(Nodes, Opts) ->
[
prepare_relay_node(Node, Opts)
||
Node <- hb_util:message_to_ordered_list(Nodes, Opts)
].

prepare_relay_node(Node, Opts) ->
NormalizedOpts =
case hb_maps:get(<<"opts">>, Node, #{}, Opts) of
Map when is_map(Map) -> hb_opts:mimic_default_types(Map, new_atoms, Opts);
_ -> #{}
end,
Node#{
<<"opts">> => apply_node_timeout(Node, NormalizedOpts, Opts)
}.

apply_node_timeout(Node, NodeOpts, Opts) ->
Timeout =
case hb_ao:get(<<"http-timeout">>, Node, not_found, Opts) of
not_found ->
hb_maps:get(<<"http-timeout">>, NodeOpts, not_found, Opts);
TimeoutValue ->
TimeoutValue
end,
case Timeout of
not_found ->
NodeOpts;
_ ->
TimeoutMs = hb_util:int(Timeout),
NodeOpts#{
http_request_send_timeout => TimeoutMs,
http_connect_timeout => TimeoutMs
}
end.


%%% Tests

Expand Down Expand Up @@ -304,4 +359,67 @@ commit_request_test() ->
#{}
),
?event({res, Res}),
?assertEqual(<<"value">>, Res).
?assertEqual(<<"value">>, Res).

relay_failover_test() ->
application:ensure_all_started([hb]),
PeerWallet = ar_wallet:new(),
RelayWallet = ar_wallet:new(),
Peer = hb_http_server:start_node(#{ priv_wallet => PeerWallet }),
Node =
hb_http_server:start_node(NodeOpts = #{
relay_allow_commit_request => true,
priv_wallet => RelayWallet,
routes =>
[
#{
<<"template">> => <<"/~meta@1.0/info.*">>,
<<"nodes">> => [
#{
% Remote peer used to exercise timeout-driven
% failover. When Google one day runs HB, we can
% lower this again.
<<"prefix">> => <<"http://google.com/">>,
<<"http-timeout">> => 10000
},
#{
<<"prefix">> => <<"http://doesnotroute.invalid/">>,
<<"http-timeout">> => 2000
},
#{
% Local peer that should eventually succeed.
<<"prefix">> => Peer,
<<"http-timeout">> => 5000
}
]
}
],
on => #{
<<"request">> =>
#{
<<"device">> => <<"router@1.0">>,
<<"path">> => <<"preprocess">>,
<<"commit-request">> => true
}
}
}),
% Validate that the server can forward requests through the `hb_http:get` API.
{ok, DirectRecvdAddr} =
hb_http:request(
#{ <<"path">> => <<"~meta@1.0/info/address">> },
NodeOpts
),
?assertEqual(hb_util:human_id(PeerWallet), DirectRecvdAddr),
% Validate that the relay device is able to forward requests to the peer.
{ok, RelayRecvdAddr} =
hb_http:get(
Node,
<<"~relay@1.0/call?relay-path=~meta@1.0/info/address">>,
#{}
),
?assertEqual(hb_util:human_id(PeerWallet), RelayRecvdAddr),
?hr(),
timer:sleep(100),
% Validate that the server forwards requests from clients to the peer.
{ok, ClientRecvdAddr} = hb_http:get(Node, <<"~meta@1.0/info/address">>, #{}),
?assertEqual(hb_util:human_id(PeerWallet), ClientRecvdAddr).
45 changes: 43 additions & 2 deletions src/dev_router.erl
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,48 @@ preprocess(Base, RawReq, Opts) ->
}]
}}
end;
{ok, _Method, Node, _Path, _MsgWithoutMeta, _ReqOpts} ->
{ok, _Method, RawPeers, _Path, _MsgWithoutMeta, _ReqOpts} ->
?event(debug_preprocess, {raw_peers, RawPeers}),
Peer =
if is_map(RawPeers) ->
Nodes =
hb_maps:get(
<<"nodes">>,
RawPeers,
[hb_maps:get(<<"node">>, RawPeers, <<>>, Opts)],
Opts
),
NewNodes =
lists:map(
fun(P) ->
URI =
uri_string:parse(
hb_maps:get(
<<"uri">>,
P,
<<>>,
Opts
)
),
P#{
<<"uri">> =>
hb_util:bin(
uri_string:recompose(
URI#{
path => <<"user-path">>
}
)
)
}

end,
Nodes
),
RawPeers#{
<<"nodes">> => NewNodes
};
true -> RawPeers
end,
?event(debug_preprocess, {matched_route, {explicit, Res}}),
CommitRequest =
hb_util:atom(
Expand Down Expand Up @@ -603,7 +644,7 @@ preprocess(Base, RawReq, Opts) ->
<<"device">> => <<"relay@1.0">>,
<<"relay-device">> => <<"apply@1.0">>,
<<"method">> => <<"POST">>,
<<"peer">> => Node
<<"peer">> => Peer
},
#{
<<"path">> => <<"call">>,
Expand Down
29 changes: 26 additions & 3 deletions src/hb_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ request(Method, Config = #{ <<"nodes">> := Nodes }, Path, Message, Opts) when is
% `multirequest' functionality, rather than a single request.
hb_http_multi:request(Config, Method, Path, Message, Opts);
request(Method, #{ <<"opts">> := ReqOpts, <<"uri">> := URI }, _Path, Message, Opts) ->
ExplicitMethod =
hb_maps:get(
<<"method">>,
Message,
not_found,
Opts
),
% The request has a set of additional options, so we apply them to the
% request.
MergedOpts =
Expand All @@ -85,7 +92,12 @@ request(Method, #{ <<"opts">> := ReqOpts, <<"uri">> := URI }, _Path, Message, Op
Message#{ <<"path">> => URI, <<"method">> => Method },
MergedOpts
),
request(NewMethod, Node, NewPath, NewMsg, NewOpts);
FinalMethod =
case ExplicitMethod of
not_found -> NewMethod;
_ -> Method
end,
request(FinalMethod, Node, NewPath, NewMsg, NewOpts);
request(Method, Peer, Path, RawMessage, Opts) ->
?event({request, {method, Method}, {peer, Peer}, {path, Path}, {message, RawMessage}}),
Req =
Expand All @@ -104,7 +116,15 @@ request(Method, Peer, Path, RawMessage, Opts) ->
),
StartTime = os:system_time(millisecond),
% Perform the HTTP request.
{_ErlStatus, Status, Headers, Body} = hb_http_client:request(Req, Opts),
Res = hb_http_client:request(Req, Opts),
process_response(Method, Peer, Path, Req, StartTime, Res, Opts).

%% @doc Process a raw response from the HTTP client.
process_response(
Method, Peer, Path, Req, StartTime,
{_ErlStatus, Status, Headers, Body},
Opts
) ->
% Process the response.
EndTime = os:system_time(millisecond),
?event(http_outbound,
Expand Down Expand Up @@ -213,7 +233,10 @@ request(Method, Peer, Path, RawMessage, Opts) ->
Body,
Opts
)
end.
end;
process_response(_, _, _, _, _, {error, Reason}, _Opts) ->
?event(http, {http_request_failed, {reason, Reason}}),
{error, {http_request_failed, Reason}}.

%% @doc Convert a HTTP status code to a status atom.
response_status_to_atom(Status) ->
Expand Down