Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Detectors/CTF/workflow/src/CTFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CTFReaderSpec : public o2::framework::Task
int mCTFCounterAcc = 0;
int mNFailedFiles = 0;
int mFilesRead = 0;
int mTFLength = 128;
int mTFLength = 32;
int mNWaits = 0;
int mRunNumberPrev = -1;
long mTotalWaitTime = 0;
Expand Down Expand Up @@ -234,7 +234,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
{
std::ifstream inputFile(flname);
if (!inputFile) {
LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
}
std::string line;
size_t cntl = 0, cntr = 0;
Expand Down Expand Up @@ -286,7 +286,7 @@ void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
logError();
}
}
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
inputFile.close();
}

Expand Down
1 change: 1 addition & 0 deletions Detectors/Raw/TFReaderDD/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ o2_add_library(TFReaderDD
O2::Headers
O2::Framework
O2::DetectorsRaw
O2::DataFormatsParameters
O2::CommonUtils
O2::Algorithm
FairMQ::FairMQ)
Expand Down
142 changes: 137 additions & 5 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@
#include "TFReaderSpec.h"
#include "TFReaderDD/SubTimeFrameFileReader.h"
#include "TFReaderDD/SubTimeFrameFile.h"
#include "CommonUtils/StringUtils.h"
#include "CommonUtils/FileFetcher.h"
#include "CommonUtils/FIFO.h"
#include "CommonUtils/IRFrameSelector.h"
#include "DataFormatsParameters/AggregatedRunInfo.h"
#include "CCDB/BasicCCDBManager.h"
#include "CommonConstants/LHCConstants.h"
#include "Algorithm/RangeTokenizer.h"
#include <unistd.h>
#include <algorithm>
#include <unordered_map>
Expand Down Expand Up @@ -66,6 +72,8 @@ class TFReaderSpec : public o2f::Task
void endOfStream(o2f::EndOfStreamContext& ec) final;

private:
void loadRunTimeSpans(const std::string& flname);
void runTimeRangesToIRFrameSelector(int runNumber);
void stopProcessing(o2f::ProcessingContext& ctx);
void TFBuilder();

Expand All @@ -76,9 +84,13 @@ class TFReaderSpec : public o2f::Task
o2::utils::FIFO<std::unique_ptr<TFMap>> mTFQueue{}; // queued TFs
// std::unordered_map<o2h::DataIdentifier, SubSpecCount, std::hash<o2h::DataIdentifier>> mSeenOutputMap;
std::unordered_map<o2h::DataIdentifier, SubSpecCount> mSeenOutputMap;
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
int mConvRunTimeRangesToOrbits = -1; // not defined yet
int mTFCounter = 0;
int mTFBuilderCounter = 0;
int mNWaits = 0;
int mTFLength = 32;
long mTotalWaitTime = 0;
size_t mSelIDEntry = 0; // next TFID to select from the mInput.tfIDs (if non-empty)
bool mRunning = false;
Expand All @@ -105,6 +117,9 @@ void TFReaderSpec::init(o2f::InitContext& ic)
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
if (!mInput.fileRunTimeSpans.empty()) {
loadRunTimeSpans(mInput.fileRunTimeSpans);
}
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
mFileFetcher->setMaxLoops(mInput.maxLoops);
Expand Down Expand Up @@ -142,10 +157,6 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
if (verbose && mInput.verbosity > 0) {
LOGP(info, "Acknowledge: part {}/{} {}/{}/{:#x} size:{} split {}/{}", ip, np, hd->dataOrigin.as<std::string>(), hd->dataDescription.as<std::string>(), hd->subSpecification, msgh.GetSize() + parts[ip + 1].GetSize(), hd->splitPayloadIndex, hd->splitPayloadParts);
}
if (dph->startTime != this->mTFCounter) {
LOGP(fatal, "Local tf counter {} != TF timeslice {} for {}", this->mTFCounter, dph->startTime,
o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{hd->dataOrigin, hd->dataDescription, hd->subSpecification}));
}
if (hd->splitPayloadIndex == 0) { // check the 1st one only
auto& entry = this->mSeenOutputMap[{hd->dataDescription.str, hd->dataOrigin.str}];
if (entry.count != this->mTFCounter) {
Expand Down Expand Up @@ -412,8 +423,25 @@ void TFReaderSpec::TFBuilder()
auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mSelIDEntry, mInput.sup0xccdb, mInput.verbosity);
bool acceptTF = true;
if (tf) {
if (mRunTimeRanges.size()) {
const auto* dataptr = (*tf->begin()->second.get())[0].GetData();
const auto* hd0 = o2h::get<o2h::DataHeader*>(dataptr);
static int runNumberPrev = -1;
if (runNumberPrev != hd0->runNumber) {
runNumberPrev = hd0->runNumber;
runTimeRangesToIRFrameSelector(runNumberPrev);
}
if (mIRFrameSelector.isSet()) {
o2::InteractionRecord ir0(0, hd0->firstTForbit);
o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, hd0->firstTForbit < 0xffffffff - (mTFLength - 1) ? hd0->firstTForbit + (mTFLength - 1) : 0xffffffff);
auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
acceptTF = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
irSpan.size(), ir0.asString(), ir1.asString(), acceptTF ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
}
}
locID++;
if (!mInput.tfIDs.empty()) {
if (!mInput.tfIDs.empty() && acceptTF) {
acceptTF = false;
if (mInput.tfIDs[mSelIDEntry] == mTFBuilderCounter) {
mWaitSendingLast = false;
Expand Down Expand Up @@ -448,6 +476,110 @@ void TFReaderSpec::TFBuilder()
}
}

//_________________________________________________________
void TFReaderSpec::loadRunTimeSpans(const std::string& flname)
{
std::ifstream inputFile(flname);
if (!inputFile) {
LOGP(fatal, "Failed to open selected run/timespans file {}", flname);
}
std::string line;
size_t cntl = 0, cntr = 0;
while (std::getline(inputFile, line)) {
cntl++;
for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
if (ch == ';' || ch == '\t' || ch == ',') {
ch = ' ';
}
}
o2::utils::Str::trim(line);
if (line.size() < 1 || line[0] == '#') {
continue;
}
auto tokens = o2::utils::Str::tokenize(line, ' ');
auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
if (tokens.size() >= 3) {
int run = 0;
long rmin, rmax;
try {
run = std::stoi(tokens[0]);
rmin = std::stol(tokens[1]);
rmax = std::stol(tokens[2]);
} catch (...) {
logError();
continue;
}

constexpr long ISTimeStamp = 1514761200000L;
int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
if (rmin > rmax) {
LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
}
if (mConvRunTimeRangesToOrbits == -1) {
if (convmn != convmx) {
LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
}
mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
} else {
if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
}
}

mRunTimeRanges[run].emplace_back(rmin, rmax);
cntr++;
} else {
logError();
}
}
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), flname);
inputFile.close();
}

