diff --git a/Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h b/Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h index b202013a6eea1..ab03649c0646b 100644 --- a/Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h +++ b/Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h @@ -33,6 +33,7 @@ struct CTFReaderInp { std::string fileIRFrames{}; std::string fileRunTimeSpans{}; std::vector ctfIDs{}; + bool reverseCTFIDs{false}; bool skipSkimmedOutTF = false; bool invertIRFramesSelection = false; bool allowMissingDetectors = false; @@ -47,6 +48,7 @@ struct CTFReaderInp { unsigned int decSSpecEMC = 0; int tfRateLimit = -999; size_t minSHM = 0; + bool shuffle{false}; }; /// create a processor spec diff --git a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx index aadc059ecd4fa..e502b88611a3c 100644 --- a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx +++ b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx @@ -11,9 +11,14 @@ /// @file CTFReaderSpec.cxx +#include #include +#include +#include + #include #include +#include #include "Framework/Logger.h" #include "Framework/ControlService.h" @@ -49,7 +54,6 @@ #include "CCDB/BasicCCDBManager.h" #include "CommonConstants/LHCConstants.h" #include "Algorithm/RangeTokenizer.h" -#include #include using namespace o2::framework; @@ -155,6 +159,9 @@ void CTFReaderSpec::stopReader() void CTFReaderSpec::init(InitContext& ic) { mInput.ctfIDs = o2::RangeTokenizer::tokenize(ic.options().get("select-ctf-ids")); + if ((mInput.reverseCTFIDs = ic.options().get("reverse-select-ctf-ids"))) { + std::reverse(mInput.ctfIDs.begin(), mInput.ctfIDs.end()); + } mUseLocalTFCounter = ic.options().get("local-tf-counter"); mImposeRunStartMS = ic.options().get("impose-run-start-timstamp"); mInput.checkTFLimitBeforeReading = ic.options().get("limit-tf-before-reading"); @@ -299,6 +306,27 @@ void CTFReaderSpec::openCTFFile(const std::string& flname) if (mCTFTree->GetEntries() < 1) { throw std::runtime_error(fmt::format("CTF tree in {} has 0 entries, skipping", flname)); } + if (mInput.shuffle) { + if (mInput.ctfIDs.empty()) { + int entries = mCTFTree->GetEntries(); + if (mInput.maxTFs > 0) { + entries = std::min(entries, mInput.maxTFs); + } + if (mInput.maxTFsPerFile > 0) { + entries = std::min(entries, mInput.maxTFsPerFile); + } + mInput.ctfIDs.clear(); + mInput.ctfIDs.resize(entries); + std::iota(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), 0); + } + std::random_device dev; + std::mt19937 gen{dev()}; + std::shuffle(mInput.ctfIDs.begin(), mInput.ctfIDs.end(), gen); + LOGP(info, "will shuffle reading of CTF entries in this order:"); + for (int i{0}; i < (int)mInput.ctfIDs.size(); ++i) { + LOGP(info, "\tTF {:02} -> {:02}", i, mInput.ctfIDs[i]); + } + } } catch (const std::exception& e) { LOG(error) << "Cannot process " << flname << ", reason: " << e.what(); mCTFTree.reset(); @@ -322,9 +350,12 @@ void CTFReaderSpec::run(ProcessingContext& pc) long startWait = 0; while (mRunning) { - if (mCTFTree) { // there is a tree open with multiple CTF - if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter) { // no selection requested or matching CTF ID is found + if (mCTFTree) { // there is a tree open with multiple CTF + if (mInput.ctfIDs.empty() || mInput.ctfIDs[mSelIDEntry] == mCTFCounter || mInput.shuffle || mInput.reverseCTFIDs) { // no selection requested or matching CTF ID is found LOG(debug) << "TF " << mCTFCounter << " of " << mInput.maxTFs << " loop " << mFileFetcher->getNLoops(); + if (mInput.shuffle || mInput.reverseCTFIDs) { + mCurrTreeEntry = mInput.ctfIDs[mSelIDEntry]; + } mSelIDEntry++; if (processTF(pc)) { break; @@ -500,8 +531,13 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc) ///_______________________________________ void CTFReaderSpec::checkTreeEntries() { - // check if the tree has entries left, if needed, close current tree/file - if (++mCurrTreeEntry >= mCTFTree->GetEntries() || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files + bool reachedEnd{false}; + if (mInput.shuffle || mInput.reverseCTFIDs) { // last entry is last id + reachedEnd = (mCurrTreeEntry == mInput.ctfIDs.back()); + } else { // check if the tree has entries left, if needed, close current tree/file + reachedEnd = (++mCurrTreeEntry >= mCTFTree->GetEntries()); + } + if (reachedEnd || (mInput.maxTFsPerFile > 0 && mCurrTreeEntry >= mInput.maxTFsPerFile)) { // this file is done, check if there are other files mCTFTree.reset(); mCTFFile->Close(); mCTFFile.reset(); @@ -611,6 +647,7 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp) } options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}}); + options.emplace_back(ConfigParamSpec{"reverse-select-ctf-ids", VariantType::Bool, false, {"reverse order of to inject CTF IDs"}}); options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}}); options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}}); options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); diff --git a/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx b/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx index 1f0ef9a3b871b..cddf694251a01 100644 --- a/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx +++ b/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx @@ -56,6 +56,7 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}}); options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}}); options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}}); + options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}}); options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}}); options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access @@ -120,6 +121,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) ctfInput.maxFileCache = std::max(1, configcontext.options().get("max-cached-files")); + ctfInput.shuffle = configcontext.options().get("shuffle"); ctfInput.copyCmd = configcontext.options().get("copy-cmd"); ctfInput.tffileRegex = configcontext.options().get("ctf-file-regex"); ctfInput.remoteRegex = configcontext.options().get("remote-regex");