diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index a289980349924..94764571840f4 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -288,63 +288,85 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) { return; } + struct ResourceState { + int64_t available; + int64_t offered = 0; + int64_t lastDeviceOffered = 0; + }; + struct ResourceStats { + int64_t enoughCount; + int64_t lowCount; + }; + struct ResourceSpec{ + int64_t maxAvailable; + int64_t maxQuantum; + int64_t minQuantum; + }; + static const ResourceSpec resourceSpec{ + .maxAvailable = (int64_t)calculateAvailableSharedMemory(registry), + .maxQuantum = 100, + .minQuantum = 50, + }; + static ResourceState resourceState{ + .available = resourceSpec.maxAvailable, + }; + static ResourceStats resourceStats{ + .enoughCount = resourceState.available - resourceSpec.minQuantum > 0 ? 1 : 0, + .lowCount = resourceState.available - resourceSpec.minQuantum > 0 ? 0 : 1 + }; - static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry); - constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100; - constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50; - - static int64_t availableSharedMemory = MAX_SHARED_MEMORY; - static int64_t offeredSharedMemory = 0; - static int64_t lastDeviceOffered = 0; /// We loop over the devices, starting from where we stopped last time /// offering MIN_QUANTUM_SHARED_MEMORY of shared memory to each reader. int64_t lastCandidate = -1; - static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0; - static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1; - int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY; + int64_t possibleOffer = resourceSpec.minQuantum; + for (size_t di = 0; di < specs.size(); di++) { - if (availableSharedMemory < possibleOffer) { - if (lowSharedMemoryCount == 0) { + if (resourceState.available < possibleOffer) { + if (resourceStats.lowCount == 0) { O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough", "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu", - availableSharedMemory, possibleOffer, offeredSharedMemory); + resourceState.available, possibleOffer, resourceState.offered); } - lowSharedMemoryCount++; - enoughSharedMemoryCount = 0; + resourceStats.lowCount++; + resourceStats.enoughCount = 0; break; } else { - if (enoughSharedMemoryCount == 0) { + if (resourceStats.enoughCount == 0) { O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough", - "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory); + "We are back in a state where we enough shared memory: %{bytes}llu MB", resourceState.available); } - enoughSharedMemoryCount++; - lowSharedMemoryCount = 0; + resourceStats.lowCount = 0; + resourceStats.enoughCount++; } - size_t candidate = (lastDeviceOffered + di) % specs.size(); + size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size(); auto& info = infos[candidate]; // Do not bother for inactive devices // FIXME: there is probably a race condition if the device died and we did not // took notice yet... if (info.active == false || info.readyToQuit) { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Device %s is inactive not offering memory to it.", specs[candidate].name.c_str()); continue; } if (specs[candidate].name != "internal-dpl-aod-reader") { + O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", + "Device %s is not a reader. Not offering memory to it.", specs[candidate].name.c_str()); continue; } - possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory); + possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available); O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s", - possibleOffer, availableSharedMemory, specs[candidate].id.c_str()); + possibleOffer, resourceState.available, specs[candidate].id.c_str()); manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data()); - availableSharedMemory -= possibleOffer; - offeredSharedMemory += possibleOffer; + resourceState.available -= possibleOffer; + resourceState.offered += possibleOffer; lastCandidate = candidate; } // We had at least a valid candidate, so // next time we offer to the next device. if (lastCandidate >= 0) { - lastDeviceOffered = lastCandidate + 1; + resourceState.lastDeviceOffered = lastCandidate + 1; } // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was @@ -357,21 +379,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed); lastShmOfferConsumed = shmOfferBytesConsumed; } - int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000); + int unusedOfferedMemory = (resourceState.offered - (totalBytesExpired + shmOfferBytesConsumed) / 1000000); if (lastUnusedOfferedMemory != unusedOfferedMemory) { O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer", "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000", - unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); + unusedOfferedMemory, resourceState.offered, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000); lastUnusedOfferedMemory = unusedOfferedMemory; } // availableSharedMemory is the amount of memory which we know is available to be offered. // We subtract the amount which we know was already offered but it's unused and we then balance how // much was created with how much was destroyed. - availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory; - availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp); + resourceState.available = resourceSpec.maxAvailable + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory; + availableSharedMemoryMetric(driverMetrics, resourceState.available, timestamp); unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp); - offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); }, + offeredSharedMemoryMetric(driverMetrics, resourceState.offered, timestamp); }, .postDispatching = [](ProcessingContext& ctx, void* service) { using DataHeader = o2::header::DataHeader; auto* arrow = reinterpret_cast(service);