hold1 + concurrently releases bracket immediately #3567

@morgen-peschke

Using hold1 appears to break concurrently and/or bracket.

Here's a minimized version of some code we were using to keep a token fresh:

import fs2.Stream
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration.DurationDouble
import cats.effect.unsafe.implicits.global
// The snippet chains println with *>, so it needs the IO-returning println rather than Predef's
import cats.effect.IO.println

(Ref[IO].of(none[Boolean]), Ref[IO].of(0))
  .flatMapN { (flag, count) =>
    val acquire = println("acquire") *> flag.set(true.some)
    val release: Unit => IO[Unit] = _ => println("release") *> flag.set(none)
    val increment = count.updateAndGet(_ + 1).flatTap(c => println(s"increment: $c"))
    
    Stream
      .bracket(acquire)(release)
      .flatMap { _ =>
        Stream.emit('e').concurrently {
          Stream.repeatEval(increment).metered(0.1.seconds)
        }
      }
      .evalTap(v => flag.get.flatMap(f => println(s"emitted: $v $f")))
      .hold1
      .evalTap { valueIO =>
        (valueIO.get, flag.get)
          .flatMapN((v, f) => println(s"check: $v $f"))
          .delayBy(.3.seconds)
          .replicateA(2) *> println("Done")
      }
      .compile
      .drain
  }
  .unsafeRunTimed(5.seconds)

Output:

acquire
emitted: e Some(true)
release
check: e None
check: e None
Done

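Notably, increment never runs, release is called immediately after the first element is emitted, and the stream produced by hold1 is only consumed after release has already run (both checks see the flag as None).

I expected the following output instead: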

acquire
emitted: e Some(true)
increment: 1
increment: 2
increment: 3
check: e Some(true)
increment: 4
increment: 5
increment: 6
check: e Some(true)
Done
release

I don't know enough about the internals to figure out why this is happening, though it seems like it may be related to #3123, and possibly #2936.
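
In case it helps with triage, here is a sketch of the same reproduction that records events into a Ref instead of printing them, so the ordering of acquire, release, and the checks can be inspected or asserted on programmatically. The names Hold1Repro, events, record, and log are mine and purely illustrative. It assumes fs2 3.x with cats-effect 3, and it makes no claim about which ordering is correct.

```scala
import cats.effect._
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.duration._

object Hold1Repro extends IOApp.Simple {

  // Runs the reproduction and returns the events in the order they were recorded.
  def events: IO[Vector[String]] =
    (Ref[IO].of(none[Boolean]), Ref[IO].of(0), Ref[IO].of(Vector.empty[String]))
      .flatMapN { (flag, count, log) =>
        // Hypothetical helper: append a message to the in-memory log.
        def record(msg: String): IO[Unit] = log.update(_ :+ msg)

        val acquire = record("acquire") *> flag.set(true.some)
        val release: Unit => IO[Unit] = _ => record("release") *> flag.set(none)
        val increment =
          count.updateAndGet(_ + 1).flatTap(c => record(s"increment: $c"))

        Stream
          .bracket(acquire)(release)
          .flatMap { _ =>
            Stream.emit('e').concurrently {
              Stream.repeatEval(increment).metered(0.1.seconds)
            }
          }
          .evalTap(v => flag.get.flatMap(f => record(s"emitted: $v $f")))
          .hold1
          .evalTap { signal =>
            (signal.get, flag.get)
              .flatMapN((v, f) => record(s"check: $v $f"))
              .delayBy(0.3.seconds)
              .replicateA(2) *> record("Done")
          }
          .compile
          .drain *> log.get // read the log only after all finalizers have run
      }

  // Print the recorded events one per line.
  def run: IO[Unit] =
    events.flatMap(_.traverse_(line => IO.println(line)))
}
```

With this version, the position of "release" relative to the "check: ..." entries in the returned Vector shows directly whether the resource was still held while the signal was being read.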
