
Unexpected behavior of onFinalize #3607

@wookievx

This issue concerns onFinalize (which would probably translate to the bracket operation) not behaving as expected. While working on the fs2-kafka library, my colleagues and I discovered that partitionMapStream calls onFinalize too early, which causes incorrect rebalance behavior. I am raising the issue here because we pinpointed it directly to the onFinalize invocation. The reproduction is a test on this branch: wookievx/fs2-kafka#2. The problem might not occur on every run, but it reproduces fairly consistently. To run it, call:

sbt testOnly fs2.kafka.KafkaConsumerSpec -- -z "Graceful"

Here is a fragment of the logs, distilled to the relevant points. It shows that onFinalize violates the behavior I would expect from it: upon stream interruption, all evalMaps already executing on the stream are gracefully cancelled (so uncancelable blocks are allowed to finish) before the onFinalize callback is called.

Consumer1 started processing key-18
2025-09-09 12:57:44,695 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Request joining group due to: group is already rebalancing
2025-09-09 12:57:44,695 | INFO | o.a.k.c.c.i.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Revoke previously assigned partitions topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0 with id e8c0050b-8a6b-4e61-9f86-f7d70e8054a3
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1 with id c50d9d83-3ef3-46de-a643-778b483cf8e5
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2 with id 2d509097-9a34-4f08-ad25-3eb523e9adac
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3 with id 18165c64-476f-4fd7-bd8a-f7dc38e4d1e2
Executed call-back for partitions: TreeSet(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3)
2025-09-09 12:57:44,704 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] (Re-)joining group
2025-09-09 12:57:44,708 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-rebalance-test-group-1-e57fcf56-7871-42c5-9d32-3af14e18bfe7', protocol='range'}
2025-09-09 12:57:44,709 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-rebalance-test-group-2-8ab93b89-a2d7-4359-8294-e1cd6fd404dc', protocol='range'}
2025-09-09 12:57:44,709 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Finished assignment for group at generation 2: {consumer-rebalance-test-group-1-e57fcf56-7871-42c5-9d32-3af14e18bfe7=Assignment(partitions=[topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1]), consumer-rebalance-test-group-2-8ab93b89-a2d7-4359-8294-e1cd6fd404dc=Assignment(partitions=[topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3])}
2025-09-09 12:57:44,712 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-rebalance-test-group-1-e57fcf56-7871-42c5-9d32-3af14e18bfe7', protocol='range'}
2025-09-09 12:57:44,712 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Notifying assignor about the new Assignment(partitions=[topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1])
2025-09-09 12:57:44,712 | INFO | o.a.k.c.c.i.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Adding newly assigned partitions: topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1
Got signals: GracefulSignals(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0,2f07c754-e35a-43c0-8317-8d5940bd4ab0,cats.effect.IODeferred@33b2b37b)
Got signals: GracefulSignals(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1,e2a1d774-36aa-43c9-8af6-880f1c4e8c4c,cats.effect.IODeferred@11991a70)
2025-09-09 12:57:44,713 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-rebalance-test-group-2-8ab93b89-a2d7-4359-8294-e1cd6fd404dc', protocol='range'}
2025-09-09 12:57:44,713 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Notifying assignor about the new Assignment(partitions=[topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3])
2025-09-09 12:57:44,713 | INFO | o.a.k.c.c.i.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Adding newly assigned partitions: topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3
Got signals: GracefulSignals(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2,0b61a423-ebb6-43e5-a6cf-2c99d71c7c5f,cats.effect.IODeferred@19505d8e)
Got signals: GracefulSignals(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3,c64b22f1-0c59-48ff-8d5e-1ec3d6772d29,cats.effect.IODeferred@52b49e34)
2025-09-09 12:57:44,715 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Found no committed offset for partition topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0
2025-09-09 12:57:44,715 | INFO | o.a.k.c.c.internals.ConsumerUtils - Setting offset for partition topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58661 (id: 1 rack: null)], epoch=0}}
2025-09-09 12:57:44,716 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Found no committed offset for partition topic-f508a0db-13b1-4e09-86fd-242539ee67a2-2
2025-09-09 12:57:44,716 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-2, groupId=rebalance-test-group] Found no committed offset for partition topic-f508a0db-13b1-4e09-86fd-242539ee67a2-3
2025-09-09 12:57:44,719 | INFO | o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Resetting offset for partition topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:58661 (id: 1 rack: null)], epoch=0}}.
Consumer2 started processing key-18
[!!!]Consumer2 finished processing key-18
2025-09-09 12:57:44,736 | INFO | o.a.k.c.c.i.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Revoke previously assigned partitions topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0 with id 2f07c754-e35a-43c0-8317-8d5940bd4ab0
Finished stream for topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1 with id e2a1d774-36aa-43c9-8af6-880f1c4e8c4c
Executed call-back for partitions: TreeSet(topic-f508a0db-13b1-4e09-86fd-242539ee67a2-0, topic-f508a0db-13b1-4e09-86fd-242539ee67a2-1)
2025-09-09 12:57:44,737 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Member consumer-rebalance-test-group-1-e57fcf56-7871-42c5-9d32-3af14e18bfe7 sending LeaveGroup request to coordinator localhost:58661 (id: 2147483646 rack: null) due to the consumer is being closed
2025-09-09 12:57:44,737 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Resetting generation and member id due to: consumer pro-actively leaving the group
2025-09-09 12:57:44,737 | INFO | o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-rebalance-test-group-1, groupId=rebalance-test-group] Request joining group due to: consumer pro-actively leaving the group
Consumer1 finished processing key-18

As you can see, even though the uncancelable block in the evalMap operation on an already-pulled element (Consumer1 working on key-18) has not finished processing, onFinalize is called ("Finished stream for ...") and processing continues: Consumer2 starts and finishes key-18 before Consumer1 has finished it. We have some ideas for possible workarounds in fs2-kafka, for example sending a poison pill via a queue instead of interrupting the stream. However, this is a big behavior change that we would like to avoid.
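For comparison, the poison-pill alternative can be sketched in plain fs2 without Kafka. This is a hypothetical illustration, not fs2-kafka code: the name drainWithPoisonPill and the Scala/fs2 versions in the scala-cli directives are assumptions. The producer ends the stream by enqueuing None instead of interrupting it, so every element that has already been dequeued is processed before the stream terminates and its onFinalize callback runs.

```scala
//> using scala 3.3.3
//> using dep co.fs2::fs2-core:3.11.0

import cats.effect.{IO, Ref}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import fs2.Stream

// Hypothetical sketch of the "poison pill" approach (not fs2-kafka code):
// the stream ends when it dequeues None, rather than being interrupted.
def drainWithPoisonPill(records: List[Int]): IO[List[String]] =
  for {
    events <- Ref.of[IO, Vector[String]](Vector.empty)
    queue  <- Queue.unbounded[IO, Option[Int]]
    // Producer side: enqueue the regular records, then the poison pill (None).
    _      <- records.traverse_(r => queue.offer(Some(r)))
    _      <- queue.offer(None)
    // Consumer side: each dequeued record is fully processed by evalMap;
    // the stream only completes (and runs onFinalize) after reading None.
    _      <- Stream
                .fromQueueNoneTerminated(queue)
                .evalMap(r => events.update(_ :+ s"processed $r"))
                .onFinalize(events.update(_ :+ "finalized"))
                .compile
                .drain
    result <- events.get
  } yield result.toList

val observed: List[String] = drainWithPoisonPill(List(1, 2, 3)).unsafeRunSync()
println(observed) // List(processed 1, processed 2, processed 3, finalized)
```

Here the queue is filled up front to keep the output deterministic; in practice the producer (for example, a revocation handler) would offer None concurrently while the consumer is running, which is where the behavior change mentioned above comes from.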

If, on the other hand, this is the expected behavior of onFinalize/bracket, it is rather unintuitive and impractical in real-world scenarios: why would I want cleanup guarantees tied to the last element being pulled, rather than to that element actually being processed?
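The expected ordering can also be checked in isolation, without Kafka. The following is a minimal sketch assuming fs2 3.x and cats-effect 3; the name runScenario and the versions in the scala-cli directives are illustrative assumptions. A single element is processed inside IO.uncancelable, and the stream is interrupted as soon as that processing has started. Both events are recorded in a Ref, and the recorded order is printed. Under the behavior described above, "finished processing 1" would be expected before "onFinalize called". Because this sketch has none of fs2-kafka's concurrency, it only checks the basic interrupt/finalizer ordering of a single stream.

```scala
//> using scala 3.3.3
//> using dep co.fs2::fs2-core:3.11.0

import cats.effect.{Deferred, IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration.*

// Minimal Kafka-free scenario (hypothetical, not taken from fs2-kafka):
// records the order of in-flight uncancelable work vs. the onFinalize callback.
def runScenario: IO[List[String]] =
  for {
    events  <- Ref.of[IO, Vector[String]](Vector.empty)
    started <- Deferred[IO, Unit]
    log      = (msg: String) => events.update(_ :+ msg)
    // Uncancelable "processing" of one element, like the evalMap in the issue.
    process  = (i: Int) =>
                 IO.uncancelable { _ =>
                   started.complete(()) >> IO.sleep(100.millis) >> log(s"finished processing $i")
                 }
    _       <- Stream
                 .emit(1)
                 .covary[IO]
                 .evalMap(process)
                 .onFinalize(log("onFinalize called"))
                 // Interrupt only once processing has begun, so the setup is deterministic.
                 .interruptWhen(started.get.attempt)
                 .compile
                 .drain
    // Wait long enough that the in-flight work has finished in either ordering.
    _       <- IO.sleep(500.millis)
    result  <- events.get
  } yield result.toList

val observed: List[String] = runScenario.unsafeRunSync()
println(observed)
```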
