From 5945ae63a26459f3ea302143f65c62e8a51ec457 Mon Sep 17 00:00:00 2001
From: Jarkko Jaakola
Date: Wed, 5 Nov 2025 15:52:22 +0200
Subject: [PATCH 1/2] storage: use single batch find for a fetch

The fetch path performed two batch finds: one in
DelayedFetch.tryCompleteDiskless and a second in DelayedFetch.onComplete.
Collect the batches in tryCompleteDiskless and reuse them in onComplete.
This reduces the number of database queries sent to the batch coordinator.
---
 .../scala/kafka/server/DelayedFetch.scala | 98 ++++++-----
 .../scala/kafka/server/ReplicaManager.scala | 52 +++++-
 .../kafka/server/DelayedFetchTest.scala | 36 ++--
 .../kafka/server/ReplicaManagerTest.scala | 3 +-
 .../aiven/inkless/consume/FetchHandler.java | 10 +-
 .../java/io/aiven/inkless/consume/Reader.java | 158 ++++++++----------
 .../inkless/consume/FetchHandlerTest.java | 28 +---
 .../io/aiven/inkless/consume/ReaderTest.java | 6 +-
 .../delete/FileCleanerIntegrationTest.java | 22 ++-
 .../merge/FileMergerIntegrationTest.java | 21 ++-
 10 files changed, 232 insertions(+), 202 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e23b0de9be..121c0bea1c 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import com.yammer.metrics.core.Meter
-import io.aiven.inkless.control_plane.FindBatchRequest
+import io.aiven.inkless.control_plane.FindBatchResponse
 import kafka.utils.Logging
 
 import java.util.concurrent.TimeUnit
@@ -60,6 +60,7 @@ class DelayedFetch(
   minBytes: Option[Int] = None,
   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
 ) extends DelayedOperation(maxWaitMs.getOrElse(params.maxWaitMs)) with Logging {
+  var maybeBatchCoordinates: Option[Map[TopicIdPartition, FindBatchResponse]] = None
 
   override def toString: String = {
     s"DelayedFetch(params=$params" +
@@ -153,7 +154,15 @@
       }
     }
 
-    tryCompleteDiskless(disklessFetchPartitionStatus) match {
+    // adjust the max bytes for diskless fetches based on the percentage of diskless partitions
+    // Complete the classic fetches first
+    val classicRequestsSize = classicFetchPartitionStatus.size.toFloat
+    val disklessRequestsSize = disklessFetchPartitionStatus.size.toFloat
+    val totalRequestsSize = classicRequestsSize + disklessRequestsSize
+    val disklessPercentage = disklessRequestsSize / totalRequestsSize
+    val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage)
+
+    tryCompleteDiskless(disklessFetchPartitionStatus, disklessParams.maxBytes) match {
       case Some(disklessAccumulatedSize) => accumulatedSize += disklessAccumulatedSize
       case None => forceComplete()
     }
@@ -174,53 +183,55 @@
    * Case D: The fetch offset is equal to the end offset, meaning that we have reached the end of the log
    * Upon completion, should return whatever data is available for each valid partition
    */
-  private def tryCompleteDiskless(fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[Long] = {
+  private def tryCompleteDiskless(
+    fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+    disklessMaxBytes: Int
+  ): Option[Long] = {
     var accumulatedSize = 0L
     val fetchPartitionStatusMap = fetchPartitionStatus.toMap
-    val requests = fetchPartitionStatus.map { case (topicIdPartition, fetchStatus) =>
-      new FindBatchRequest(topicIdPartition, fetchStatus.startOffsetMetadata.messageOffset, fetchStatus.fetchInfo.maxBytes)
-    }
-    if 
(requests.isEmpty) return Some(0) - val response = try { - replicaManager.findDisklessBatches(requests, Int.MaxValue) + maybeBatchCoordinates = try { + Some(replicaManager.findDisklessBatches(fetchPartitionStatus, disklessMaxBytes)) } catch { case e: Throwable => error("Error while trying to find diskless batches on delayed fetch.", e) return None // Case C } - response.get.asScala.foreach { r => - r.errors() match { - case Errors.NONE => - if (r.batches().size() > 0) { - // Gather topic id partition from first batch. Same for all batches in the response. - val topicIdPartition = r.batches().get(0).metadata().topicIdPartition() - val endOffset = r.highWatermark() - - val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition) - if (fetchPartitionStatus.isEmpty) { - warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.") - return None // Case C - } - - val fetchOffset = fetchPartitionStatus.get.startOffsetMetadata - // If the fetch offset is greater than the end offset, it means that the log has been truncated - // If it is equal to the end offset, it means that we have reached the end of the log - // If the fetch offset is less than the end offset, we can accumulate the size of the batches - if (fetchOffset.messageOffset > endOffset) { - // Truncation happened - debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") - return None // Case A - } else if (fetchOffset.messageOffset < endOffset) { - val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset) - accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches - } // Case D: same as fetchOffset == endOffset, no new data available + maybeBatchCoordinates match { + case Some(exists) => + exists.values.foreach { r => + r.errors() match { + case Errors.NONE => + if (r.batches().size() > 0) { + // Gather topic id partition from first batch. Same for all batches in the response. 
+ val topicIdPartition = r.batches().get(0).metadata().topicIdPartition() + val endOffset = r.highWatermark() + + val fetchPartitionStatus = fetchPartitionStatusMap.get(topicIdPartition) + if (fetchPartitionStatus.isEmpty) { + warn(s"Fetch partition status for $topicIdPartition not found in delayed fetch $this.") + return None // Case C + } + + val fetchOffset = fetchPartitionStatus.get.startOffsetMetadata + // If the fetch offset is greater than the end offset, it means that the log has been truncated + // If it is equal to the end offset, it means that we have reached the end of the log + // If the fetch offset is less than the end offset, we can accumulate the size of the batches + if (fetchOffset.messageOffset > endOffset) { + // Truncation happened + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return None // Case A + } else if (fetchOffset.messageOffset < endOffset) { + val bytesAvailable = r.estimatedByteSize(fetchOffset.messageOffset) + accumulatedSize += bytesAvailable // Case B: accumulate the size of the batches + } // Case D: same as fetchOffset == endOffset, no new data available + } + case _ => return None // Case C } - case _ => return None // Case C - } + } + case None => // Case D } - Some(accumulatedSize) } @@ -272,13 +283,16 @@ class DelayedFetch( if (disklessRequestsSize > 0) { // Classic fetches are complete, now handle diskless fetches - // adjust the max bytes for diskless fetches based on the percentage of diskless partitions - val disklessPercentage = disklessRequestsSize / totalRequestsSize - val disklessParams = replicaManager.fetchParamsWithNewMaxBytes(params, disklessPercentage) val disklessFetchInfos = disklessFetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo } - val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(disklessParams, disklessFetchInfos) + val batchCoordinates = maybeBatchCoordinates match { + case Some(batchCoordinates) => batchCoordinates + case None => + responseCallback(Seq.empty) + return + } + val disklessFetchResponseFuture = replicaManager.fetchDisklessMessages(batchCoordinates, disklessFetchInfos) // Combine the classic fetch results with the diskless fetch results disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 92a27aabf2..2bf72a3dcc 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1719,16 +1719,40 @@ class ReplicaManager(val config: KafkaConfig, } } - def findDisklessBatches(requests: Seq[FindBatchRequest], maxBytes: Int): Option[util.List[FindBatchResponse]] = { - inklessSharedState.map { sharedState => - sharedState.controlPlane().findBatches(requests.asJava, maxBytes, sharedState.config().maxBatchesPerPartitionToFind()) + def findDisklessBatches(fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], maxBytes: Int): Map[TopicIdPartition, FindBatchResponse] = { + val requests = fetchPartitionStatus.map { case (topicIdPartition, fetchStatus) => + new FindBatchRequest(topicIdPartition, fetchStatus.startOffsetMetadata.messageOffset, fetchStatus.fetchInfo.maxBytes) + } + if (requests.isEmpty) return Map.empty + + val findBatchResponses = try { + inklessSharedState.map { sharedState => + sharedState.controlPlane().findBatches(requests.asJava, maxBytes, 
sharedState.config().maxBatchesPerPartitionToFind())
+      }
+    } match {
+      case Some(responses) => responses
+      case None =>
+        return Map.empty
+    } catch {
+      case e: Throwable =>
+        // Querying the batch coordinator failed; fall back to empty coordinates.
+        trace("Error while trying to find diskless batches.", e)
+        return Map.empty
     }
+
+    val topicPartitionToFindBatchResponse = collection.mutable.Map[TopicIdPartition, FindBatchResponse]()
+    for (i <- requests.indices) {
+      val request = requests(i)
+      val response = findBatchResponses.get(i)
+      topicPartitionToFindBatchResponse.update(request.topicIdPartition, response)
+    }
+    topicPartitionToFindBatchResponse;
   }
 
-  def fetchDisklessMessages(params: FetchParams,
+  def fetchDisklessMessages(batchCoordinates: Map[TopicIdPartition, FindBatchResponse],
                             fetchInfos: Seq[(TopicIdPartition, PartitionData)]): CompletableFuture[Seq[(TopicIdPartition, FetchPartitionData)]] = {
     inklessFetchHandler match {
-      case Some(handler) => handler.handle(params, fetchInfos.toMap.asJava).thenApply(_.asScala.toSeq)
+      case Some(handler) => handler.handle(batchCoordinates.asJava, fetchInfos.toMap.asJava).thenApply(_.asScala.toSeq)
       case None =>
         if (fetchInfos.nonEmpty)
           error(s"Received diskless fetch request for topics ${fetchInfos.map(_._1.topic()).distinct.mkString(", ")} but diskless fetch handler is not available. " +
@@ -1830,6 +1854,8 @@ class ReplicaManager(val config: KafkaConfig,
       delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, (classicDelayedFetchKeys ++ disklessDelayedFetchKeys).asJava)
     }
 
+    // If there is nothing to fetch for classic topics,
+    // create a delayed response and fetch any diskless data there.
     if (classicFetchInfos.isEmpty) {
       delayedResponse(Seq.empty)
       return
@@ -1894,9 +1920,18 @@ class ReplicaManager(val config: KafkaConfig,
       // In case of remote fetches, synchronously wait for diskless records and then perform the remote fetch.
       // This is currently a workaround to avoid modifying the DelayedRemoteFetch in order to correctly process
       // diskless fetches.
+      // Get diskless batch coordinates and hand them over to fetching
+      val batchCoordinates = try {
+        findDisklessBatches(fetchPartitionStatus, Int.MaxValue)
+      } catch {
+        case e: Throwable =>
+          error("Error while trying to find diskless batches on remote fetch.", e)
+          responseCallback(Seq.empty)
+          return
+      }
+
       val disklessFetchResults = try {
-        val disklessParams = fetchParamsWithNewMaxBytes(params, disklessFetchInfos.size.toFloat / fetchInfos.size.toFloat)
-        val disklessResponsesFuture = fetchDisklessMessages(disklessParams, disklessFetchInfos)
+        val disklessResponsesFuture = fetchDisklessMessages(batchCoordinates, disklessFetchInfos)
         val response = disklessResponsesFuture.get(maxWaitMs, TimeUnit.MILLISECONDS)
         response.map {
           case (tp, data) =>
@@ -1933,8 +1968,11 @@ class ReplicaManager(val config: KafkaConfig,
       }
     } else {
       if (disklessFetchInfos.isEmpty && (bytesReadable >= params.minBytes || params.maxWaitMs <= 0)) {
+        // No remote fetch needed and no diskless topics to fetch.
+        // Respond immediately.
         responseCallback(fetchPartitionData)
       } else {
+        // No remote fetch, but data from the diskless topics still needs to be fetched. 
delayedResponse(fetchPartitionStatus) } } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 23c30663d1..354ce3cc3e 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -16,7 +16,7 @@ */ package kafka.server -import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchRequest, FindBatchResponse} +import io.aiven.inkless.control_plane.{BatchInfo, BatchMetadata, FindBatchResponse} import java.util.{Collections, Optional, OptionalLong} import scala.collection.Seq @@ -213,6 +213,9 @@ class DelayedFetchTest { responseCallback = callback ) + val batchCoordinates = Map.empty[TopicIdPartition, FindBatchResponse] + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) + val partition: Partition = mock(classOf[Partition]) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) // Note that the high-watermark does not contain the complete metadata @@ -345,12 +348,13 @@ class DelayedFetchTest { ))) when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset < fetchOffset (truncation) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], any[Float])).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( @@ -402,6 +406,7 @@ class DelayedFetchTest { fetchResultOpt = Some(responses) } + when(replicaManager.fetchParamsWithNewMaxBytes(any(), any())).thenReturn(fetchParams) val delayedFetch = new DelayedFetch( params = fetchParams, classicFetchPartitionStatus = Seq.empty, @@ -434,8 +439,8 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(fetchOffset) // fetchOffset == endOffset (no new data) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val future = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any[Seq[(TopicIdPartition, FetchPartitionStatus)]], anyInt())).thenReturn(future) when(replicaManager.readFromLog( fetchParams, @@ -451,7 +456,7 @@ class DelayedFetchTest { assertFalse(fetchResultOpt.isDefined) // Verify that estimatedByteSize is never called since fetchOffset == endOffset - verify(replicaManager, never()).fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]) + verify(replicaManager, never()).fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]]) verify(mockResponse, 
never()).estimatedByteSize(anyLong()) } @@ -487,6 +492,7 @@ class DelayedFetchTest { fetchResultOpt = Some(responses) } + when(replicaManager.fetchParamsWithNewMaxBytes(any(), any())).thenReturn(fetchParams) val delayedFetch = new DelayedFetch( params = fetchParams, classicFetchPartitionStatus = Seq.empty, @@ -519,8 +525,8 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset > fetchOffset (data available) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) when(replicaManager.readFromLog( fetchParams, @@ -601,12 +607,12 @@ class DelayedFetchTest { when(mockResponse.highWatermark()).thenReturn(endOffset) // endOffset > fetchOffset (data available) when(mockResponse.estimatedByteSize(fetchOffset)).thenReturn(estimatedBatchSize) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( @@ -685,12 +691,12 @@ class DelayedFetchTest { ))) when(mockResponse.highWatermark()).thenReturn(600L) - val future = Some(Collections.singletonList(mockResponse)) - when(replicaManager.findDisklessBatches(any[Seq[FindBatchRequest]], anyInt())).thenReturn(future) + val batchCoordinates = Map((topicIdPartition, mockResponse)) + when(replicaManager.findDisklessBatches(any(), anyInt())).thenReturn(batchCoordinates) // Mock fetchDisklessMessages for onComplete when(replicaManager.fetchParamsWithNewMaxBytes(any[FetchParams], anyFloat())).thenAnswer(_.getArgument(0)) - when(replicaManager.fetchDisklessMessages(any[FetchParams], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) + when(replicaManager.fetchDisklessMessages(any[Map[TopicIdPartition, FindBatchResponse]], any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]])) .thenReturn(CompletableFuture.completedFuture(Seq((topicIdPartition, mock(classOf[FetchPartitionData]))))) when(replicaManager.readFromLog( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3e6dcc78cc..837b4fd9db 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -7098,9 +7098,10 @@ class ReplicaManagerTest { // and the response does not satisfy minBytes, it should be delayed in the purgatory // until the delayed fetch expires. 
replicaManager.fetchMessages(fetchParams, fetchInfos, QuotaFactory.UNBOUNDED_QUOTA, responseCallback) - assertEquals(0, replicaManager.delayedFetchPurgatory.numDelayed()) + assertEquals(1, replicaManager.delayedFetchPurgatory.numDelayed()) latch.await(10, TimeUnit.SECONDS) // Wait for the delayed fetch to expire + assertEquals(0, replicaManager.delayedFetchPurgatory.numDelayed()) assertNotNull(responseData) assertEquals(2, responseData.size) assertEquals(disklessResponse(disklessTopicPartition), responseData(disklessTopicPartition)) diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java index fb10490dd5..c957a2bf0d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.slf4j.Logger; @@ -37,6 +36,7 @@ import java.util.stream.Collectors; import io.aiven.inkless.common.SharedState; +import io.aiven.inkless.control_plane.FindBatchResponse; public class FetchHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(FetchHandler.class); @@ -50,12 +50,10 @@ public FetchHandler(final SharedState state) { state.objectKeyCreator(), state.keyAlignmentStrategy(), state.cache(), - state.controlPlane(), state.buildStorage(), state.brokerTopicStats(), state.config().fetchMetadataThreadPoolSize(), - state.config().fetchDataThreadPoolSize(), - state.config().maxBatchesPerPartitionToFind() + state.config().fetchDataThreadPoolSize() ) ); } @@ -65,14 +63,14 @@ public FetchHandler(final Reader reader) { } public CompletableFuture> handle( - final FetchParams params, + final Map batchCoordinates, final Map fetchInfos ) { if (fetchInfos.isEmpty()) { return CompletableFuture.completedFuture(Map.of()); } - final CompletableFuture> resultFuture = reader.fetch(params, fetchInfos); + final CompletableFuture> resultFuture = reader.fetch(batchCoordinates, fetchInfos); return resultFuture.handle((result, e) -> { if (result == null) { // We don't really expect this future to fail, but in case it does... 
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index 1b04618951..c12545a240 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -44,7 +43,7 @@ import io.aiven.inkless.common.InklessThreadFactory; import io.aiven.inkless.common.ObjectKeyCreator; import io.aiven.inkless.common.metrics.ThreadPoolMonitor; -import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.control_plane.FindBatchResponse; import io.aiven.inkless.storage_backend.common.ObjectFetcher; public class Reader implements AutoCloseable { @@ -54,9 +53,7 @@ public class Reader implements AutoCloseable { private final ObjectKeyCreator objectKeyCreator; private final KeyAlignmentStrategy keyAlignmentStrategy; private final ObjectCache cache; - private final ControlPlane controlPlane; private final ObjectFetcher objectFetcher; - private final int maxBatchesPerPartitionToFind; private final ExecutorService metadataExecutor; private final ExecutorService dataExecutor; private final InklessFetchMetrics fetchMetrics; @@ -65,25 +62,21 @@ public class Reader implements AutoCloseable { private ThreadPoolMonitor dataThreadPoolMonitor; public Reader( - Time time, - ObjectKeyCreator objectKeyCreator, - KeyAlignmentStrategy keyAlignmentStrategy, - ObjectCache cache, - ControlPlane controlPlane, - ObjectFetcher objectFetcher, - BrokerTopicStats brokerTopicStats, - int fetchMetadataThreadPoolSize, - int fetchDataThreadPoolSize, - int maxBatchesPerPartitionToFind + final Time time, + final ObjectKeyCreator objectKeyCreator, + final KeyAlignmentStrategy keyAlignmentStrategy, + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final BrokerTopicStats brokerTopicStats, + final int fetchMetadataThreadPoolSize, + final int fetchDataThreadPoolSize ) { this( time, objectKeyCreator, keyAlignmentStrategy, cache, - controlPlane, objectFetcher, - maxBatchesPerPartitionToFind, Executors.newFixedThreadPool(fetchMetadataThreadPoolSize, new InklessThreadFactory("inkless-fetch-metadata-", false)), Executors.newFixedThreadPool(fetchDataThreadPoolSize, new InklessThreadFactory("inkless-fetch-data-", false)), brokerTopicStats @@ -91,24 +84,20 @@ public Reader( } public Reader( - Time time, - ObjectKeyCreator objectKeyCreator, - KeyAlignmentStrategy keyAlignmentStrategy, - ObjectCache cache, - ControlPlane controlPlane, - ObjectFetcher objectFetcher, - int maxBatchesPerPartitionToFind, - ExecutorService metadataExecutor, - ExecutorService dataExecutor, - BrokerTopicStats brokerTopicStats + final Time time, + final ObjectKeyCreator objectKeyCreator, + final KeyAlignmentStrategy keyAlignmentStrategy, + final ObjectCache cache, + final ObjectFetcher objectFetcher, + final ExecutorService metadataExecutor, + final ExecutorService dataExecutor, + final BrokerTopicStats brokerTopicStats ) { this.time = time; this.objectKeyCreator = objectKeyCreator; this.keyAlignmentStrategy = keyAlignmentStrategy; this.cache = cache; - this.controlPlane = controlPlane; this.objectFetcher = objectFetcher; - 
this.maxBatchesPerPartitionToFind = maxBatchesPerPartitionToFind; this.metadataExecutor = metadataExecutor; this.dataExecutor = dataExecutor; this.fetchMetrics = new InklessFetchMetrics(time, cache); @@ -123,84 +112,71 @@ public Reader( } public CompletableFuture> fetch( - final FetchParams params, - final Map fetchInfos + final Map batchCoordinates, + final Map fetchInfos ) { final Instant startAt = TimeUtils.durationMeasurementNow(time); fetchMetrics.fetchStarted(fetchInfos.size()); - final var batchCoordinates = CompletableFuture.supplyAsync( - new FindBatchesJob( + return CompletableFuture.supplyAsync( + new FetchPlanner( time, - controlPlane, - params, - fetchInfos, - maxBatchesPerPartitionToFind, - fetchMetrics::findBatchesFinished + objectKeyCreator, + keyAlignmentStrategy, + cache, + objectFetcher, + dataExecutor, + batchCoordinates, + fetchMetrics ), metadataExecutor - ); - return batchCoordinates.thenApply( - coordinates -> - new FetchPlanner( - time, - objectKeyCreator, - keyAlignmentStrategy, - cache, - objectFetcher, - dataExecutor, - coordinates, - fetchMetrics - ).get() - ) - .thenCombineAsync(batchCoordinates, (fileExtents, coordinates) -> - new FetchCompleter( + ).thenApply(fileExtents -> { + return new FetchCompleter( time, objectKeyCreator, fetchInfos, - coordinates, + batchCoordinates, fileExtents, fetchMetrics::fetchCompletionFinished - ).get() - ) - .whenComplete((topicIdPartitionFetchPartitionDataMap, throwable) -> { - // Mark broker side fetch metrics - if (throwable != null) { - LOGGER.warn("Fetch failed", throwable); - for (final var entry : fetchInfos.entrySet()) { - final String topic = entry.getKey().topic(); - brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); - } - // Check if the exception was caused by a fetch related exception and increment the relevant metric - if (throwable instanceof CompletionException) { - // Finding batches fails on the initial stage - if (throwable.getCause() instanceof FindBatchesException) { - fetchMetrics.findBatchesFailed(); - } else if (throwable.getCause() instanceof FetchException) { - // but storage-related exceptions are wrapped twice as they happen within the fetch completer - final Throwable fetchException = throwable.getCause(); - if (fetchException.getCause() instanceof FileFetchException) { - fetchMetrics.fileFetchFailed(); - } else if (fetchException.getCause() instanceof CacheFetchException) { - fetchMetrics.cacheFetchFailed(); - } + ).get(); + }).whenComplete((topicIdPartitionFetchPartitionDataMap, throwable) -> { + // Mark broker side fetch metrics + if (throwable != null) { + LOGGER.warn("Fetch failed", throwable); + for (final var entry : fetchInfos.entrySet()) { + final String topic = entry.getKey().topic(); + brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); + } + // Check if the exception was caused by a fetch related exception and increment the relevant metric + if (throwable instanceof CompletionException) { + // Finding batches fails on the initial stage + if (throwable.getCause() instanceof FindBatchesException) { + fetchMetrics.findBatchesFailed(); + } else if (throwable.getCause() instanceof FetchException) { + // but storage-related exceptions are wrapped twice as they happen within the fetch completer + final Throwable fetchException = throwable.getCause(); + if (fetchException.getCause() instanceof FileFetchException) { + 
fetchMetrics.fileFetchFailed(); + } else if (fetchException.getCause() instanceof CacheFetchException) { + fetchMetrics.cacheFetchFailed(); } } - fetchMetrics.fetchFailed(); - } else { - for (final var entry : topicIdPartitionFetchPartitionDataMap.entrySet()) { - final String topic = entry.getKey().topic(); - if (entry.getValue().error == Errors.NONE) { - brokerTopicStats.allTopicsStats().totalFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).totalFetchRequestRate().mark(); - } else { - brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); - brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); - } + } + fetchMetrics.fetchFailed(); + } else { + for (final var entry : topicIdPartitionFetchPartitionDataMap.entrySet()) { + final String topic = entry.getKey().topic(); + if (entry.getValue().error == Errors.NONE) { + brokerTopicStats.allTopicsStats().totalFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).totalFetchRequestRate().mark(); + } else { + brokerTopicStats.allTopicsStats().failedFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).failedFetchRequestRate().mark(); } - fetchMetrics.fetchCompleted(startAt); } - }); + fetchMetrics.fetchCompleted(startAt); + } + }); } @Override diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java index e42b945474..29c864ce35 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/FetchHandlerTest.java @@ -25,8 +25,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.junit.jupiter.api.Test; @@ -36,6 +34,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; @@ -63,16 +62,12 @@ public class FetchHandlerTest { public void readerFutureFailed() throws Exception { when(reader.fetch(any(), any())).thenReturn(CompletableFuture.failedFuture(new RuntimeException())); try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -102,17 +97,12 @@ public void readerFutureSuccess() throws Exception { ); when(reader.fetch(any(), any())).thenReturn(CompletableFuture.completedFuture(value)); try (FetchHandler handler = new FetchHandler(reader)) { - - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = 
handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -141,16 +131,12 @@ public void readerFutureSuccessEmpty() throws Exception { when(reader.fetch(any(), any())).thenReturn(CompletableFuture.completedFuture(value)); try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of( topicIdPartition, new FetchRequest.PartitionData(inklessUuid, 0, 0, 1024, Optional.empty()) ); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(1); assertThat(result.get(topicIdPartition)).satisfies(data -> { @@ -164,13 +150,9 @@ public void readerFutureSuccessEmpty() throws Exception { @Test public void emptyRequest() throws Exception { try (FetchHandler handler = new FetchHandler(reader)) { - final FetchParams params = new FetchParams(fetchVersion, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = Map.of(); - final var result = handler.handle(params, fetchInfos).get(); + final var result = handler.handle(Collections.emptyMap(), fetchInfos).get(); assertThat(result).hasSize(0); } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java index 3f6345ea3c..109bb89ccf 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java @@ -77,8 +77,8 @@ public class ReaderTest { @Test public void testReaderEmptyRequests() throws IOException { - try(final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats())) { - final CompletableFuture> fetch = reader.fetch(fetchParams, Collections.emptyMap()); + try(final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, objectFetcher, metadataExecutor, dataExecutor, new BrokerTopicStats())) { + final CompletableFuture> fetch = reader.fetch(Collections.emptyMap(), Collections.emptyMap()); verify(metadataExecutor, atLeastOnce()).execute(any()); verifyNoInteractions(dataExecutor); assertThat(fetch.join()).isEqualTo(Collections.emptyMap()); @@ -87,7 +87,7 @@ public void testReaderEmptyRequests() throws IOException { @Test public void testClose() throws Exception { - final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, controlPlane, objectFetcher, 0, metadataExecutor, dataExecutor, new BrokerTopicStats()); + final var reader = new Reader(time, OBJECT_KEY_CREATOR, KEY_ALIGNMENT_STRATEGY, OBJECT_CACHE, objectFetcher, metadataExecutor, dataExecutor, new BrokerTopicStats()); reader.close(); verify(metadataExecutor, atLeastOnce()).shutdown(); verify(dataExecutor, atLeastOnce()).shutdown(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java index 31f583593a..6ed8a1d00b 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java @@ -28,8 +28,6 @@ 
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -297,15 +295,25 @@ private Map> read(final FetchHandler fetchHa private void readIteration(final FetchHandler fetchHandler, final ConcurrentHashMap fetchPositions, final ConcurrentMap> records) throws InterruptedException, ExecutionException, TimeoutException { - final FetchParams params = new FetchParams(FETCH_VERSION, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = ALL_TOPIC_ID_PARTITIONS.stream().collect(Collectors.toMap( tidp -> tidp, tidp -> new FetchRequest.PartitionData(TOPIC_ID_0, fetchPositions.get(tidp), 0, 1024 * 1024, Optional.empty()) )); - final Map fetchResult = fetchHandler.handle(params, fetchInfos).get(2L, TimeUnit.SECONDS); + + final List requests = ALL_TOPIC_ID_PARTITIONS.stream().map(tidp -> { + return new FindBatchRequest(tidp, fetchPositions.get(tidp), 1024 * 1024); + }).toList(); + + final List findBatchResponses = sharedState.controlPlane().findBatches(requests, 1024 * 1024, sharedState.config().maxBatchesPerPartitionToFind()); + + final Map batchCoordinates = new HashMap<>(); + for (int i = 0; i < requests.size(); i++) { + final FindBatchRequest request = requests.get(i); + final FindBatchResponse response = findBatchResponses.get(i); + batchCoordinates.put(request.topicIdPartition(), response); + } + + final Map fetchResult = fetchHandler.handle(batchCoordinates, fetchInfos).get(2L, TimeUnit.SECONDS); for (final var entry : fetchResult.entrySet()) { final var tidp = entry.getKey(); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java index bafd7b95ec..eea6d60e3f 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java @@ -27,8 +27,6 @@ import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -316,16 +314,25 @@ private Map> read(final FetchHandler fetchHa private void readIteration(final FetchHandler fetchHandler, final ConcurrentHashMap fetchPositions, final ConcurrentMap> records) throws InterruptedException, ExecutionException, TimeoutException { - final FetchParams params = new FetchParams(FETCH_VERSION, - -1, -1, -1, -1, - FetchIsolation.LOG_END, Optional.empty()); - final Map fetchInfos = ALL_TOPIC_ID_PARTITIONS.stream().collect(Collectors.toMap( tidp -> tidp, tidp -> new FetchRequest.PartitionData(TOPIC_ID_0, fetchPositions.get(tidp), 0, 1024 * 1024, Optional.empty()) )); - final Map fetchResult = fetchHandler.handle(params, fetchInfos).get(2L, TimeUnit.SECONDS); + final List requests = ALL_TOPIC_ID_PARTITIONS.stream().map(tidp -> { + 
return new FindBatchRequest(tidp, fetchPositions.get(tidp), 1024 * 1024); + }).toList(); + + final List findBatchResponses = sharedState.controlPlane().findBatches(requests, 1024 * 1024, sharedState.config().maxBatchesPerPartitionToFind()); + + final Map batchCoordinates = new HashMap<>(); + for (int i = 0; i < requests.size(); i++) { + final FindBatchRequest request = requests.get(i); + final FindBatchResponse response = findBatchResponses.get(i); + batchCoordinates.put(request.topicIdPartition(), response); + } + + final Map fetchResult = fetchHandler.handle(batchCoordinates, fetchInfos).get(2L, TimeUnit.SECONDS); for (final var entry : fetchResult.entrySet()) { final var tidp = entry.getKey(); boolean isEmpty = true; From d145d6961c1d8ce91474de99f66f539656cef16b Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Mon, 10 Nov 2025 10:56:38 +0200 Subject: [PATCH 2/2] storage: remove unused FindBatchesJob and related classes --- .../inkless/consume/FindBatchesException.java | 24 ----- .../aiven/inkless/consume/FindBatchesJob.java | 86 ----------------- .../java/io/aiven/inkless/consume/Reader.java | 5 +- .../inkless/consume/FindBatchesJobTest.java | 92 ------------------- 4 files changed, 1 insertion(+), 206 deletions(-) delete mode 100644 storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java delete mode 100644 storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java delete mode 100644 storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java deleted file mode 100644 index 3a74c338af..0000000000 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesException.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -public class FindBatchesException extends RuntimeException { - public FindBatchesException(Throwable cause) { - super(cause); - } -} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java deleted file mode 100644 index 5791d0f6a4..0000000000 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/FindBatchesJob.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import io.aiven.inkless.TimeUtils; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.control_plane.FindBatchRequest; -import io.aiven.inkless.control_plane.FindBatchResponse; - -public class FindBatchesJob implements Supplier> { - - private final Time time; - private final ControlPlane controlPlane; - private final FetchParams params; - private final Map fetchInfos; - private final int maxBatchesPerPartition; - private final Consumer durationCallback; - - public FindBatchesJob(Time time, - ControlPlane controlPlane, - FetchParams params, - Map fetchInfos, - int maxBatchesPerPartition, - Consumer durationCallback) { - this.time = time; - this.controlPlane = controlPlane; - this.params = params; - this.fetchInfos = fetchInfos; - this.maxBatchesPerPartition = maxBatchesPerPartition; - this.durationCallback = durationCallback; - } - - @Override - public Map get() { - return TimeUtils.measureDurationMsSupplier(time, this::doWork, durationCallback); - } - - private Map doWork() { - try { - List requests = new ArrayList<>(); - for (Map.Entry fetchInfo : fetchInfos.entrySet()) { - TopicIdPartition topicIdPartition = fetchInfo.getKey(); - requests.add(new FindBatchRequest(topicIdPartition, fetchInfo.getValue().fetchOffset, fetchInfo.getValue().maxBytes)); - } - - List responses = controlPlane.findBatches(requests, params.maxBytes, maxBatchesPerPartition); - - Map out = new HashMap<>(); - for (int i = 0; i < requests.size(); i++) { - FindBatchRequest request = requests.get(i); - FindBatchResponse response = responses.get(i); - out.put(request.topicIdPartition(), response); - } - return out; - } catch (Exception e) { - throw new FindBatchesException(e); - } - } -} diff --git a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java index c12545a240..2ae23ca58d 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java @@ -149,10 +149,7 @@ public CompletableFuture> fetch( } // Check if the exception was caused by a fetch related exception and increment the relevant metric if (throwable instanceof CompletionException) { - // Finding batches fails on the initial stage - if (throwable.getCause() instanceof FindBatchesException) { - fetchMetrics.findBatchesFailed(); - } else if (throwable.getCause() instanceof FetchException) { + if (throwable.getCause() instanceof FetchException) { // but storage-related exceptions are wrapped twice as they happen within the fetch completer final Throwable fetchException = throwable.getCause(); if (fetchException.getCause() instanceof FileFetchException) { diff --git 
a/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java b/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java deleted file mode 100644 index 6707c0d104..0000000000 --- a/storage/inkless/src/test/java/io/aiven/inkless/consume/FindBatchesJobTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Inkless - * Copyright (C) 2024 - 2025 Aiven OY - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.aiven.inkless.consume; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.storage.log.FetchParams; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import io.aiven.inkless.control_plane.BatchInfo; -import io.aiven.inkless.control_plane.BatchMetadata; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.control_plane.FindBatchRequest; -import io.aiven.inkless.control_plane.FindBatchResponse; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) -public class FindBatchesJobTest { - - private final Time time = new MockTime(); - private final int maxBatchesPerPartition = 0; - - @Mock - private ControlPlane controlPlane; - @Mock - private FetchParams params; - - @Captor - ArgumentCaptor> requestCaptor; - - Uuid topicId = Uuid.randomUuid(); - static final String OBJECT_KEY_MAIN_PART = "a"; - TopicIdPartition partition0 = new TopicIdPartition(topicId, 0, "diskless-topic"); - - @Test - public void findSingleBatch() { - Map fetchInfos = Map.of( - partition0, new FetchRequest.PartitionData(topicId, 0, 0, 1000, Optional.empty()) - ); - int logStartOffset = 0; - long logAppendTimestamp = 10L; - long maxBatchTimestamp = 20L; - int highWatermark = 1; - Map coordinates = Map.of( - partition0, FindBatchResponse.success(List.of( - new BatchInfo(1L, OBJECT_KEY_MAIN_PART, BatchMetadata.of(partition0, 0, 10, 0, 0, logAppendTimestamp, maxBatchTimestamp, TimestampType.CREATE_TIME)) - ), logStartOffset, highWatermark) - ); - FindBatchesJob job = new FindBatchesJob(time, controlPlane, params, fetchInfos, maxBatchesPerPartition, durationMs -> {}); - when(controlPlane.findBatches(requestCaptor.capture(), anyInt(), 
anyInt())).thenReturn(new ArrayList<>(coordinates.values())); - Map result = job.get(); - - assertThat(result).isEqualTo(coordinates); - } - -}
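
Illustrative sketch (not part of either patch): with this series applied, the diskless fetch path resolves batch coordinates once and reuses them at completion instead of querying the batch coordinator twice. The sketch below shows the intended call sequence against the new ReplicaManager API; `rm`, `disklessFetchPartitionStatus`, `disklessFetchInfos`, `params` and `responseCallback` are placeholders for the values the fetch path already has in scope.

    // Resolve the batch coordinates a single time (done in DelayedFetch.tryCompleteDiskless).
    val batchCoordinates: Map[TopicIdPartition, FindBatchResponse] =
      rm.findDisklessBatches(disklessFetchPartitionStatus, params.maxBytes)

    // Reuse the same coordinates when the delayed fetch completes (DelayedFetch.onComplete),
    // so no second findBatches query is sent to the batch coordinator.
    val disklessFetchResponseFuture =
      rm.fetchDisklessMessages(batchCoordinates, disklessFetchInfos)

    disklessFetchResponseFuture.whenComplete { case (disklessFetchPartitionData, _) =>
      responseCallback(disklessFetchPartitionData)
    }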