From eb14b21f28fc59333297d522d5478928e183dbde Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 17 Feb 2025 09:23:58 +0100 Subject: [PATCH 1/2] DPL: cleanup state switching - Remove duplicate helper - Add signposts to mark streaming state transitions - Notify driver --- Framework/Core/src/DataProcessingDevice.cxx | 44 ++++++++++----------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 8a3fbbcf5b2f1..e8676995772e6 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -141,6 +141,17 @@ void on_transition_requested_expired(uv_timer_t* handle) state.transitionHandling = TransitionHandlingState::Expired; } +auto switchState(ServiceRegistryRef& ref, StreamingState newState) -> void +{ + auto& state = ref.get(); + auto& context = ref.get(); + O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); + O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming); + O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState); + state.streaming = newState; + ref.get().notifyStreamingState(state.streaming); +}; + void on_data_processing_expired(uv_timer_t* handle) { auto* ref = (ServiceRegistryRef*)handle->data; @@ -1236,7 +1247,7 @@ void DataProcessingDevice::PreRun() O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop); O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback."); state.quitRequested = false; - state.streaming = StreamingState::Streaming; + switchState(ref, StreamingState::Streaming); state.allowedProcessing = DeviceState::Any; for (auto& info : state.inputChannelInfos) { if (info.state != InputChannelState::Pull) { @@ -1365,10 +1376,10 @@ void DataProcessingDevice::Run() // Check if we only have timers auto& spec = ref.get(); if (hasOnlyTimers(spec)) { - state.streaming = 
StreamingState::EndOfStreaming; + switchState(ref, StreamingState::EndOfStreaming); } - // If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively + // If this is a source device, exitTransitionTimeout and dataProcessingTimeout are effectively // the same (because source devices are not allowed to produce any calibration). // should be the same. if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) { @@ -1385,7 +1396,8 @@ void DataProcessingDevice::Run() state.transitionHandling = TransitionHandlingState::Requested; ref.get().call(ServiceRegistryRef{ref}); uv_update_time(state.loop); - O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", + deviceContext.exitTransitionTimeout); uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. 
Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout); @@ -1728,15 +1740,6 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) { auto& context = ref.get(); O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); - auto switchState = [ref](StreamingState newState) { - auto& state = ref.get(); - auto& context = ref.get(); - O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); - O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming); - O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState); - state.streaming = newState; - ref.get().notifyStreamingState(state.streaming); - }; auto& state = ref.get(); auto& spec = ref.get(); @@ -1772,7 +1775,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // dependent on the callback, not something which is controlled by the // framework itself. if (context.allDone == true && state.streaming == StreamingState::Streaming) { - switchState(StreamingState::EndOfStreaming); + switchState(ref, StreamingState::EndOfStreaming); state.lastActiveDataProcessor = &context; } @@ -1818,7 +1821,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) } // This is needed because the transport is deleted before the device. 
relayer.clear(); - switchState(StreamingState::Idle); + switchState(ref, StreamingState::Idle); // In case we should process, note the data processor responsible for it if (shouldProcess) { state.lastActiveDataProcessor = &context; @@ -2328,13 +2331,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v } }; - auto switchState = [ref](StreamingState newState) { - auto& control = ref.get(); - auto& state = ref.get(); - state.streaming = newState; - control.notifyStreamingState(state.streaming); - }; - ref.get().getReadyToProcess(completed); if (completed.empty() == true) { LOGP(debug, "No computations available for dispatching."); @@ -2510,7 +2506,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding."); } else { O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle"); - state.streaming = StreamingState::Idle; + switchState(ref, StreamingState::Idle); } if (shouldProcess(action)) { auto& timingInfo = ref.get(); @@ -2598,7 +2594,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v for (auto& channel : spec.outputChannels) { DataProcessingHelpers::sendEndOfStream(ref, channel); } - switchState(StreamingState::Idle); + switchState(ref, StreamingState::Idle); } return true; From db2de1a9c0ce623d5959061cce2e8ed93b7f248e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 17 Feb 2025 09:28:46 +0100 Subject: [PATCH 2/2] DPL: correctly handle data-processing-timeouts in sources --- Framework/Core/src/DataProcessingDevice.cxx | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e8676995772e6..7f42805cfdb1e 100644 --- 
a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -156,16 +156,19 @@ void on_data_processing_expired(uv_timer_t* handle) { auto* ref = (ServiceRegistryRef*)handle->data; auto& state = ref->get(); + auto& spec = ref->get(); state.loopReason |= DeviceState::TIMER_EXPIRED; // Check if this is a source device O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); - // Source devices should never end up in this callback, since the exitTransitionTimeout should - // be reset to the dataProcessingTimeout and the timers cohalesced. - assert(hasOnlyGenerated(ref->get()) == false); - O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards."); - state.allowedProcessing = DeviceState::CalibrationOnly; + if (hasOnlyGenerated(spec)) { + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming."); + switchState(*ref, StreamingState::EndOfStreaming); + } else { + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards."); + state.allowedProcessing = DeviceState::CalibrationOnly; + } } void on_communication_requested(uv_async_t* s) @@ -1379,13 +1382,6 @@ void DataProcessingDevice::Run() switchState(ref, StreamingState::EndOfStreaming); } - // If this is a source device, exitTransitionTimeout and dataProcessingTimeout are effectively - // the same (because source devices are not allowed to produce any calibration). - // should be the same. 
- if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) { - deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout; - } - // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { uv_update_time(state.loop);