From 2c012d9006781562c08fa36ae9a8d20ff3c35fc0 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Mon, 26 May 2025 14:00:53 +0200 Subject: [PATCH] QC-1287 Fix Mergers latency optimization with RoundRobin processing The optimization was correctly working when we split a range of InputSpecs between different Mergers in a layer, but not when they are time-pipelined. In the commit, we modify the expected number of input messages per cycle to include time-pipeline parameter in the previous and current layer. --- Utilities/Mergers/src/MergerInfrastructureBuilder.cxx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx index 9fcb6aaa482dd..8719dfb9fe0ca 100644 --- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx +++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx @@ -136,6 +136,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() mergerBuilder.setName(mInfrastructureName); mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow); + size_t timePipelinePreviousLayer = 1; for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) { size_t numberOfMergers = mergersPerLayer[layer]; @@ -166,7 +167,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() if (layer > 1 && !expendable) { // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs. // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes. - const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd); + + // The formula below takes into account both ways of splitting inputs - by consuming a subset of InputSpecs and by using time-pipelined data processors. + const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal; assert(inputNumber != 0); layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber}; } @@ -185,6 +188,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() inputsRangeBegin = inputsRangeEnd; } layerInputs = nextLayerInputs; // todo: could be optimised with pointers + timePipelinePreviousLayer = timePipelineVal; } return workflow;