Skip to content

Commit 90ec67c

Browse files
Copilotthomhurst
andcommitted
Add comprehensive disposal documentation and examples
Co-authored-by: thomhurst <30480171+thomhurst@users.noreply.github.com>
1 parent cce917f commit 90ec67c

7 files changed

Lines changed: 452 additions & 27 deletions

File tree

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#if NET6_0_OR_GREATER
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Runtime.CompilerServices;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using EnumerableAsyncProcessor.Extensions;
9+
10+
namespace EnumerableAsyncProcessor.Example;
11+
12+
/// <summary>
13+
/// Examples demonstrating proper disposal patterns for EnumerableAsyncProcessor objects.
14+
/// This addresses the common question: "How/when to correctly dispose the resulting processor objects?"
15+
/// </summary>
16+
public static class DisposalExample
17+
{
18+
public static async Task RunExamples()
19+
{
20+
Console.WriteLine("Disposal Pattern Examples");
21+
Console.WriteLine("========================\n");
22+
23+
// Example 1: The problematic pattern from the issue
24+
Console.WriteLine("Example 1: PROBLEMATIC - No disposal (resource leak!)");
25+
var results1 = await ProblematicPatternAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
26+
Console.WriteLine($"Results: {string.Join(", ", results1.ToList())}");
27+
Console.WriteLine("⚠️ This pattern leaks resources because the processor is never disposed!\n");
28+
29+
// Example 2: Proper disposal with await using
30+
Console.WriteLine("Example 2: PROPER - Using await using for automatic disposal");
31+
var results2 = await ProperPatternWithAwaitUsingAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
32+
Console.WriteLine($"Results: {string.Join(", ", results2.ToList())}");
33+
Console.WriteLine("✅ Resources automatically cleaned up with await using\n");
34+
35+
// Example 3: Proper disposal with manual try-finally
36+
Console.WriteLine("Example 3: PROPER - Manual disposal with try-finally");
37+
var results3 = await ProperPatternWithManualDisposalAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
38+
Console.WriteLine($"Results: {string.Join(", ", results3.ToList())}");
39+
Console.WriteLine("✅ Resources manually cleaned up in finally block\n");
40+
41+
// Example 4: Using the convenience extension (no disposal needed)
42+
Console.WriteLine("Example 4: CONVENIENT - Using extension methods (disposal handled internally)");
43+
var asyncEnumerable = GenerateAsyncEnumerable(5);
44+
var results4 = await asyncEnumerable.ProcessInParallel(async item =>
45+
{
46+
await Task.Delay(50);
47+
return item * 2;
48+
});
49+
Console.WriteLine($"Results: {string.Join(", ", results4)}");
50+
Console.WriteLine("✅ Extension methods handle disposal internally\n");
51+
52+
// Example 5: Streaming results with proper disposal
53+
Console.WriteLine("Example 5: STREAMING - Processing results as they arrive with proper disposal");
54+
await StreamingWithProperDisposalAsync(new[] { 1, 2, 3, 4, 5 }, CancellationToken.None);
55+
Console.WriteLine("✅ Streamed results with proper disposal\n");
56+
}
57+
58+
/// <summary>
59+
/// This is the PROBLEMATIC pattern from the GitHub issue - it leaks resources!
60+
/// DO NOT USE THIS PATTERN in production code.
61+
/// </summary>
62+
private static async Task<IAsyncEnumerable<int>> ProblematicPatternAsync(int[] input, CancellationToken token)
63+
{
64+
// ⚠️ PROBLEM: The processor is created but never disposed!
65+
var batchProcessor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
66+
67+
// This returns the async enumerable, but the processor that created it is never disposed
68+
return batchProcessor.GetResultsAsyncEnumerable();
69+
70+
// 🔥 RESOURCE LEAK: The processor goes out of scope without being disposed,
71+
// potentially leaving tasks running and resources uncleaned
72+
}
73+
74+
/// <summary>
75+
/// PROPER pattern using await using for automatic disposal.
76+
/// This is the recommended approach.
77+
/// </summary>
78+
private static async Task<IAsyncEnumerable<int>> ProperPatternWithAwaitUsingAsync(int[] input, CancellationToken token)
79+
{
80+
// ✅ Create processor with await using for automatic disposal
81+
await using var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
82+
83+
// Collect results into a list to return
84+
var results = new List<int>();
85+
await foreach (var result in processor.GetResultsAsyncEnumerable())
86+
{
87+
results.Add(result);
88+
}
89+
90+
// Return as async enumerable
91+
return results.ToAsyncEnumerable();
92+
93+
// ✅ Processor is automatically disposed here due to 'await using'
94+
}
95+
96+
/// <summary>
97+
/// PROPER pattern using manual disposal with try-finally.
98+
/// Use this when you need more control over the disposal timing.
99+
/// </summary>
100+
private static async Task<IAsyncEnumerable<int>> ProperPatternWithManualDisposalAsync(int[] input, CancellationToken token)
101+
{
102+
var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
103+
104+
try
105+
{
106+
// Collect results into a list to return
107+
var results = new List<int>();
108+
await foreach (var result in processor.GetResultsAsyncEnumerable())
109+
{
110+
results.Add(result);
111+
}
112+
113+
return results.ToAsyncEnumerable();
114+
}
115+
finally
116+
{
117+
// ✅ Manually dispose the processor to clean up resources
118+
await processor.DisposeAsync();
119+
}
120+
}
121+
122+
/// <summary>
123+
/// Example of streaming results while maintaining proper disposal.
124+
/// This shows how to process results as they arrive.
125+
/// </summary>
126+
private static async Task StreamingWithProperDisposalAsync(int[] input, CancellationToken token)
127+
{
128+
await using var processor = input.SelectAsync(static v => TransformAsync(v), token).ProcessInParallel();
129+
130+
var processedCount = 0;
131+
await foreach (var result in processor.GetResultsAsyncEnumerable())
132+
{
133+
processedCount++;
134+
Console.WriteLine($" Received result {processedCount}: {result}");
135+
}
136+
137+
// Processor automatically disposed here
138+
}
139+
140+
/// <summary>
141+
/// Simulates an async transformation operation
142+
/// </summary>
143+
private static async Task<int> TransformAsync(int value)
144+
{
145+
// Simulate some async work
146+
await Task.Delay(50);
147+
return value * 10;
148+
}
149+
150+
/// <summary>
151+
/// Generates an async enumerable for testing
152+
/// </summary>
153+
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(
154+
int count,
155+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
156+
{
157+
for (int i = 1; i <= count; i++)
158+
{
159+
await Task.Yield();
160+
cancellationToken.ThrowIfCancellationRequested();
161+
yield return i;
162+
}
163+
}
164+
}
165+
#endif

EnumerableAsyncProcessor.Example/ProcessInParallelExample.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,30 @@ public static async Task RunExample()
7373
Console.WriteLine($"Sequential processing time: {sequentialTime.TotalMilliseconds:F0}ms");
7474
Console.WriteLine($"Parallel processing time: {parallelTime.TotalMilliseconds:F0}ms");
7575
Console.WriteLine($"Speedup: {sequentialTime.TotalMilliseconds / parallelTime.TotalMilliseconds:F1}x");
76+
77+
// Example 5: Proper disposal with builder pattern
78+
Console.WriteLine("\nExample 5: Proper disposal when using builder pattern");
79+
var data = Enumerable.Range(1, 10).ToArray();
80+
81+
// Using await using for automatic disposal
82+
await using var processor = data
83+
.SelectAsync(async item =>
84+
{
85+
await Task.Delay(50);
86+
return item * 3;
87+
}, CancellationToken.None)
88+
.ProcessInParallel(maxConcurrency: 3);
89+
90+
// Process results as they become available
91+
var processedCount = 0;
92+
await foreach (var result in processor.GetResultsAsyncEnumerable())
93+
{
94+
processedCount++;
95+
Console.WriteLine($"Processed item {processedCount}: {result}");
96+
}
97+
98+
Console.WriteLine($"All {processedCount} items processed with proper disposal");
99+
// Processor is automatically disposed here due to 'await using'
76100
}
77101

