Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions run/O2HitMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,29 @@ namespace o2
namespace devices
{

// Function communicating to primary particle server that it is now safe to shutdown.
// From the perspective of o2-sim, this is the case when all configs have been propagated and the system
// is running ok: For instance after the HitMerger is initialized and got it's first data from Geant workers.
bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel)
{
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)o2::O2PrimaryServerInfoRequest::AllowShutdown));
std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());

int timeoutinMS = 100;
if (channel.Send(request, timeoutinMS) > 0) {
LOG(info) << "Sending Shutdown permission to particle server";
if (channel.Receive(reply, timeoutinMS) > 0) {
// the answer is a simple ack with a status code
LOG(info) << "Shutdown permission was acknowledged";
} else {
LOG(error) << "No answer received within " << timeoutinMS << "ms\n";
return false;
}
return true;
}
return false;
}

class O2HitMerger : public fair::mq::Device
{

Expand Down Expand Up @@ -129,6 +152,9 @@ class O2HitMerger : public fair::mq::Device
if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
outfilename = o2::base::NameConf::getMCKinematicsFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str());
mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
} else {
// we didn't manage to get a configuration --> better to fail
LOG(fatal) << "No configuration received. Aborting";
}
mAsService = o2::conf::SimConfig::Instance().asService();
mForwardKine = o2::conf::SimConfig::Instance().forwardKine();
Expand Down Expand Up @@ -354,6 +380,13 @@ class O2HitMerger : public fair::mq::Device
// for the next batch
return waitForControlInput();
}

static bool initAcknowledged = false;
if (!initAcknowledged) {
primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0));
initAcknowledged = true;
}

return more;
}

Expand Down Expand Up @@ -413,10 +446,6 @@ class O2HitMerger : public fair::mq::Device
};
}
}
if (!expectmore) {
// somehow FairMQ has difficulties shutting down; helping manually
// raise(SIGINT);
}
return expectmore;
}

Expand Down
19 changes: 14 additions & 5 deletions run/O2PrimaryServerDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,10 @@ class O2PrimaryServerDevice final : public fair::mq::Device
}
}

// launches a thread that listens for status requests from outside asynchronously
// launches a thread that listens for status/config/shutdown requests from outside asynchronously
void launchInfoThread()
{
static std::vector<std::thread> threads;

auto sendErrorReply = [](fair::mq::Channel& channel) {
LOG(error) << "UNKNOWN REQUEST";
std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)(404)));
Expand All @@ -260,7 +259,9 @@ class O2PrimaryServerDevice final : public fair::mq::Device

LOG(info) << "LAUNCHING STATUS THREAD";
auto lambda = [this, sendErrorReply]() {
while (mState != O2PrimaryServerState::Stopped) {
bool canShutdown{false};
// Exit only when both: serving stopped and allowed from outside.
while (!(mState == O2PrimaryServerState::Stopped && canShutdown)) {
auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
if (!channel.IsValid()) {
LOG(error) << "channel primserv-info not valid";
Expand All @@ -285,6 +286,11 @@ class O2PrimaryServerDevice final : public fair::mq::Device
}
} else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) {
HandleConfigRequest(channel);
} else if (request_payload == (int)O2PrimaryServerInfoRequest::AllowShutdown) {
LOG(info) << "Got info that we may shutdown";
std::unique_ptr<fair::mq::Message> ack(channel.NewSimpleMessage(200));
channel.Send(ack);
canShutdown = true;
} else {
sendErrorReply(channel);
}
Expand Down Expand Up @@ -518,10 +524,13 @@ class O2PrimaryServerDevice final : public fair::mq::Device

void PostRun() override
{
// We shouldn't shut down immediately when all events have been served
// Instead we also need to wait until the info thread running some communication server
// with other processes is finished.
while (!mInfoThreadStopped) {
LOG(info) << "Waiting info thread";
using namespace std::chrono_literals;
std::this_thread::sleep_for(100ms);
std::this_thread::sleep_for(1000ms);
}
}

Expand All @@ -534,7 +543,7 @@ class O2PrimaryServerDevice final : public fair::mq::Device
if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
workavailable = false;
}
if (!(mState == O2PrimaryServerState::ReadyToServe || mState == O2PrimaryServerState::WaitingEvent)) {
if (!(mState.load() == O2PrimaryServerState::ReadyToServe || mState.load() == O2PrimaryServerState::WaitingEvent)) {
// send a zero answer
workavailable = false;
}
Expand Down
3 changes: 0 additions & 3 deletions run/O2SimDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ class O2SimDevice final : public fair::mq::Device
// returns true if successful / false if not
static bool querySimConfig(fair::mq::Channel& channel)
{
// auto text = new std::string("configrequest");
// std::unique_ptr<fair::mq::Message> request(channel.NewMessage(const_cast<char*>(text->c_str()),
// text->length(), CustomCleanup, text));
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config));
std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());

Expand Down
7 changes: 4 additions & 3 deletions run/PrimaryServerState.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ enum class O2PrimaryServerState {
};
static const char* PrimStateToString[5] = {"INIT", "SERVING", "WAITEVENT", "IDLE", "STOPPED"};

/// enum class for type of info request
/// enum class for request to o2sim-primserv-info channel of the O2PrimaryServerDevice
enum class O2PrimaryServerInfoRequest {
Status = 1,
Config = 2
Status = 1, // asks to retrieve current status of O2PrimaryServerDevice --> will send O2PrimaryServerState
Config = 2, // asks for o2-sim config reply
AllowShutdown = 3 // can be used to let particle server know that shutdown is now safe (once all components initialized)
};

/// Struct to be used as payload when making a request
Expand Down