diff --git a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx index e502b88611a3c..3810230637e5f 100644 --- a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx +++ b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx @@ -112,7 +112,7 @@ class CTFReaderSpec : public o2::framework::Task int mCTFCounterAcc = 0; int mNFailedFiles = 0; int mFilesRead = 0; - int mTFLength = 128; + int mTFLength = 32; int mNWaits = 0; int mRunNumberPrev = -1; long mTotalWaitTime = 0; @@ -234,7 +234,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname) { std::ifstream inputFile(flname); if (!inputFile) { - LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans); + LOGP(fatal, "Failed to open selected run/timespans file {}", flname); } std::string line; size_t cntl = 0, cntr = 0; @@ -286,7 +286,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname) logError(); } } - LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans); + LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname); inputFile.close(); } diff --git a/Detectors/Raw/TFReaderDD/CMakeLists.txt b/Detectors/Raw/TFReaderDD/CMakeLists.txt index e58f0d50115f7..12ecc9ca8795d 100644 --- a/Detectors/Raw/TFReaderDD/CMakeLists.txt +++ b/Detectors/Raw/TFReaderDD/CMakeLists.txt @@ -16,6 +16,7 @@ o2_add_library(TFReaderDD O2::Headers O2::Framework O2::DetectorsRaw + O2::DataFormatsParameters O2::CommonUtils O2::Algorithm FairMQ::FairMQ) diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index 07a62a7fd4a58..f4cd64377034e 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -31,8 +31,14 @@ #include "TFReaderSpec.h" #include "TFReaderDD/SubTimeFrameFileReader.h" #include "TFReaderDD/SubTimeFrameFile.h" +#include "CommonUtils/StringUtils.h" #include "CommonUtils/FileFetcher.h" #include "CommonUtils/FIFO.h" +#include "CommonUtils/IRFrameSelector.h" +#include "DataFormatsParameters/AggregatedRunInfo.h" +#include "CCDB/BasicCCDBManager.h" +#include "CommonConstants/LHCConstants.h" +#include "Algorithm/RangeTokenizer.h" #include #include #include @@ -66,6 +72,8 @@ class TFReaderSpec : public o2f::Task void endOfStream(o2f::EndOfStreamContext& ec) final; private: + void loadRunTimeSpans(const std::string& flname); + void runTimeRangesToIRFrameSelector(int runNumber); void stopProcessing(o2f::ProcessingContext& ctx); void TFBuilder(); @@ -76,9 +84,13 @@ class TFReaderSpec : public o2f::Task o2::utils::FIFO> mTFQueue{}; // queued TFs // std::unordered_map> mSeenOutputMap; std::unordered_map mSeenOutputMap; + std::map>> mRunTimeRanges; + o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector + int mConvRunTimeRangesToOrbits = -1; // not defined yet int mTFCounter = 0; int mTFBuilderCounter = 0; int mNWaits = 0; + int mTFLength = 32; long mTotalWaitTime = 0; size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty) bool mRunning = false; @@ -105,6 +117,9 @@ void TFReaderSpec::init(o2f::InitContext& ic) mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff; mInput.maxTFCache = std::max(1, ic.options().get("max-cached-tf")); mInput.maxFileCache = std::max(1, ic.options().get("max-cached-files")); + if (!mInput.fileRunTimeSpans.empty()) { + loadRunTimeSpans(mInput.fileRunTimeSpans); + } mFileFetcher = std::make_unique(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd); mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache); mFileFetcher->setMaxLoops(mInput.maxLoops); @@ -142,10 +157,6 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) if (verbose && mInput.verbosity > 0) { LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as(), hd->dataDescription.as(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts); } - if (dph->startTime != this->mTFCounter) { - LOGP(fatal, "Local tf counter {} != TF timeslice {} for {}", this->mTFCounter, dph->startTime, - o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{hd->dataOrigin, hd->dataDescription, hd->subSpecification})); - } if (hd->splitPayloadIndex == 0) { // check the 1st one only auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}]; if (entry.count != this->mTFCounter) { @@ -412,8 +423,25 @@ void TFReaderSpec::TFBuilder() auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity); bool acceptTF = true; if (tf) { + if (mRunTimeRanges.size()) { + const auto* dataptr = (*tf->begin()->second.get())[0].GetData(); + const auto* hd0 = o2h::get(dataptr); + static int runNumberPrev = -1; + if (runNumberPrev != hd0->runNumber) { + runNumberPrev = hd0->runNumber; + runTimeRangesToIRFrameSelector(runNumberPrev); + } + if (mIRFrameSelector.isSet()) { + o2::InteractionRecord ir0(0, hd0->firstTForbit); + o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff); + auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1}); + acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection; + LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})", + irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF"); + } + } locID++; - if (!mInput.tfIDs.empty()) { + if (!mInput.tfIDs.empty() && acceptTF) { acceptTF = false; if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) { mWaitSendingLast = false; @@ -448,6 +476,110 @@ void TFReaderSpec::TFBuilder() } } +//_________________________________________________________ +void TFReaderSpec::loadRunTimeSpans(const std::string& flname) +{ + std::ifstream inputFile(flname); + if (!inputFile) { + LOGP(fatal, "Failed to open selected run/timespans file {}", flname); + } + std::string line; + size_t cntl = 0, cntr = 0; + while (std::getline(inputFile, line)) { + cntl++; + for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing + if (ch == ';' || ch == '\t' || ch == ',') { + ch = ' '; + } + } + o2::utils::Str::trim(line); + if (line.size() < 1 || line[0] == '#') { + continue; + } + auto tokens = o2::utils::Str::tokenize(line, ' '); + auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet , failed on line#{}: {}", cntl, line); }; + if (tokens.size() >= 3) { + int run = 0; + long rmin, rmax; + try { + run = std::stoi(tokens[0]); + rmin = std::stol(tokens[1]); + rmax = std::stol(tokens[2]); + } catch (...) { + logError(); + continue; + } + + constexpr long ISTimeStamp = 1514761200000L; + int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits) + if (rmin > rmax) { + LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line); + } + if (mConvRunTimeRangesToOrbits == -1) { + if (convmn != convmx) { + LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line); + } + mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time + LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits"); + } else { + if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) { + LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line); + } + } + + mRunTimeRanges[run].emplace_back(rmin, rmax); + cntr++; + } else { + logError(); + } + } + LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname); + inputFile.close(); +} + +//_________________________________________________________ +void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber) +{ + // convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit + mIRFrameSelector.clear(); + auto ent = mRunTimeRanges.find(runNumber); + if (ent == mRunTimeRanges.end()) { + LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber); + return; + } + o2::parameters::AggregatedRunInfo rinfo; + auto& ccdb = o2::ccdb::BasicCCDBManager::instance(); + rinfo = o2::parameters::AggregatedRunInfo::buildAggregatedRunInfo(ccdb, runNumber); + if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) { + LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber); + } + mTFLength = rinfo.orbitsPerTF; + std::vector frames; + for (const auto& rng : ent->second) { + long orbMin = 0, orbMax = 0; + if (mConvRunTimeRangesToOrbits > 0) { + orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001); + orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001); + } else { + orbMin = rng.first; + orbMax = rng.second; + } + if (orbMin < 0) { + orbMin = 0; + } + if (orbMax < 0) { + orbMax = 0; + } + if (runNumber > 523897) { + orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF; + orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1; + } + LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected"); + frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)}); + } + mIRFrameSelector.setOwnList(frames, true); +} + //_________________________________________________________ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp) { diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h index e3a5b5c920010..9db18768c1bfe 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h @@ -32,6 +32,7 @@ struct TFReaderInp { std::string tffileRegex{}; std::string remoteRegex{}; std::string metricChannel{}; + std::string fileRunTimeSpans{}; o2::detectors::DetID::mask_t detMask{}; o2::detectors::DetID::mask_t detMaskRawOnly{}; o2::detectors::DetID::mask_t detMaskNonRawOnly{}; @@ -46,6 +47,7 @@ struct TFReaderInp { int maxTFsPerFile = -1; bool sendDummyForMissing = true; bool sup0xccdb = false; + bool invertIRFramesSelection = false; std::vector hdVec; std::vector tfIDs{}; }; diff --git a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx index 7d8ee09fe474f..bc682127b0d3f 100644 --- a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx +++ b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx @@ -39,6 +39,8 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}}); options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}}); options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}}); + options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}}); + options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}}); options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}}); // options for error-check suppression @@ -80,7 +82,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) if (rateLimitingIPCID > -1 && !chanFmt.empty()) { rinp.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID); } - + rinp.fileRunTimeSpans = configcontext.options().get("run-time-span-file"); + rinp.invertIRFramesSelection = configcontext.options().get("invert-irframe-selection"); WorkflowSpec specs; specs.emplace_back(o2::rawdd::getTFReaderSpec(rinp)); return specs;