-
-
Notifications
You must be signed in to change notification settings - Fork 622
Open
Labels
Description
Using hold1 appears to break concurrently and/or bracket.
Here's a minimized version of some code we were using to keep a token fresh:
import fs2.Stream
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration.DurationDouble
import cats.effect.unsafe.implicits.global
(Ref[IO].of(none[Boolean]), Ref[IO].of(0))
.flatMapN { (flag, count) =>
val acquire = println("acquire") *> flag.set(true.some)
val release: Unit => IO[Unit] = _ => println("release") *> flag.set(none)
val increment = count.updateAndGet(_ + 1).flatTap(c => println(s"increment: $c"))
Stream
.bracket(acquire)(release)
.flatMap { _ =>
Stream.emit('e').concurrently {
Stream.repeatEval(increment).metered(0.1.seconds)
}
}
.evalTap(v => flag.get.flatMap(f => println(s"emitted: $v $f")))
.hold1
.evalTap { valueIO =>
(valueIO.get, flag.get)
.flatMapN((v, f) => println(s"check: $v $f"))
.delayBy(.3.seconds)
.replicateA(2) *> println("Done")
}
.compile
.drain
}
.unsafeRunTimed(5.seconds)Output:
acquire
emitted: e Some(true)
release
check: e None
check: e None
Done
Notably, increment never runs, release is called immediately, and stream is only consumed after.
The behavior I expected would have been:
acquire
emitted: e Some(true)
increment: 1
increment: 2
increment: 3
check: e Some(true)
increment: 4
increment: 5
increment: 6
check: e Some(true)
Done
release
I don't know enough about the internals to figure out why this is happening, though it seems like it may be related to #3123, and possibly #2936.