File tree Expand file tree Collapse file tree
main/scala/fs2/concurrent
test/scala/fs2/concurrent Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -164,9 +164,22 @@ object Topic {
164164 signalClosure.tryGet.flatMap {
165165 case Some (_) => Topic .closed.pure[F ]
166166 case None =>
167- state.get
168- .flatMap { case (subs, _) => foreach(subs)(_.send(a).void) }
169- .as(Topic .rightUnit)
167+ state.get.flatMap { case (subs, _) =>
168+ subs.foldLeft(F .pure(Topic .rightUnit)) { case (acc, (_, chan)) =>
169+ acc.flatMap {
170+ case Left (Topic .Closed ) => Topic .closed.pure[F ]
171+ case Right (_) =>
172+ chan.send(a).flatMap {
173+ case Right (_) => Topic .rightUnit.pure[F ]
174+ case Left (_) =>
175+ signalClosure.tryGet.map {
176+ case Some (_) => Topic .closed
177+ case None => Topic .rightUnit
178+ }
179+ }
180+ }
181+ }
182+ }
170183 }
171184
172185 def subscribeAwait (maxQueued : Int ): Resource [F , Stream [F , A ]] =
Original file line number Diff line number Diff line change @@ -188,7 +188,7 @@ class TopicSuite extends Fs2Suite {
188188
189189 // https://github.com/typelevel/fs2/issues/3644
190190 test(
191- " when publish1 returns success, subscribers must receive the event, even if the publish1 races with close" .fail
191+ " when publish1 returns success, subscribers must receive the event, even if the publish1 races with close"
192192 ) {
193193 val check : IO [Unit ] =
194194 Topic [IO , String ]
You can’t perform that action at this time.
0 commit comments