Skip to content

Commit 71b14dc

Browse files
committed
Added IServiceScopeFactory to threading.
1 parent 8a2fb25 commit 71b14dc

12 files changed

Lines changed: 121 additions & 22 deletions

Shuttle.Core.Threading.Tests/MockProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public MockProcessor(TimeSpan executionDuration)
1515

1616
public int ExecutionCount { get; private set; }
1717

18-
public async Task ExecuteAsync(IProcessorThreadContext processorThreadContext, CancellationToken cancellationToken)
18+
public async Task ExecuteAsync(IProcessorThreadContext context, CancellationToken cancellationToken)
1919
{
2020
await Task.Delay(_executionDuration, cancellationToken).ConfigureAwait(false);
2121
ExecutionCount++;

Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Moq;
46
using NUnit.Framework;
57

68
namespace Shuttle.Core.Threading.Tests;
@@ -12,9 +14,13 @@ public async Task Should_be_able_to_execute_processor_thread_async()
1214
{
1315
const int minimumExecutionCount = 5;
1416

17+
var serviceScopeFactory = new Mock<IServiceScopeFactory>();
18+
19+
serviceScopeFactory.Setup(m => m.CreateScope()).Returns(new Mock<IServiceScope>().Object);
20+
1521
var executionDuration = TimeSpan.FromMilliseconds(200);
1622
var mockProcessor = new MockProcessor(executionDuration);
17-
var processorThread = new ProcessorThread("thread", mockProcessor, new());
23+
var processorThread = new ProcessorThread("thread", serviceScopeFactory.Object, mockProcessor, new());
1824
var cancellationTokenSource = new CancellationTokenSource();
1925
var cancellationToken = cancellationTokenSource.Token;
2026

Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Linq;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
56
using Moq;
67
using NUnit.Framework;
78

@@ -14,14 +15,18 @@ public async Task Should_be_able_to_execute_processor_thread_pool_async()
1415
{
1516
const int minimumExecutionCount = 5;
1617

18+
var serviceScopeFactory = new Mock<IServiceScopeFactory>();
19+
20+
serviceScopeFactory.Setup(m => m.CreateScope()).Returns(new Mock<IServiceScope>().Object);
21+
1722
var executionDuration = TimeSpan.FromMilliseconds(500);
1823
var cancellationTokenSource = new CancellationTokenSource();
1924
var cancellationToken = cancellationTokenSource.Token;
2025
var processorFactory = new Mock<IProcessorFactory>();
2126

2227
processorFactory.Setup(m => m.Create()).Returns(() => new MockProcessor(executionDuration));
2328

24-
var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, processorFactory.Object, new());
29+
var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, serviceScopeFactory.Object, processorFactory.Object, new());
2530

2631
processorThreadPool.ProcessorThreadCreated += (_, args) =>
2732
{

Shuttle.Core.Threading/IProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ namespace Shuttle.Core.Threading;
55

66
public interface IProcessor
77
{
8-
Task ExecuteAsync(IProcessorThreadContext processorThreadContext, CancellationToken cancellationToken = default);
8+
Task ExecuteAsync(IProcessorThreadContext processorThread, CancellationToken cancellationToken = default);
99
}
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
13
namespace Shuttle.Core.Threading;
24

35
public interface IProcessorThreadContext
46
{
5-
object? GetState(string key);
7+
IState State { get; }
8+
IServiceScope ServiceScope { get; }
69
}

Shuttle.Core.Threading/IState.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace Shuttle.Core.Threading;
2+
3+
public interface IState
4+
{
5+
void Add(string key, object? value);
6+
void Clear();
7+
bool Contains(string key);
8+
object? Get(string key);
9+
bool Remove(string key);
10+
void Replace(string key, object? value);
11+
}

Shuttle.Core.Threading/ProcessorThread.cs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
using System.Collections.Generic;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
56
using Shuttle.Core.Contract;
67
using Shuttle.Core.Reflection;
78

89
namespace Shuttle.Core.Threading;
910

10-
public class ProcessorThread : IProcessorThreadContext
11+
public class ProcessorThread
1112
{
13+
private readonly IServiceScopeFactory _serviceScopeFactory;
1214
private readonly CancellationTokenSource _cancellationTokenSource = new();
1315
private readonly ProcessorThreadOptions _processorThreadOptions;
1416

@@ -18,8 +20,9 @@ public class ProcessorThread : IProcessorThreadContext
1820
private bool _started;
1921
private readonly Thread _thread;
2022

21-
public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions processorThreadOptions)
23+
public ProcessorThread(string name, IServiceScopeFactory serviceScopeFactory, IProcessor processor, ProcessorThreadOptions processorThreadOptions)
2224
{
25+
_serviceScopeFactory = Guard.AgainstNull(serviceScopeFactory);
2326
Name = Guard.AgainstNull(name);
2427
Processor = Guard.AgainstNull(processor);
2528
_processorThreadOptions = Guard.AgainstNull(processorThreadOptions);
@@ -45,12 +48,7 @@ internal void Deactivate()
4548
_cancellationTokenSource.Cancel();
4649
}
4750

48-
public object? GetState(string key)
49-
{
50-
Guard.AgainstNullOrEmptyString(key);
51-
52-
return _state.TryGetValue(key, out var value) ? value : null;
53-
}
51+
public IState State { get; } = new State();
5452

5553
public event EventHandler<ProcessorThreadExceptionEventArgs>? ProcessorException;
5654
public event EventHandler<ProcessorThreadEventArgs>? ProcessorExecuting;
@@ -60,11 +58,6 @@ internal void Deactivate()
6058
public event EventHandler<ProcessorThreadEventArgs>? ProcessorThreadStopped;
6159
public event EventHandler<ProcessorThreadEventArgs>? ProcessorThreadStopping;
6260

63-
public void SetState(string key, object value)
64-
{
65-
_state[Guard.AgainstNullOrEmptyString(key)] = value;
66-
}
67-
6861
public async Task StartAsync()
6962
{
7063
if (_started)
@@ -126,7 +119,10 @@ private async void Work()
126119

127120
try
128121
{
129-
await Processor.ExecuteAsync(this, CancellationToken);
122+
using (var context = new ProcessorThreadContext(State, _serviceScopeFactory.CreateScope()))
123+
{
124+
await Processor.ExecuteAsync(context, CancellationToken);
125+
}
130126
}
131127
catch (OperationCanceledException)
132128
{

Shuttle.Core.Threading/ProcessorThreadPool.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Threading.Tasks;
4+
using Microsoft.Extensions.DependencyInjection;
45
using Shuttle.Core.Contract;
56
using Shuttle.Core.Reflection;
67

@@ -9,17 +10,20 @@ namespace Shuttle.Core.Threading;
910
public class ProcessorThreadPool : IProcessorThreadPool
1011
{
1112
private readonly List<ProcessorThread> _processorThreads = new();
13+
private readonly IServiceScopeFactory _serviceScopeFactory;
1214
private bool _disposed;
1315
private bool _started;
1416

15-
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions)
17+
public ProcessorThreadPool(string name, int threadCount, IServiceScopeFactory serviceScopeFactory, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions)
1618
{
1719
if (threadCount < 1)
1820
{
1921
throw new ThreadCountZeroException();
2022
}
2123

24+
2225
Name = name;
26+
_serviceScopeFactory = Guard.AgainstNull(serviceScopeFactory);
2327
ProcessorFactory = Guard.AgainstNull(processorFactory);
2428
ThreadOptions = Guard.AgainstNull(processorThreadOptions);
2529
ThreadCount = threadCount;
@@ -60,7 +64,7 @@ public async Task StartAsync()
6064

6165
while (i++ < ThreadCount)
6266
{
63-
var processorThread = new ProcessorThread($"{Name} / {i}", ProcessorFactory.Create(), ThreadOptions);
67+
var processorThread = new ProcessorThread($"{Name} / {i}", _serviceScopeFactory, ProcessorFactory.Create(), ThreadOptions);
6468

6569
ProcessorThreadCreated?.Invoke(this, new(processorThread));
6670

Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Shuttle.Core.Contract;
24

35
namespace Shuttle.Core.Threading;
46

57
public class ProcessorThreadPoolFactory : IProcessorThreadPoolFactory
68
{
9+
private readonly IServiceScopeFactory _serviceScopeFactory;
10+
11+
public ProcessorThreadPoolFactory(IServiceScopeFactory serviceScopeFactory)
12+
{
13+
_serviceScopeFactory = Guard.AgainstNull(serviceScopeFactory);
14+
}
15+
716
public event EventHandler<ProcessorThreadPoolCreatedEventArgs>? ProcessorThreadPoolCreated;
817

918
public IProcessorThreadPool Create(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions)
1019
{
11-
var result = new ProcessorThreadPool(name, threadCount, processorFactory, processorThreadOptions);
20+
var result = new ProcessorThreadPool(name, threadCount, _serviceScopeFactory, processorFactory, processorThreadOptions);
1221

1322
ProcessorThreadPoolCreated?.Invoke(this, new(result));
1423

Shuttle.Core.Threading/Shuttle.Core.Threading.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
<ItemGroup>
1818
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
19+
<PackageReference Include="Moq" Version="4.20.72" />
1920
<PackageReference Include="Shuttle.Core.Contract" Version="20.0.0" />
2021
<PackageReference Include="Shuttle.Core.Reflection" Version="20.0.0" />
2122
</ItemGroup>

0 commit comments

Comments
 (0)