From 88aa425e68027f083dbde4446e055a8da86e290b Mon Sep 17 00:00:00 2001 From: aferrero2707 Date: Tue, 12 Aug 2025 18:13:02 +0200 Subject: [PATCH] [MCH] improvements to the pedestal calibrator The pedestal processing code is modified to run in multi-threaded mode, to improve the processing speed on the calibrator node The periodic logging of pedestals statistics is also improved, to better monitor and debug the pedestals data taking. --- .../MCHCalibration/BadChannelCalibrator.h | 6 +- .../BadChannelCalibratorParam.h | 2 + .../include/MCHCalibration/PedestalData.h | 5 +- .../src/BadChannelCalibrationDevice.cxx | 29 +++-- .../Calibration/src/BadChannelCalibrator.cxx | 39 +++++- .../MUON/MCH/Calibration/src/PedestalData.cxx | 121 ++++++++++++++---- .../MCH/Calibration/test/testPedestalData.cxx | 21 +++ 7 files changed, 179 insertions(+), 44 deletions(-) diff --git a/Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h b/Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h index 509b9b88b30e4..6873d340841a2 100644 --- a/Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h +++ b/Detectors/MUON/MCH/Calibration/include/MCHCalibration/BadChannelCalibrator.h @@ -68,13 +68,17 @@ class BadChannelCalibrator final : public o2::calibration::TimeSlotCalibration(); mCalibrator->setSlotLength(o2::calibration::INFINITE_TF); mCalibrator->setUpdateAtTheEndOfRunOnly(); + mCalibrator->setLoggingInterval(mLoggingInterval); mTimeStamp = std::numeric_limits::max(); } @@ -52,6 +53,8 @@ void BadChannelCalibrationDevice::logStats(size_t dataSize) static auto loggerEnd = loggerStart; static size_t nDigits = 0; static size_t nTF = 0; + static size_t nTFtot = 0; + static size_t nTFtotWithData = 0; if (mLoggingInterval == 0) { return; @@ -59,11 +62,17 @@ void BadChannelCalibrationDevice::logStats(size_t dataSize) nDigits += dataSize; nTF += 1; + nTFtot += 1; + if (dataSize > 1000) { + nTFtotWithData += 1; + } loggerEnd = std::chrono::high_resolution_clock::now(); std::chrono::duration loggerElapsed = loggerEnd - loggerStart; - if (loggerElapsed.count() > 1000) { - LOG(info) << "received " << nDigits << " digits in " << nTF << " time frames"; + if (loggerElapsed.count() > mLoggingInterval) { + LOG(warning) << "received " << nDigits << " digits in " << nTF << " time frames"; + LOG(warning) << "received " << nTFtotWithData << " time frames with data out of " << nTFtot << " total time frames (" + << ((nTFtot > 0) ? (nTFtotWithData * 100.0) / nTFtot : 0.0) << "%)"; nDigits = 0; nTF = 0; loggerStart = std::chrono::high_resolution_clock::now(); @@ -86,7 +95,7 @@ void BadChannelCalibrationDevice::run(o2::framework::ProcessingContext& pc) std::string reason; if (mCalibrator->readyToSend(reason)) { mHasEnoughStat = true; - LOGP(info, "We're ready to send output to CCDB ({})", reason); + LOGP(warning, "We're ready to send output to CCDB ({})", reason); sendOutput(pc.outputs(), reason); mSkipData = true; } @@ -139,12 +148,12 @@ void sendCalibrationOutput(o2::framework::DataAllocator& output, using clbUtils = o2::calibration::Utils; auto image = o2::ccdb::CcdbApi::createObjectImage(payload, payloadInfo); - LOG(info) << "Sending object " << payloadInfo->getPath() - << " of type" << payloadInfo->getObjectType() - << " /" << payloadInfo->getFileName() - << " of size " << image->size() - << " bytes, valid for " << payloadInfo->getStartValidityTimestamp() - << " : " << payloadInfo->getEndValidityTimestamp(); + LOG(warning) << "Sending object " << payloadInfo->getPath() + << " of type" << payloadInfo->getObjectType() + << " /" << payloadInfo->getFileName() + << " of size " << image->size() + << " bytes, valid for " << payloadInfo->getStartValidityTimestamp() + << " : " << payloadInfo->getEndValidityTimestamp(); output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBPayload, "MCH_BADCHAN", subSpec}, *image.get()); output.snapshot(o2::framework::Output{o2::calibration::Utils::gDataOriginCDBWrapper, "MCH_BADCHAN", subSpec}, *payloadInfo); @@ -168,7 +177,7 @@ void BadChannelCalibrationDevice::sendOutput(o2::framework::DataAllocator& outpu reason_with_entries = fmt::format("{} ; no entries", reason); } - LOGP(info, "sendOutput: {}", reason_with_entries); + LOGP(warning, "sendOutput: {}", reason_with_entries); mCalibrator->finalize(); // the bad channels table is only updated if there is enough statistics diff --git a/Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx b/Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx index 26d312e7dc36e..b5aa17ef81f8c 100644 --- a/Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx +++ b/Detectors/MUON/MCH/Calibration/src/BadChannelCalibrator.cxx @@ -20,6 +20,7 @@ #include #include #include +#include #include namespace o2::mch::calibration @@ -65,6 +66,8 @@ void BadChannelCalibrator::finalize() bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const { + static auto loggerStart = std::chrono::high_resolution_clock::now(); + static auto loggerEnd = loggerStart; const int minNofEntries = BadChannelCalibratorParam::Instance().minRequiredNofEntriesPerChannel; const o2::mch::calibration::PedestalData* pedData = slot.getContainer(); auto nofChannels = pedData->size(); @@ -75,9 +78,35 @@ bool BadChannelCalibrator::hasEnoughData(const Slot& slot) const bool hasEnough = nofCalibrated > requiredChannels; - LOGP(info, - "nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}", - minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough); + // logging of calibration statistics + loggerEnd = std::chrono::high_resolution_clock::now(); + std::chrono::duration loggerElapsed = loggerEnd - loggerStart; + if (mLoggingInterval > 0 && loggerElapsed.count() > mLoggingInterval) { + int minEntriesPerChannel{std::numeric_limits::max()}; + int maxEntriesPerChannel{0}; + uint64_t averageEntriesPerChannel = 0; + std::for_each(pedData->cbegin(), pedData->cend(), + [&](const PedestalChannel& c) { + if (c.mEntries == 0) { + return; + } + if (c.mEntries > maxEntriesPerChannel) { + maxEntriesPerChannel = c.mEntries; + } + if (c.mEntries < minEntriesPerChannel) { + minEntriesPerChannel = c.mEntries; + } + averageEntriesPerChannel += c.mEntries; + }); + if (nofChannels > 0) { + averageEntriesPerChannel /= nofChannels; + } + LOGP(warning, "channel stats: min={} max={} average={}", minEntriesPerChannel, maxEntriesPerChannel, averageEntriesPerChannel); + LOGP(warning, + "nofChannelWithEnoughStat(>{})={} nofChannels={} requiredChannels={} hasEnough={}", + minNofEntries, nofCalibrated, nofChannels, requiredChannels, hasEnough); + loggerStart = std::chrono::high_resolution_clock::now(); + } return hasEnough; } @@ -92,7 +121,7 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot) mBadChannelsVector.clear(); o2::mch::calibration::PedestalData* pedestalData = slot.getContainer(); - LOG(info) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd(); + LOG(warning) << "Finalize slot " << slot.getTFStart() << " <= TF <= " << slot.getTFEnd(); // keep track of first TimeFrame if (slot.getTFStart() < mTFStart) { @@ -120,9 +149,11 @@ void BadChannelCalibrator::finalizeSlot(Slot& slot) BadChannelCalibrator::Slot& BadChannelCalibrator::emplaceNewSlot(bool front, TFType tstart, TFType tend) { + const int nThreads = static_cast(BadChannelCalibratorParam::Instance().nThreads); auto& cont = getSlots(); auto& slot = front ? cont.emplace_front(tstart, tend) : cont.emplace_back(tstart, tend); slot.setContainer(std::make_unique()); + slot.getContainer()->setNThreads(nThreads); return slot; } diff --git a/Detectors/MUON/MCH/Calibration/src/PedestalData.cxx b/Detectors/MUON/MCH/Calibration/src/PedestalData.cxx index 661bab7913b8e..5947cc940e3ce 100644 --- a/Detectors/MUON/MCH/Calibration/src/PedestalData.cxx +++ b/Detectors/MUON/MCH/Calibration/src/PedestalData.cxx @@ -19,6 +19,9 @@ #include #include #include +#include +#include +#include namespace o2::mch::calibration { @@ -69,45 +72,107 @@ PedestalData::PedestalMatrix PedestalData::initPedestalMatrix(uint16_t solarId) void PedestalData::fill(gsl::span digits) { bool mDebug = false; + static std::mutex pedestalMutex; + static std::set solarIds = o2::mch::raw::getSolarUIDs(); - for (auto& d : digits) { - uint16_t solarId = d.getSolarId(); - uint8_t dsId = d.getDsId(); - uint8_t channel = d.getChannel(); + if (digits.empty()) { + return; + } - auto iPedestal = mPedestals.find(solarId); + LOGP(info, "processing {} digits with {} threads", (int)digits.size(), mNThreads); - if (iPedestal == mPedestals.end()) { - auto iPedestalsNew = mPedestals.emplace(std::make_pair(solarId, initPedestalMatrix(solarId))); - iPedestal = iPedestalsNew.first; - } + // fill the queue of SOLAR IDs to be processed + std::queue solarQueue; + for (auto solarId : solarIds) { + solarQueue.push(solarId); + } - if (iPedestal == mPedestals.end()) { - LOGP(fatal, "failed to insert new element in padestals map"); - break; - } + auto processSolarDigits = [&]() { + while (true) { + int targetSolarId = -1; + PedestalsMap::iterator iPedestal; + bool pedestalsAreInitialized; + + // non thread-safe access to solarQueue, protected by the pedestalMutex + { + std::lock_guard lock(pedestalMutex); + + // stop when there are no mor SOLAR IDs to process + if (solarQueue.empty()) { + break; + } + + // get the next SOLAR ID to be processed + targetSolarId = solarQueue.front(); + solarQueue.pop(); + + // update the iterator to the pedestal data for the target SOLAR + iPedestal = mPedestals.find(targetSolarId); + if (iPedestal == mPedestals.end()) { + pedestalsAreInitialized = false; + } else { + pedestalsAreInitialized = true; + } + } - auto& ped = iPedestal->second[dsId][channel]; + // loop over digits, selecting only those belonging to the target SOLAR + for (auto& d : digits) { + uint16_t solarId = d.getSolarId(); + if (solarId != targetSolarId) { + continue; + } - for (uint16_t i = 0; i < d.nofSamples(); i++) { - auto s = d.getSample(i); + // non thread-safe access to Pedestals structure, protected by the pedestalMutex + if (!pedestalsAreInitialized) { + std::lock_guard lock(pedestalMutex); - ped.mEntries += 1; - uint64_t N = ped.mEntries; + // create the pedestals structure corresponding to the SOLAR ID to be processed + iPedestal = mPedestals.emplace(std::make_pair(targetSolarId, initPedestalMatrix(targetSolarId))).first; - double p0 = ped.mPedestal; - double p = p0 + (s - p0) / N; - ped.mPedestal = p; + if (iPedestal == mPedestals.end()) { + LOGP(fatal, "failed to insert new element in padestals map"); + break; + } + pedestalsAreInitialized = true; + } - double M0 = ped.mVariance; - double M = M0 + (s - p0) * (s - p); - ped.mVariance = M; - } + uint8_t dsId = d.getDsId(); + uint8_t channel = d.getChannel(); + + auto& ped = iPedestal->second[dsId][channel]; + + for (uint16_t i = 0; i < d.nofSamples(); i++) { + auto s = d.getSample(i); - if (mDebug) { - LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}", - (int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance); + ped.mEntries += 1; + uint64_t N = ped.mEntries; + + double p0 = ped.mPedestal; + double p = p0 + (s - p0) / N; + ped.mPedestal = p; + + double M0 = ped.mVariance; + double M = M0 + (s - p0) * (s - p); + ped.mVariance = M; + } + + if (mDebug) { + LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}", + (int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance); + } + } } + }; + + // process the digits in parallel threads + std::vector threads; + for (int ti = 0; ti < mNThreads; ti++) { + threads.emplace_back(processSolarDigits); + } + + // wait for all threads to finish processing + for (auto& thread : threads) { + thread.join(); } } diff --git a/Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx b/Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx index c61656aa7845f..0c1d6bffb984e 100644 --- a/Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx +++ b/Detectors/MUON/MCH/Calibration/test/testPedestalData.cxx @@ -84,6 +84,17 @@ BOOST_AUTO_TEST_CASE(TestIteratorOnCompletePedestalData) ++n; } BOOST_TEST(n == allDigits.size()); + + // multi-threaded version + PedestalData pdmt; + pdmt.setNThreads(8); + pdmt.fill(allDigits); + + int nmt{0}; + for (const auto& ped : pdmt) { + ++nmt; + } + BOOST_TEST(nmt == allDigits.size()); } BOOST_AUTO_TEST_CASE(TestIteratorEquality) @@ -113,6 +124,16 @@ BOOST_AUTO_TEST_CASE(TestIteratorPreIncrementable) n++; } BOOST_TEST(n == 2768); + + // multi-threaded version + PedestalData pdmt; + pdmt.setNThreads(8); + pdmt.fill(digits); + int nmt{0}; + for (auto rec : pdmt) { + nmt++; + } + BOOST_TEST(nmt == 2768); // 2768 = 1856 pads in solar 328 + 721 pads in solar 721 // Note that solar 328 has 29 dual sampas // solar 721 has 15 dual sampas