diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx index 9fcb6aaa482dd..8719dfb9fe0ca 100644 --- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx +++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx @@ -136,6 +136,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() mergerBuilder.setName(mInfrastructureName); mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow); + size_t timePipelinePreviousLayer = 1; for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) { size_t numberOfMergers = mergersPerLayer[layer]; @@ -166,7 +167,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() if (layer > 1 && !expendable) { // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs. // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes. - const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd); + + // The formula below takes into account both ways of splitting inputs - by consuming a subset of InputSpecs and by using time-pipelined data processors. + const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal; assert(inputNumber != 0); layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber}; } @@ -185,6 +188,7 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() inputsRangeBegin = inputsRangeEnd; } layerInputs = nextLayerInputs; // todo: could be optimised with pointers + timePipelinePreviousLayer = timePipelineVal; } return workflow;