diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index f4cd64377034e..2b8090af42648 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -87,7 +87,8 @@ class TFReaderSpec : public o2f::Task std::map>> mRunTimeRanges; o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector int mConvRunTimeRangesToOrbits = -1; // not defined yet - int mTFCounter = 0; + int mSentTFCounter = 0; + int mAccTFCounter = 0; int mTFBuilderCounter = 0; int mNWaits = 0; int mTFLength = 32; @@ -159,15 +160,15 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) } if (hd->splitPayloadIndex == 0) { // check the 1st one only auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}]; - if (entry.count != this->mTFCounter) { + if (entry.count != this->mSentTFCounter) { if (verbose && hdPrev) { // report previous partition size LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as(), hdPrev->dataDescription.as(), dsize); } dsizeTot += dsize; dsize = 0; - entry.count = this->mTFCounter; // acknowledge identifier seen in the data + entry.count = this->mSentTFCounter; // acknowledge identifier seen in the data LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as() << "/" << hd->dataDescription.as() - << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mTFCounter; + << "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mSentTFCounter; nblocks++; } } @@ -219,11 +220,11 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) const auto* hd0 = o2h::get(dataptr); const auto* dph = o2h::get(dataptr); for (auto& out : this->mSeenOutputMap) { - if (out.second.count == this->mTFCounter) { // was seen in the data + if (out.second.count == this->mSentTFCounter) { // was seen in the data continue; } LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as() << "/" << out.first.dataDescription.as() - << "/" << out.second.defSubSpec << " for TF " << this->mTFCounter; + << "/" << out.second.defSubSpec << " for TF " << this->mSentTFCounter; o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0); outHeader.payloadSerializationMethod = o2h::gSerializationMethodNone; outHeader.firstTForbit = hd0->firstTForbit; @@ -270,7 +271,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) auto tNow = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count(); auto tDiff = tNow - tLastTF; - if (mTFCounter && tDiff < mInput.delay_us) { + if (mSentTFCounter && tDiff < mInput.delay_us) { std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending } for (auto& msgIt : *tfPtr.get()) { @@ -285,9 +286,9 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) // however this is a small enough hack for now. ctx.services().get().fakeDispatch(); tNow = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count(); - LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast); + LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter, dataSize, nparts, mSentTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast); tLastTF = tNow; - ++mTFCounter; + ++mSentTFCounter; while (mTFQueue.size() == 0 && mWaitSendingLast) { usleep(10000); @@ -300,7 +301,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) } // usleep(5000); // wait 5ms for new TF to be built } - if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done + if (mSentTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done stopProcessing(ctx); } } @@ -325,7 +326,7 @@ void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx) return; } stopDone = true; - LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits); + LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits); mRunning = false; if (mFileFetcher) { mFileFetcher->stop(); @@ -420,7 +421,7 @@ void TFReaderSpec::TFBuilder() std::this_thread::sleep_for(sleepTime); continue; } - auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity); + auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity); bool acceptTF = true; if (tf) { if (mRunTimeRanges.size()) { @@ -443,21 +444,23 @@ void TFReaderSpec::TFBuilder() locID++; if (!mInput.tfIDs.empty() && acceptTF) { acceptTF = false; + while ((mInput.tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.tfIDs.size()) { + mSelIDEntry++; + } + LOGP(info, "chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.tfIDs[mSelIDEntry], mTFBuilderCounter); if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) { mWaitSendingLast = false; acceptTF = true; LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry); - mSelIDEntry++; } else { LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter); } - } else { - mSelIDEntry++; } mTFBuilderCounter++; } if (mRunning && tf) { if (acceptTF) { + mAccTFCounter++; mWaitSendingLast = true; mTFQueue.push(std::move(tf)); }