78102
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(

EnumerableAsyncProcessor.Example/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,8 @@ Task<HttpResponseMessage> PingAsync()
7474
// Run ProcessInParallel examples
7575
Console.WriteLine("\n\n=== Running ProcessInParallel Extension Examples ===\n");
7676
await ProcessInParallelExample.RunExample();
77+
78+
// Run disposal pattern examples
79+
Console.WriteLine("\n\n=== Running Disposal Pattern Examples ===\n");
80+
await DisposalExample.RunExamples();
7781
#endif

EnumerableAsyncProcessor/Builders/ItemActionAsyncProcessorBuilder_2.cs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,47 @@ internal ItemActionAsyncProcessorBuilder(IEnumerable<TInput> items, Func<TInput,
1717
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
1818
}
1919

20+
/// <summary>
21+
/// Processes items in batches of the specified size.
22+
/// </summary>
23+
/// <param name="batchSize">The number of items to process in each batch.</param>
24+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
25+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
26+
/// <remarks>
27+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
28+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
29+
/// </remarks>
2030
public IAsyncProcessor<TOutput> ProcessInBatches(int batchSize)
2131
{
2232
return new ResultBatchAsyncProcessor<TInput, TOutput>(batchSize, _items, _taskSelector, _cancellationTokenSource).StartProcessing();
2333
}
2434

35+
/// <summary>
36+
/// Processes items in parallel with the specified level of parallelism.
37+
/// </summary>
38+
/// <param name="levelOfParallelism">The maximum number of concurrent operations.</param>
39+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
40+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
41+
/// <remarks>
42+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
43+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
44+
/// </remarks>
2545
public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism)
2646
{
2747
return new ResultRateLimitedParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, levelOfParallelism, _cancellationTokenSource).StartProcessing();
2848
}
2949