//_________________________________________________________
void TFReaderSpec::runTimeRangesToIRFrameSelector(int runNumber)
{
// convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
mIRFrameSelector.clear();
auto ent = mRunTimeRanges.find(runNumber);
if (ent == mRunTimeRanges.end()) {
LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", runNumber);
return;
}
o2::parameters::AggregatedRunInfo rinfo;
auto& ccdb = o2::ccdb::BasicCCDBManager::instance();
rinfo = o2::parameters::AggregatedRunInfo::buildAggregatedRunInfo(ccdb, runNumber);
if (rinfo.runNumber != runNumber || rinfo.orbitsPerTF < 1) {
LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", runNumber);
}
mTFLength = rinfo.orbitsPerTF;
std::vector<o2::dataformats::IRFrame> frames;
for (const auto& rng : ent->second) {
long orbMin = 0, orbMax = 0;
if (mConvRunTimeRangesToOrbits > 0) {
orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
} else {
orbMin = rng.first;
orbMax = rng.second;
}
if (orbMin < 0) {
orbMin = 0;
}
if (orbMax < 0) {
orbMax = 0;
}
if (runNumber > 523897) {
orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
}
LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
frames.emplace_back(o2::InteractionRecord{0, uint32_t(orbMin)}, o2::InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
}
mIRFrameSelector.setOwnList(frames, true);
}

//_________________________________________________________
o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
{
Expand Down
2 changes: 2 additions & 0 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct TFReaderInp {
std::string tffileRegex{};
std::string remoteRegex{};
std::string metricChannel{};
std::string fileRunTimeSpans{};
o2::detectors::DetID::mask_t detMask{};
o2::detectors::DetID::mask_t detMaskRawOnly{};
o2::detectors::DetID::mask_t detMaskNonRawOnly{};
Expand All @@ -46,6 +47,7 @@ struct TFReaderInp {
int maxTFsPerFile = -1;
bool sendDummyForMissing = true;
bool sup0xccdb = false;
bool invertIRFramesSelection = false;
std::vector<o2::header::DataHeader> hdVec;
std::vector<int> tfIDs{};
};
Expand Down
5 changes: 4 additions & 1 deletion Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});

// options for error-check suppression
Expand Down Expand Up @@ -80,7 +82,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
rinp.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
}

rinp.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
rinp.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
WorkflowSpec specs;
specs.emplace_back(o2::rawdd::getTFReaderSpec(rinp));
return specs;
Expand Down