Skip to content

Commit 6a50b41

Browse files
authored
Merge pull request #416 from fsprojects/repo-assist/feat-channel-integration-20260430-c520bd85af285b66
[Repo Assist] feat: add TaskSeq.toChannelAsync and TaskSeq.ofChannel for System.Threading.Channels integration
2 parents 2f3c114 + c823516 commit 6a50b41

5 files changed

Lines changed: 128 additions & 0 deletions

File tree

release-notes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Release notes:
33

44
Unreleased
5+
- feat: add `TaskSeq.toChannelAsync` and `TaskSeq.ofChannel` for bidirectional `System.Threading.Channels` integration, closing #415
56
- eng: update PackageValidationBaselineVersion from 0.4.0 to 1.1.1 to enforce binary compatibility checks against the current stable release
67
- test: add SideEffects module and ImmTaskSeq variant tests to TaskSeq.ChunkBy.Tests.fs, improving coverage for chunkBy and chunkByAsync
78
- fixes: `Async.bind` signature corrected from `(Async<'T> -> Async<'U>)` to `('T -> Async<'U>)` to match standard monadic bind semantics (same as `Task.bind`); the previous signature made the function effectively equivalent to direct application

src/FSharp.Control.TaskSeq.Test/TaskSeq.ToXXX.Tests.fs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module TaskSeq.Tests.``Conversion-To``
22

33
open System.Collections.Generic
4+
open System.Threading.Channels
45

56
open Xunit
67
open FsUnit.Xunit
@@ -186,3 +187,72 @@ module SideEffects =
186187
let (results2: seq<_>) = tq |> TaskSeq.toSeq
187188
results1 |> Seq.toArray |> should equal [| 1..10 |]
188189
results2 |> Seq.toArray |> should equal [| 11..20 |]
190+
191+
module Channel =
192+
193+
[<Fact>]
194+
let ``TaskSeq-toChannelAsync with null writer raises`` () =
195+
assertNullArg
196+
<| fun () ->
197+
TaskSeq.toChannelAsync null (TaskSeq.ofArray [| 1 |])
198+
|> ignore
199+
200+
[<Fact>]
201+
let ``TaskSeq-toChannelAsync with null source raises`` () =
202+
let ch = Channel.CreateUnbounded<int>()
203+
204+
assertNullArg
205+
<| fun () -> TaskSeq.toChannelAsync ch.Writer null |> ignore
206+
207+
[<Fact>]
208+
let ``TaskSeq-ofChannel with null reader raises`` () =
209+
assertNullArg
210+
<| fun () -> TaskSeq.ofChannel<int> null |> ignore
211+
212+
[<Fact>]
213+
let ``TaskSeq-toChannelAsync with empty source completes the channel`` () = task {
214+
let ch = Channel.CreateUnbounded<int>()
215+
do! TaskSeq.toChannelAsync ch.Writer TaskSeq.empty
216+
ch.Reader.Completion.IsCompleted |> should be True
217+
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
218+
results |> should be Empty
219+
}
220+
221+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
222+
let ``TaskSeq-toChannelAsync writes all elements and completes the channel`` variant = task {
223+
let tq = Gen.getSeqImmutable variant
224+
let ch = Channel.CreateUnbounded<int>()
225+
do! TaskSeq.toChannelAsync ch.Writer tq
226+
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
227+
results |> should equal [| 1..10 |]
228+
// Completion resolves once the channel is marked done and the buffer is drained
229+
do! ch.Reader.Completion
230+
}
231+
232+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
233+
let ``TaskSeq-ofChannel yields all elements written to the channel`` variant = task {
234+
let tq = Gen.getSeqImmutable variant
235+
let ch = Channel.CreateUnbounded<int>()
236+
do! TaskSeq.toChannelAsync ch.Writer tq
237+
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
238+
results |> should equal [| 1..10 |]
239+
}
240+
241+
[<Fact>]
242+
let ``TaskSeq-ofChannel ends when channel is completed and drained`` () = task {
243+
let ch = Channel.CreateUnbounded<int>()
244+
do! ch.Writer.WriteAsync 42
245+
do! ch.Writer.WriteAsync 99
246+
ch.Writer.Complete()
247+
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
248+
results |> should equal [| 42; 99 |]
249+
}
250+
251+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
252+
let ``TaskSeq-toChannelAsync executes side effects`` variant = task {
253+
let tq = Gen.getSeqWithSideEffect variant
254+
let ch = Channel.CreateUnbounded<int>()
255+
do! TaskSeq.toChannelAsync ch.Writer tq
256+
let! results = TaskSeq.ofChannel ch.Reader |> TaskSeq.toArrayAsync
257+
results |> should equal [| 1..10 |]
258+
}

