Skip to content

Commit 6c7cd0c

Browse files
committed
🎉 feat(Actor): 更新 OnActivateAsync 和 OnDeactivateAsync 方法以支持取消令牌
🔧 chore(project): 更新项目引用和包版本以适应新结构 📦 refactor(Consumer): 调整流订阅和获取逻辑以确保正确性 ✨ improve(Actor): 优化事务处理逻辑以提高性能 🔄 fix(Actor): 修复依赖注入中流提供者的获取方式
1 parent 65ce9a8 commit 6c7cd0c

14 files changed

Lines changed: 57 additions & 39 deletions

File tree

Packages.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
<PackageReference Update="Microsoft.Orleans.OrleansProviders" Version="8.2.0"/>
1717
<PackageReference Update="Microsoft.Orleans.Core" Version="8.2.0" />
1818
<PackageReference Update="Microsoft.Orleans.Sdk" Version="8.2.0" />
19+
<PackageReference Update="Microsoft.Orleans.Streaming" Version="8.2.0" />
1920
<PackageReference Update="Microsoft.Orleans.Core.Abstractions" Version="8.2.0" />
2021
<PackageReference Update="Microsoft.Orleans.Runtime.Abstractions" Version="8.2.0" />
2122
<PackageReference Update="Microsoft.Orleans.OrleansRuntime" Version="8.2.0" />
2223
<PackageReference Update="Microsoft.Orleans.TestingHost" Version="8.2.0" />
24+
<PackageReference Update="Microsoft.Orleans.Reminders" Version="8.2.0" />
2325

2426
<PackageReference Update="System.Buffers" Version="4.5.1" />
2527
<PackageReference Update="System.Runtime.Loader" Version="4.3.0" />
Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1-
<?xml version="1.0" encoding="utf-8"?>
2-
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3-
<Import Project="..\Packages.props" />
4-
<ItemGroup>
5-
<PackageReference Update="Microsoft.Extensions.Hosting" Version="8.0.1" />
6-
<PackageReference Update="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
7-
<PackageReference Update="MySqlConnector" Version="2.2.0" />
8-
<PackageReference Update="Npgsql" Version="7.0.0" />
9-
<PackageReference Update="Microsoft.Data.SqlClient" Version="5.0.0" />
10-
</ItemGroup>
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<Sdk Name="Microsoft.Build.CentralPackageVersions" />
4+
<PropertyGroup>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<IsPackable>false</IsPackable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="Microsoft.Orleans.Sdk" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<ProjectReference Include="..\..\src\Vertex.Abstractions\Vertex.Abstractions.csproj" />
15+
<ProjectReference Include="..\..\src\Vertex.Transaction.Abstractions\Vertex.Transaction.Abstractions.csproj" />
16+
</ItemGroup>
17+
1118
</Project>
19+

