Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Framework/Core/include/Framework/CommonServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ struct CommonServices {
return [](InitContext&, void* service) -> void* { return service; };
}

static ServiceSpec deviceContextSpec();
static ServiceSpec dataProcessorContextSpec();
static ServiceSpec driverClientSpec();
static ServiceSpec monitoringSpec();
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers {
class DataProcessingDevice : public fair::mq::Device
{
public:
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&);
void Init() final;
void InitTask() final;
void PreRun() final;
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t;
namespace o2::framework
{
struct ComputingQuotaStats;
struct ProcessingPolicies;

/// Stucture which holds the whole runtime context
/// of a running device which is not stored as
Expand All @@ -33,6 +34,7 @@ struct DeviceContext {
int expectedRegionCallbacks = 0;
int exitTransitionTimeout = 0;
int dataProcessingTimeout = 0;
ProcessingPolicies& processingPolicies;
};

} // namespace o2::framework
Expand Down
11 changes: 0 additions & 11 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
.kind = ServiceKind::Serial};
}

o2::framework::ServiceSpec CommonServices::deviceContextSpec()
{
return ServiceSpec{
.name = "device-context",
.init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
},
.configure = noConfiguration(),
.kind = ServiceKind::Serial};
}

o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
{
return ServiceSpec{
Expand Down
107 changes: 58 additions & 49 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Framework/DataProcessor.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DeviceState.h"
#include "Framework/DeviceStateEnums.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/DispatchControl.h"
#include "Framework/DanglingContext.h"
Expand Down Expand Up @@ -196,11 +197,10 @@ struct locked_execution {
~locked_execution() { ref.unlock(); }
};

DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies)
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry)
: mRunningDevice{running},
mConfigRegistry{nullptr},
mServiceRegistry{registry},
mProcessingPolicies{policies}
mServiceRegistry{registry}
{
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
if (key == "cleanup") {
Expand Down Expand Up @@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
mHandles.resize(1);

ServiceRegistryRef ref{mServiceRegistry};

mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
auto& state = ref.get<DeviceState>();
assert(state.loop);
Expand Down Expand Up @@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset()
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
}

TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
{
auto& state = ref.get<DeviceState>();
auto& deviceProxy = ref.get<FairMQDeviceProxy>();
if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
return state.transitionHandling;
}
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
auto& deviceContext = ref.get<DeviceContext>();
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
switchState(ref, StreamingState::EndOfStreaming);
}

// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
}
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
bool onlyGenerated = hasOnlyGenerated(spec);
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
"New state requested. Waiting for %d seconds before %{public}s",
timeout,
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
}
return TransitionHandlingState::Requested;
} else {
if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
} else if (policies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
return TransitionHandlingState::Expired;
}
}

void DataProcessingDevice::Run()
{
ServiceRegistryRef ref{mServiceRegistry};
Expand Down Expand Up @@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run()
shouldNotWait = true;
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
}
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
switchState(ref, StreamingState::EndOfStreaming);
}

// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
}
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
bool onlyGenerated = hasOnlyGenerated(spec);
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
"New state requested. Waiting for %d seconds before %{public}s",
timeout,
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
}
} else {
state.transitionHandling = TransitionHandlingState::Expired;
if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
} else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
}
}
state.transitionHandling = updateStateTransition(ref, mProcessingPolicies);
// If we are Idle, we can then consider the transition to be expired.
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
Expand Down Expand Up @@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run()
}
}

O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
auto& spec = ref.get<DeviceSpec const>();
/// Cleanup messages which are still pending on exit.
for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));

deviceContext = std::make_unique<DeviceContext>();
deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));

auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);

serviceRef.get<RawDeviceService>().setDevice(device.get());
r.fDevice = std::move(device);
Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/test/test_AllCrashTypes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=qui
printf "ok\nTesting framework-init..."
o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -q "Exception caught while in Init: This is a o2::framework::runtime_error. Exiting with 1." || { printf "framework error not found" ; exit 1; }
printf "ok\nTesting framework-run..."
o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; }
o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; }
printf "ok\nTesting runtime-run..."
o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; }
o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; }
printf "ok\n"

export O2_NO_CATCHALL_EXCEPTIONS=1
Expand All @@ -17,7 +17,7 @@ o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=qui
printf "ok\nTesting framework-init..."
o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -v -q "Exception caught: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; }
printf "ok\nTesting framework-run..."
o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; }
o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; }
printf "ok\nTesting runtime-run..."
o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; }
o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; }
printf "ok"