From b54384bf3ab7b1633bc6522f588254cf350865cd Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 20 Oct 2025 15:42:41 +0200 Subject: [PATCH 1/3] DPL: improve crash test: The executable might be in some subfolder, like stage/bin/. --- Framework/Core/test/test_AllCrashTypes.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Framework/Core/test/test_AllCrashTypes.sh b/Framework/Core/test/test_AllCrashTypes.sh index 54898fd9c4c5d..12d957f835b09 100755 --- a/Framework/Core/test/test_AllCrashTypes.sh +++ b/Framework/Core/test/test_AllCrashTypes.sh @@ -5,9 +5,9 @@ o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=qui printf "ok\nTesting framework-init..." o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -q "Exception caught while in Init: This is a o2::framework::runtime_error. Exiting with 1." || { printf "framework error not found" ; exit 1; } printf "ok\nTesting framework-run..." -o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } printf "ok\nTesting runtime-run..." -o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } printf "ok\n" export O2_NO_CATCHALL_EXCEPTIONS=1 @@ -17,7 +17,7 @@ o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=qui printf "ok\nTesting framework-init..." o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -v -q "Exception caught: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } printf "ok\nTesting framework-run..." -o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } printf "ok\nTesting runtime-run..." -o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } printf "ok" From 5dafde5ead186ce3cd830d1cdcf23f566f0b390e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 17 Oct 2025 10:03:37 +0200 Subject: [PATCH 2/3] DPL: refactor TransitionState calculation * Move it to a function and make it easily invokable with just a service registry ref. * Drop unneeded helper. * Move the policies to the DeviceContext this should not change the current behavior. The goal is to be able to trigger the timers also within the input proxy busy loop so that the timers start at the appropriate moment. --- .../Core/include/Framework/CommonServices.h | 1 - .../include/Framework/DataProcessingDevice.h | 2 +- .../Core/include/Framework/DeviceContext.h | 2 + Framework/Core/src/CommonServices.cxx | 11 -- Framework/Core/src/DataProcessingDevice.cxx | 107 ++++++++++-------- Framework/Core/src/runDataProcessing.cxx | 4 +- 6 files changed, 63 insertions(+), 64 deletions(-) diff --git a/Framework/Core/include/Framework/CommonServices.h b/Framework/Core/include/Framework/CommonServices.h index 69f3152c0ba76..f5080fcac28db 100644 --- a/Framework/Core/include/Framework/CommonServices.h +++ b/Framework/Core/include/Framework/CommonServices.h @@ -56,7 +56,6 @@ struct CommonServices { return [](InitContext&, void* service) -> void* { return service; }; } - static ServiceSpec deviceContextSpec(); static ServiceSpec dataProcessorContextSpec(); static ServiceSpec driverClientSpec(); static ServiceSpec monitoringSpec(); diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index 67edaa99e532b..119cfe02411fc 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers { class DataProcessingDevice : public fair::mq::Device { public: - DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies); + DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&); void Init() final; void InitTask() final; void PreRun() final; diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index 4593e5e819ccf..dbe3dbebbd7f1 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t; namespace o2::framework { struct ComputingQuotaStats; +struct ProcessingPolicies; /// Stucture which holds the whole runtime context /// of a running device which is not stored as @@ -33,6 +34,7 @@ struct DeviceContext { int expectedRegionCallbacks = 0; int exitTransitionTimeout = 0; int dataProcessingTimeout = 0; + ProcessingPolicies &processingPolicies; }; } // namespace o2::framework diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 5a2876e074d29..091cd9d4ed0a5 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec() .kind = ServiceKind::Serial}; } -o2::framework::ServiceSpec CommonServices::deviceContextSpec() -{ - return ServiceSpec{ - .name = "device-context", - .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle { - return ServiceHandle{TypeIdHelpers::uniqueId(), new DeviceContext()}; - }, - .configure = noConfiguration(), - .kind = ServiceKind::Serial}; -} - o2::framework::ServiceSpec CommonServices::dataAllocatorSpec() { return ServiceSpec{ diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index a41aa3a886d55..b94355946bcd4 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -17,6 +17,7 @@ #include "Framework/DataProcessor.h" #include "Framework/DataSpecUtils.h" #include "Framework/DeviceState.h" +#include "Framework/DeviceStateEnums.h" #include "Framework/DispatchPolicy.h" #include "Framework/DispatchControl.h" #include "Framework/DanglingContext.h" @@ -196,11 +197,10 @@ struct locked_execution { ~locked_execution() { ref.unlock(); } }; -DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies) +DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry) : mRunningDevice{running}, mConfigRegistry{nullptr}, - mServiceRegistry{registry}, - mProcessingPolicies{policies} + mServiceRegistry{registry} { GetConfig()->Subscribe("dpl", [®istry = mServiceRegistry](const std::string& key, std::string value) { if (key == "cleanup") { @@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi mHandles.resize(1); ServiceRegistryRef ref{mServiceRegistry}; + mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t)); auto& state = ref.get(); assert(state.loop); @@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset() ref.get().call(); } +TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies) +{ + auto& state = ref.get(); + auto& deviceProxy = ref.get(); + if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) { + return state.transitionHandling; + } + O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop); + auto& deviceContext = ref.get(); + // Check if we only have timers + auto& spec = ref.get(); + if (hasOnlyTimers(spec)) { + switchState(ref, StreamingState::EndOfStreaming); + } + + // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout + if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { + uv_update_time(state.loop); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); + uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0); + } + if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { + ref.get().call(ServiceRegistryRef{ref}); + uv_update_time(state.loop); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", + deviceContext.exitTransitionTimeout); + uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); + bool onlyGenerated = hasOnlyGenerated(spec); + int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout; + if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout); + } else { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", + "New state requested. Waiting for %d seconds before %{public}s", + timeout, + onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state."); + } + return TransitionHandlingState::Requested; + } else { + if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); + } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); + } else if (policies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); + } else { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); + } + return TransitionHandlingState::Expired; + } +} + void DataProcessingDevice::Run() { ServiceRegistryRef ref{mServiceRegistry}; @@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run() shouldNotWait = true; state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING; } - if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { - state.transitionHandling = TransitionHandlingState::Requested; - auto& deviceContext = ref.get(); - // Check if we only have timers - auto& spec = ref.get(); - if (hasOnlyTimers(spec)) { - switchState(ref, StreamingState::EndOfStreaming); - } - - // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout - if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { - uv_update_time(state.loop); - O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); - uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0); - } - if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { - state.transitionHandling = TransitionHandlingState::Requested; - ref.get().call(ServiceRegistryRef{ref}); - uv_update_time(state.loop); - O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", - deviceContext.exitTransitionTimeout); - uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); - bool onlyGenerated = hasOnlyGenerated(spec); - int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout; - if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout); - } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", - "New state requested. Waiting for %d seconds before %{public}s", - timeout, - onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state."); - } - } else { - state.transitionHandling = TransitionHandlingState::Expired; - if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); - } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); - } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); - } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); - } - } - } + state.transitionHandling = updateStateTransition(ref, mProcessingPolicies); // If we are Idle, we can then consider the transition to be expired. if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed."); @@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run() } } - O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling); + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling); auto& spec = ref.get(); /// Cleanup messages which are still pending on exit. for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 1611eb8605134..872f460d4b326 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, quotaEvaluator = std::make_unique(serviceRef); serviceRef.registerService(ServiceRegistryHelpers::handleForService(quotaEvaluator.get())); - deviceContext = std::make_unique(); + deviceContext = std::make_unique(DeviceContext{.processingPolicies = processingPolicies}); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&spec)); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&runningWorkflow)); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceContext.get())); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&driverConfig)); - auto device = std::make_unique(ref, serviceRegistry, processingPolicies); + auto device = std::make_unique(ref, serviceRegistry); serviceRef.get().setDevice(device.get()); r.fDevice = std::move(device); From 73f65da8458f19b9c143dd8c56b344d2026a666b Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Mon, 20 Oct 2025 13:45:05 +0000 Subject: [PATCH 3/3] Please consider the following formatting changes --- Framework/Core/include/Framework/DeviceContext.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index dbe3dbebbd7f1..a392004c2ffbf 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -34,7 +34,7 @@ struct DeviceContext { int expectedRegionCallbacks = 0; int exitTransitionTimeout = 0; int dataProcessingTimeout = 0; - ProcessingPolicies &processingPolicies; + ProcessingPolicies& processingPolicies; }; } // namespace o2::framework