Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions run/O2PrimaryServerDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,30 +251,42 @@ class O2PrimaryServerDevice final : public fair::mq::Device
void launchInfoThread()
{
static std::vector<std::thread> threads;

auto sendErrorReply = [](fair::mq::Channel& channel) {
LOG(error) << "UNKNOWN REQUEST";
std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)(404)));
channel.Send(reply);
};

LOG(info) << "LAUNCHING STATUS THREAD";
auto lambda = [this]() {
auto lambda = [this, sendErrorReply]() {
while (mState != O2PrimaryServerState::Stopped) {
auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
if (!channel.IsValid()) {
LOG(error) << "channel primserv-info not valid";
}
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(-1));
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)(-1)));
int timeout = 100; // 100ms --> so as not to block and allow for proper termination of this thread
if (channel.Receive(request, timeout) > 0) {
LOG(info) << "INFO REQUEST RECEIVED";
if (*(int*)(request->GetData()) == (int)O2PrimaryServerInfoRequest::Status) {
int request_payload; // we expect an (int) ~ to type O2PrimaryServerInfoRequest
if (request->GetSize() != sizeof(request_payload)) {
LOG(error) << "Obtained request with unexpected payload size";
sendErrorReply(channel); // ALWAYS reply
}

memcpy(&request_payload, request->GetData(), sizeof(request_payload));

if (request_payload == (int)O2PrimaryServerInfoRequest::Status) {
LOG(info) << "Received status request";
// request needs to be a simple enum of type O2PrimaryServerInfoRequest
std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)mState.load()));
if (channel.Send(reply) > 0) {
LOG(info) << "Send status successful";
}
} else if (*(int*)request->GetData() == (int)O2PrimaryServerInfoRequest::Config) {
} else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) {
HandleConfigRequest(channel);
} else {
LOG(fatal) << "UNKNOWN REQUEST";
std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage(404));
channel.Send(reply);
sendErrorReply(channel);
}
}
}
Expand Down Expand Up @@ -450,6 +462,8 @@ class O2PrimaryServerDevice final : public fair::mq::Device
if (channel.Send(message) > 0) {
LOG(info) << "config reply send ";
return true;
} else {
LOG(error) << "Failure sending config reply ";
}
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions run/O2SimDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class O2SimDevice final : public fair::mq::Device
// auto text = new std::string("configrequest");
// std::unique_ptr<fair::mq::Message> request(channel.NewMessage(const_cast<char*>(text->c_str()),
// text->length(), CustomCleanup, text));
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(O2PrimaryServerInfoRequest::Config));
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config));
std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());

int timeoutinMS = 60000; // wait for 60s max --> should be fast reply
Expand Down Expand Up @@ -164,7 +164,7 @@ class O2SimDevice final : public fair::mq::Device
while (reprobe) {
reprobe = false;
int i = -1;
fair::mq::MessagePtr request(statuschannel.NewSimpleMessage(O2PrimaryServerInfoRequest::Status));
fair::mq::MessagePtr request(statuschannel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Status));
fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(i));
auto sendcode = statuschannel.Send(request, timeoutinMS);
if (sendcode > 0) {
Expand Down