Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class TFReaderSpec : public o2f::Task
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
int mConvRunTimeRangesToOrbits = -1; // not defined yet
int mTFCounter = 0;
int mSentTFCounter = 0;
int mAccTFCounter = 0;
int mTFBuilderCounter = 0;
int mNWaits = 0;
int mTFLength = 32;
Expand Down Expand Up @@ -159,15 +160,15 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
}
if (hd->splitPayloadIndex == 0) { // check the 1st one only
auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
if (entry.count != this->mTFCounter) {
if (entry.count != this->mSentTFCounter) {
if (verbose && hdPrev) { // report previous partition size
LOGP(info, "Block:{} {}/{} with size {}", nblocks, hdPrev->dataOrigin.as<std::string>(), hdPrev->dataDescription.as<std::string>(), dsize);
}
dsizeTot += dsize;
dsize = 0;
entry.count = this->mTFCounter; // acknowledge identifier seen in the data
entry.count = this->mSentTFCounter; // acknowledge identifier seen in the data
LOG(debug) << "Found a part " << ip << " of " << np << " | " << hd->dataOrigin.as<std::string>() << "/" << hd->dataDescription.as<std::string>()
<< "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mTFCounter;
<< "/" << hd->subSpecification << " part " << hd->splitPayloadIndex << " of " << hd->splitPayloadParts << " for TF " << this->mSentTFCounter;
nblocks++;
}
}
Expand Down Expand Up @@ -219,11 +220,11 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
const auto* dph = o2h::get<o2f::DataProcessingHeader*>(dataptr);
for (auto& out : this->mSeenOutputMap) {
if (out.second.count == this->mTFCounter) { // was seen in the data
if (out.second.count == this->mSentTFCounter) { // was seen in the data
continue;
}
LOG(debug) << "Adding dummy output for " << out.first.dataOrigin.as<std::string>() << "/" << out.first.dataDescription.as<std::string>()
<< "/" << out.second.defSubSpec << " for TF " << this->mTFCounter;
<< "/" << out.second.defSubSpec << " for TF " << this->mSentTFCounter;
o2h::DataHeader outHeader(out.first.dataDescription, out.first.dataOrigin, out.second.defSubSpec, 0);
outHeader.payloadSerializationMethod = o2h::gSerializationMethodNone;
outHeader.firstTForbit = hd0->firstTForbit;
Expand Down Expand Up @@ -270,7 +271,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)

auto tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
auto tDiff = tNow - tLastTF;
if (mTFCounter && tDiff < mInput.delay_us) {
if (mSentTFCounter && tDiff < mInput.delay_us) {
std::this_thread::sleep_for(std::chrono::microseconds((size_t)(mInput.delay_us - tDiff))); // respect requested delay before sending
}
for (auto& msgIt : *tfPtr.get()) {
Expand All @@ -285,9 +286,9 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
// however this is a small enough hack for now.
ctx.services().get<o2f::MessageContext>().fakeDispatch();
tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mTFCounter, dataSize, nparts, mTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
LOGP(info, "Sent TF {} of size {} with {} parts, {:.4f} s elapsed from previous TF., WaitSending={}", mSentTFCounter, dataSize, nparts, mSentTFCounter ? double(tNow - tLastTF) * 1e-6 : 0., mWaitSendingLast);
tLastTF = tNow;
++mTFCounter;
++mSentTFCounter;

while (mTFQueue.size() == 0 && mWaitSendingLast) {
usleep(10000);
Expand All @@ -300,7 +301,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
}
// usleep(5000); // wait 5ms for new TF to be built
}
if (mTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
if (mSentTFCounter >= mInput.maxTFs || (!mTFQueue.size() && !mRunning)) { // done
stopProcessing(ctx);
}
}
Expand All @@ -325,7 +326,7 @@ void TFReaderSpec::stopProcessing(o2f::ProcessingContext& ctx)
return;
}
stopDone = true;
LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
LOGP(info, "{} TFs in {} loops were sent, spent {:.2} s in {} data waiting states", mSentTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
mRunning = false;
if (mFileFetcher) {
mFileFetcher->stop();
Expand Down Expand Up @@ -420,7 +421,7 @@ void TFReaderSpec::TFBuilder()
std::this_thread::sleep_for(sleepTime);
continue;
}
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity);
bool acceptTF = true;
if (tf) {
if (mRunTimeRanges.size()) {
Expand All @@ -443,21 +444,23 @@ void TFReaderSpec::TFBuilder()
locID++;
if (!mInput.tfIDs.empty() && acceptTF) {
acceptTF = false;
while ((mInput.tfIDs[mSelIDEntry] < mTFBuilderCounter) && (mSelIDEntry + 1) < mInput.tfIDs.size()) {
mSelIDEntry++;
}
LOGP(info, "chec if mInput.tfIDs[{}]({}) == {}", mSelIDEntry, mInput.tfIDs[mSelIDEntry], mTFBuilderCounter);
if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
mWaitSendingLast = false;
acceptTF = true;
LOGP(info, "Retrieved TF#{} will be pushed as slice {} following user request", mTFBuilderCounter, mSelIDEntry);
mSelIDEntry++;
} else {
LOGP(info, "Retrieved TF#{} will be discared following user request", mTFBuilderCounter);
}
} else {
mSelIDEntry++;
}
mTFBuilderCounter++;
}
if (mRunning && tf) {
if (acceptTF) {
mAccTFCounter++;
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
}
Expand Down