50+
/// <summary>
51+
/// Processes items in parallel with the specified level of parallelism and time constraints.
52+
/// </summary>
53+
/// <param name="levelOfParallelism">The maximum number of concurrent operations.</param>
54+
/// <param name="timeSpan">The time span constraint for rate limiting.</param>
55+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
56+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
57+
/// <remarks>
58+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
59+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
60+
/// </remarks>
3061
public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism, TimeSpan timeSpan)
3162
{
3263
return new ResultTimedRateLimitedParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, levelOfParallelism, timeSpan, _cancellationTokenSource).StartProcessing();
@@ -35,7 +66,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel(int levelOfParallelism, TimeSp
3566
/// <summary>
3667
/// Process items in parallel without concurrency limits and return results.
3768
/// </summary>
38-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
69+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
70+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
71+
/// <remarks>
72+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
73+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
74+
/// </remarks>
3975
public IAsyncProcessor<TOutput> ProcessInParallel()
4076
{
4177
return ProcessInParallel(null, false);
@@ -45,7 +81,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel()
4581
/// Process items in parallel without concurrency limits and return results.
4682
/// </summary>
4783
/// <param name="scheduleOnThreadPool">If true, schedules tasks on thread pool to prevent blocking. Default is false for maximum performance.</param>
48-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
84+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
85+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
86+
/// <remarks>
87+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
88+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
89+
/// </remarks>
4990
public IAsyncProcessor<TOutput> ProcessInParallel(bool scheduleOnThreadPool)
5091
{
5192
return ProcessInParallel(null, scheduleOnThreadPool);
@@ -55,7 +96,12 @@ public IAsyncProcessor<TOutput> ProcessInParallel(bool scheduleOnThreadPool)
5596
/// Process items in parallel with specified concurrency limit and return results.
5697
/// </summary>
5798
/// <param name="maxConcurrency">Maximum concurrent operations.</param>
58-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
99+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
100+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
101+
/// <remarks>
102+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
103+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
104+
/// </remarks>
59105
public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency)
60106
{
61107
return ProcessInParallel(maxConcurrency, false);
@@ -66,12 +112,26 @@ public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency)
66112
/// </summary>
67113
/// <param name="maxConcurrency">Maximum concurrent operations.</param>
68114
/// <param name="scheduleOnThreadPool">If true, schedules tasks on thread pool to prevent blocking.</param>
69-
/// <returns>An async processor configured for parallel execution that returns results.</returns>
115+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
116+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
117+
/// <remarks>
118+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
119+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
120+
/// </remarks>
70121
public IAsyncProcessor<TOutput> ProcessInParallel(int? maxConcurrency, bool scheduleOnThreadPool)
71122
{
72123
return new ResultParallelAsyncProcessor<TInput, TOutput>(_items, _taskSelector, _cancellationTokenSource, maxConcurrency, scheduleOnThreadPool).StartProcessing();
73124
}
74125

126+
/// <summary>
127+
/// Process items one at a time sequentially.
128+
/// </summary>
129+
/// <returns>An async processor that implements IDisposable and IAsyncDisposable.
130+
/// Use 'await using' or proper disposal to ensure resources are cleaned up.</returns>
131+
/// <remarks>
132+
/// The returned processor should be disposed to ensure proper cleanup of internal resources
133+
/// and cancellation of running tasks. Use 'await using var processor = ...' for automatic disposal.
134+
/// </remarks>
75135
public IAsyncProcessor<TOutput> ProcessOneAtATime()
76136
{
77137
return new ResultOneAtATimeAsyncProcessor<TInput, TOutput>(_items, _taskSelector, _cancellationTokenSource).StartProcessing();

EnumerableAsyncProcessor/Extensions/EnumerableExtensions.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,37 @@ public static ItemAsyncProcessorBuilder<T> ToAsyncProcessorBuilder<T>(this IEnum
99
return new ItemAsyncProcessorBuilder<T>(items);
1010
}
1111

12+
/// <summary>
13+
/// Creates an async processor builder that can transform items and return results.
14+
/// </summary>
15+
/// <typeparam name="T">The input item type.</typeparam>
16+
/// <typeparam name="TOutput">The output result type.</typeparam>
17+
/// <param name="items">The items to process.</param>
18+
/// <param name="taskSelector">The async transformation function.</param>
19+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
20+
/// <returns>A builder that can be configured with processing options like ProcessInParallel().</returns>
21+
/// <remarks>
22+
/// The processors created by this builder implement IDisposable/IAsyncDisposable and should be properly disposed.
23+
/// Use 'await using var processor = items.SelectAsync(...).ProcessInParallel();' for automatic disposal.
24+
/// </remarks>
1225
public static ItemActionAsyncProcessorBuilder<T, TOutput> SelectAsync<T, TOutput>(this IEnumerable<T> items, Func<T, Task<TOutput>> taskSelector, CancellationToken cancellationToken = default)
1326
{
1427
return items.ToAsyncProcessorBuilder()
1528
.SelectAsync(taskSelector, cancellationToken);
1629
}
1730

31+
/// <summary>
32+
/// Creates an async processor builder for operations that don't return results.
33+
/// </summary>
34+
/// <typeparam name="T">The input item type.</typeparam>
35+
/// <param name="items">The items to process.</param>
36+
/// <param name="taskSelector">The async operation to perform on each item.</param>
37+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
38+
/// <returns>A builder that can be configured with processing options like ProcessInParallel().</returns>
39+
/// <remarks>
40+
/// The processors created by this builder implement IDisposable/IAsyncDisposable and should be properly disposed.
41+
/// Use 'await using var processor = items.ForEachAsync(...).ProcessInParallel();' for automatic disposal.
42+
/// </remarks>
1843
public static ItemActionAsyncProcessorBuilder<T> ForEachAsync<T>(this IEnumerable<T> items, Func<T, Task> taskSelector, CancellationToken cancellationToken = default)
1944
{
2045
return items.ToAsyncProcessorBuilder()

0 commit comments

Comments
 (0)