src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,7 @@ Generates optimized IL code through resumable state machines, and comes with a c
7474
<!-- if using "remove unused references", this prevents FSharp.Core from being shown in that list -->
7575
<TreatAsUsed>true</TreatAsUsed>
7676
</PackageReference>
77+
<!-- Provides System.Threading.Channels for netstandard2.1 consumers; built into net5.0+ -->
78+
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
7779
</ItemGroup>
7880
</Project>

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ namespace FSharp.Control
22

33
open System.Collections.Generic
44
open System.Threading
5+
open System.Threading.Channels
56
open System.Threading.Tasks
67

78
// Just for convenience
@@ -180,6 +181,23 @@ type TaskSeq private () =
180181

181182
static member toIListAsync source = Internal.toResizeArrayAndMapAsync (fun x -> x :> IList<_>) source
182183

184+
static member toChannelAsync (writer: ChannelWriter<'T>) (source: TaskSeq<'T>) : Task =
185+
Internal.checkNonNull (nameof writer) writer
186+
Internal.checkNonNull (nameof source) source
187+
188+
task {
189+
try
190+
use e = source.GetAsyncEnumerator CancellationToken.None
191+
192+
while! e.MoveNextAsync() do
193+
do! writer.WriteAsync e.Current
194+
195+
writer.TryComplete() |> ignore
196+
with exn ->
197+
writer.TryComplete exn |> ignore
198+
}
199+
:> Task
200+
183201
//
184202
// Convert 'OfXXX' functions
185203
//
@@ -261,6 +279,17 @@ type TaskSeq private () =
261279
yield c
262280
}
263281

282+
static member ofChannel(reader: ChannelReader<'T>) : TaskSeq<'T> =
283+
Internal.checkNonNull (nameof reader) reader
284+
285+
taskSeq {
286+
while! reader.WaitToReadAsync() do
287+
let mutable item = Unchecked.defaultof<_>
288+
289+
while reader.TryRead &item do
290+
yield item
291+
}
292+
264293
static member withCancellation (cancellationToken: CancellationToken) (source: TaskSeq<'T>) =
265294
Internal.checkNonNull (nameof source) source
266295

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ namespace FSharp.Control
22

33
open System.Collections.Generic
44
open System.Threading
5+
open System.Threading.Channels
56
open System.Threading.Tasks
67

78
[<AutoOpen>]
@@ -529,6 +530,20 @@ type TaskSeq =
529530
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
530531
static member toIListAsync: source: TaskSeq<'T> -> Task<IList<'T>>
531532

533+
/// <summary>
534+
/// Writes all elements of the input task sequence <paramref name="source" /> to a
535+
/// <see cref="ChannelWriter&lt;'T>" /> and marks the writer as complete when the sequence
536+
/// is exhausted. If an exception is raised during iteration, the writer is completed with
537+
/// that exception so that downstream readers observe it.
538+
/// This function is non-blocking while it writes to the channel.
539+
/// </summary>
540+
///
541+
/// <param name="writer">The channel writer to write elements into.</param>
542+
/// <param name="source">The input task sequence.</param>
543+
/// <returns>A <see cref="Task" /> that completes when all elements have been written.</returns>
544+
/// <exception cref="T:ArgumentNullException">Thrown when <paramref name="writer" /> or <paramref name="source" /> is null.</exception>
545+
static member toChannelAsync: writer: ChannelWriter<'T> -> source: TaskSeq<'T> -> Task
546+
532547
/// <summary>
533548
/// Views the given <see cref="array" /> as a task sequence, that is, as an <see cref="IAsyncEnumerable&lt;'T>" />.
534549
/// </summary>
@@ -642,6 +657,17 @@ type TaskSeq =
642657
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
643658
static member ofAsyncArray: source: Async<'T> array -> TaskSeq<'T>
644659

660+
/// <summary>
661+
/// Views a <see cref="ChannelReader&lt;'T>" /> as a task sequence. Elements are yielded as they
662+
/// become available; the sequence ends when the channel is completed and all buffered elements
663+
/// have been consumed.
664+
/// </summary>
665+
///
666+
/// <param name="reader">The channel reader to read elements from.</param>
667+
/// <returns>A task sequence that yields elements from the channel.</returns>
668+
/// <exception cref="T:ArgumentNullException">Thrown when <paramref name="reader" /> is null.</exception>
669+
static member ofChannel: reader: ChannelReader<'T> -> TaskSeq<'T>
670+
645671
/// <summary>
646672
/// Returns a task sequence that, when iterated, passes the given <paramref name="cancellationToken" /> to the
647673
/// underlying <see cref="IAsyncEnumerable&lt;'T&gt;" />. This is the equivalent of calling

0 commit comments

Comments
 (0)