src/Stream/Vertex.Stream.InMemory/Consumer/ConsumerManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
using System.Linq;
55
using System.Threading;
66
using System.Threading.Tasks;
7+
using Microsoft.Extensions.DependencyInjection;
78
using Microsoft.Extensions.Hosting;
89
using Microsoft.Extensions.Logging;
910
using Microsoft.Extensions.Options;
1011
using Orleans;
11-
using Orleans.Runtime;
1212
using Orleans.Streams;
1313
using Vertex.Abstractions.Actor;
1414
using Vertex.Abstractions.InnerService;
@@ -124,7 +124,7 @@ private async Task DistributedStart()
124124
{
125125
try
126126
{
127-
var streamProvider = this.provider.GetRequiredServiceByName<IStreamProvider>(this.streamOptions.ProviderName);
127+
var streamProvider = this.provider.GetRequiredKeyedService<IStreamProvider>(this.streamOptions.ProviderName);
128128
if (Interlocked.CompareExchange(ref this.distributedMonitorTimeLock, 1, 0) == 0)
129129
{
130130
foreach (var queue in this.queues)

src/Stream/Vertex.Stream.InMemory/Consumer/ConsumerRunner.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public async Task Run()
4747
await this.streamSubscriptionHandle.UnsubscribeAsync();
4848
}
4949
this.streamId = await this.grainFactory.GetGrain<IStreamIdActor>(0).GetId(this.Queue.Topic);
50-
var stream = this.streamProvider.GetStream<byte[]>(this.streamId, this.Queue.Name);
50+
var stream = this.streamProvider.GetStream<byte[]>(this.Queue.Name, this.streamId);
5151
this.streamSubscriptionHandle = await stream.SubscribeAsync(async bytesList => await this.Notice(bytesList.Select(o => new BytesBox(o.Item, default)).ToList()));
5252
}
5353

src/Stream/Vertex.Stream.InMemory/EventStreamFactory.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Concurrent;
33
using System.Linq;
44
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
56
using Microsoft.Extensions.Options;
67
using Orleans;
78
using Orleans.Runtime;
@@ -62,9 +63,9 @@ public async ValueTask<IEventStream> Create<TPrimaryKey>(IActor<TPrimaryKey> act
6263
var streamId = await this.grainFactory.GetGrain<IStreamIdActor>(0).GetId(stream);
6364
var result = this.streamDict.GetOrAdd(stream, key =>
6465
{
65-
var streamProvider = this.serviceProvider.GetRequiredServiceByName<IStreamProvider>(this.streamOptions.ProviderName);
66+
var streamProvider = this.serviceProvider.GetRequiredKeyedService<IStreamProvider>(this.streamOptions.ProviderName);
6667

67-
return new EventStream(streamProvider.GetStream<byte[]>(streamId, attribute.Name));
68+
return new EventStream(streamProvider.GetStream<byte[]>(attribute.Name, streamId));
6869
});
6970
return result;
7071
}

src/Stream/Vertex.Stream.InMemory/Vertex.Stream.InMemory.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
<ItemGroup>
44
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
55
<PackageReference Include="Microsoft.Extensions.Options" />
6-
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" />
7-
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" />
6+
<PackageReference Include="Microsoft.Orleans.Sdk" />
7+
<PackageReference Include="Microsoft.Orleans.Streaming" />
88
</ItemGroup>
99

1010
<ItemGroup>

src/Vertex.Runtime/Actor/ActorBase.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ public ActorBase()
1919
/// </summary>
2020
public TPrimaryKey ActorId { get; private set; }
2121

22-
public Task OnActivateAsync()
23-
{
24-
return this.OnActivateAsync(CancellationToken.None);
25-
}
26-
2722
/// <summary>
2823
/// Gets the real Type of the current Grain.
2924
/// </summary>
3025
protected Type ActorType { get; }
3126

27+
public Task OnActivateAsync()
28+
{
29+
return this.OnActivateAsync(CancellationToken.None);
30+
}
31+
3232
public override Task OnActivateAsync(CancellationToken cancellationToken)
3333
{
3434
var type = typeof(TPrimaryKey);

src/Vertex.Runtime/Actor/FlowActor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ protected virtual async ValueTask DependencyInjection()
366366
var snapshotStorageFactory = this.ServiceProvider.GetService<ISubSnapshotStorageFactory>();
367367
this.SnapshotStorage = await snapshotStorageFactory.Create(this);
368368
}
369-
369+
370370
public override async Task OnActivateAsync(CancellationToken cancellationToken)
371371
{
372372
await base.OnActivateAsync(cancellationToken);

src/Vertex.Transaction.Abstractions/Vertex.Transaction.Abstractions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<Sdk Name="Microsoft.Build.CentralPackageVersions" />
33
<ItemGroup>
4-
<PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" />
4+
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" />
55
</ItemGroup>
66

77
<ItemGroup>

src/Vertex.Transaction/Actor/DTxActor.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using Microsoft.Extensions.DependencyInjection;
56
using Microsoft.Extensions.Logging;
@@ -36,9 +37,9 @@ protected async override ValueTask DependencyInjection()
3637
await base.DependencyInjection();
3738
}
3839

39-
public override async Task OnActivateAsync()
40+
public override async Task OnActivateAsync(CancellationToken cancellationToken)
4041
{
41-
await base.OnActivateAsync();
42+
await base.OnActivateAsync(cancellationToken);
4243
var txEvents = this.Convert(await this.TxEventStorage.GetLatest(this.ActorId, this.DtxOptions.RetainedTxEvents));
4344
if (txEvents.Count > 0)
4445
{
@@ -53,7 +54,7 @@ public override async Task OnActivateAsync()
5354

5455
if (!string.IsNullOrEmpty(this.TxSnapshot.TxId))
5556
{
56-
await this.TxBeginLock.WaitAsync();
57+
await this.TxBeginLock.WaitAsync(cancellationToken);
5758
var documents = await this.EventStorage.GetList(this.ActorId, this.TxSnapshot.TxStartVersion, this.Snapshot.Meta.Version);
5859
var waitingEvents = this.Convert(documents);
5960
foreach (var evt in waitingEvents)

0 commit comments

Comments
 (0)