Skip to content

Commit 3ffd2ff

Browse files
committed
working to abstract away the rabbitmq
1 parent bdf3492 commit 3ffd2ff

10 files changed

Lines changed: 138 additions & 39 deletions

File tree

src/AuditlogService/MessageQueueService.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,15 @@
33
using Microsoft.Extensions.Options;
44
using MassTransit.RabbitMqTransport;
55
using MessageContracts;
6+
using System;
7+
using MassTransitAbstractions.Extensions;
68

79
namespace AuditlogService
810
{
9-
public class MessageQueueService : MessageQueueServiceBase
11+
12+
public class OrderServiceEndpointRegistration : IMyRabbitMQReceiveEndpointRegistration
1013
{
11-
public MessageQueueService(IOptions<MassTransitOptions> options) : base(options)
12-
{
13-
14-
}
15-
protected override void OnAddReceiveEndpoint(IRabbitMqBusFactoryConfigurator cfg, IRabbitMqHost host)
14+
public void AddReceiveEndpoint(IRabbitMqBusFactoryConfigurator cfg, IRabbitMqHost host)
1615
{
1716
cfg.ReceiveEndpoint(host, "order-service", e =>
1817
{
@@ -23,4 +22,12 @@ protected override void OnAddReceiveEndpoint(IRabbitMqBusFactoryConfigurator cfg
2322
});
2423
}
2524
}
25+
public class MessageQueueService : MessageQueueServiceBase
26+
{
27+
private IMyRabbitMQContainer _myRabbitMQContainer;
28+
public MessageQueueService(IMyRabbitMQContainer myRabbitMQContainer) : base(myRabbitMQContainer)
29+
{
30+
_myRabbitMQContainer = myRabbitMQContainer;
31+
}
32+
}
2633
}

src/AuditlogService/Startup.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,16 @@
1313
using Serilog;
1414
using MassTransitAbstractions.Extensions;
1515
using GQL.GraphQLHost.Core;
16+
using MassTransitAbstractions;
1617

1718
namespace AuditlogService
1819
{
20+
public interface IMyRabbitMQReceiveEndpointRegistration : IRabbitMQReceiveEndpointRegistration { }
21+
public interface IMyRabbitMQContainer : IRabbitMQContainer { }
22+
23+
public class MyRabbitMQContainer : RabbitMQContainerBase, IMyRabbitMQContainer
24+
{
25+
}
1926
public class Startup : GraphQLRollupStartup<Startup>
2027
{
2128
public Startup(IHostingEnvironment env, IConfiguration configuration, ILogger<Startup> logger) :
@@ -45,7 +52,14 @@ protected override void AddHealthChecks(HealthCheckBuilder checks)
4552

4653
protected override void AddAdditionalServices(IServiceCollection services)
4754
{
48-
services.AddMassTransitOptions(Configuration.GetSection("MassTransitOptions"));
55+
services.AddTransient<IMyRabbitMQReceiveEndpointRegistration, OrderServiceEndpointRegistration>();
56+
services.AddRabbitMQMassTransit<
57+
MassTransitOptions,
58+
IMyRabbitMQContainer,
59+
MyRabbitMQContainer, IMyRabbitMQReceiveEndpointRegistration>();
60+
61+
services.AddRabbitMqMassTransitOptions<MassTransitOptions>(Configuration.GetSection("MassTransitOptions"));
62+
4963
services.AddHostedService<MessageQueueService>();
5064

5165
var configSection = Configuration.GetSection("RabbitMQ");

src/AuditlogService/appsettings.Development.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"MassTransitOptions": {
3+
"Host": "localhost",
34
"TransportType": "RabbitMQ",
45
"Username": "rabbitmquser",
56
"Password": "DEBmbwkSrzy9D1T9cJfa"

src/AuditlogService/appsettings.Production.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"MassTransitOptions": {
3+
"Host": "rabbitmq",
34
"TransportType": "RabbitMQ",
45
"Username": "rabbitmquser",
56
"Password": "DEBmbwkSrzy9D1T9cJfa"

src/CustomerManagementAPI.Host/Startup.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@
1313
using Serilog;
1414
using MassTransitAbstractions.Extensions;
1515
using GQL.GraphQLHost.Core;
16+
using MassTransit.RabbitMqTransport;
17+
using MassTransitAbstractions;
1618

1719
namespace CustomerManagementAPI.Host
1820
{
21+
public interface IMyRabbitMQReceiveEndpointRegistration : IRabbitMQReceiveEndpointRegistration { }
22+
public interface IMyRabbitMQContainer : IRabbitMQContainer { }
23+
24+
public class MyRabbitMQContainer : RabbitMQContainerBase, IMyRabbitMQContainer
25+
{
26+
}
1927
public class Startup : GraphQLRollupStartup<Startup>
2028
{
2129
public Startup(IHostingEnvironment env, IConfiguration configuration, ILogger<Startup> logger):
@@ -47,7 +55,12 @@ protected override void AddHealthChecks(HealthCheckBuilder checks)
4755

4856
protected override void AddAdditionalServices(IServiceCollection services)
4957
{
50-
services.AddMassTransitOptions(Configuration.GetSection("MassTransitOptions"));
58+
services.AddRabbitMQMassTransit<
59+
MassTransitOptions,
60+
IMyRabbitMQContainer,
61+
MyRabbitMQContainer, IMyRabbitMQReceiveEndpointRegistration>();
62+
63+
services.AddRabbitMqMassTransitOptions<MassTransitOptions>(Configuration.GetSection("MassTransitOptions"));
5164

5265
var configSection = Configuration.GetSection("RabbitMQ");
5366
string host = configSection["Host"];

src/CustomerManagementAPI.Host/appsettings.Development.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"MassTransitOptions": {
3+
"Host": "localhost",
34
"TransportType": "RabbitMQ",
45
"Username": "rabbitmquser",
56
"Password": "DEBmbwkSrzy9D1T9cJfa"

src/CustomerManagementAPI.Host/appsettings.Production.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"MassTransitOptions": {
3+
"Host": "rabbitmq",
34
"TransportType": "RabbitMQ",
45
"Username": "rabbitmquser",
56
"Password": "DEBmbwkSrzy9D1T9cJfa"
Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,99 @@
11
using System;
22
using Microsoft.Extensions.DependencyInjection;
33
using Microsoft.Extensions.Configuration;
4+
using MassTransit;
5+
using MassTransit.RabbitMqTransport;
6+
using Microsoft.Extensions.Options;
7+
using System.Collections.Generic;
8+
using System.Linq;
49

510
namespace MassTransitAbstractions.Extensions
611
{
12+
13+
public interface IRabbitMQReceiveEndpointRegistration
14+
{
15+
void AddReceiveEndpoint(IRabbitMqBusFactoryConfigurator cfg, IRabbitMqHost host);
16+
}
17+
public interface IBusControlContainer
18+
{
19+
IBusControl BusControl { get; set; }
20+
}
21+
public interface IRabbitMQContainer: IBusControlContainer
22+
{
23+
IRabbitMqHost RabbitMqHost { get; set; }
24+
25+
}
26+
public abstract class BusControlContainerBase : IBusControlContainer
27+
{
28+
public IBusControl BusControl { get; set; }
29+
public IPublishEndpoint PublishEndpoint => BusControl;
30+
public ISendEndpointProvider SendEndpointProvider => BusControl;
31+
public IBus Bus => BusControl;
32+
}
33+
public abstract class RabbitMQContainerBase : BusControlContainerBase, IRabbitMQContainer
34+
{
35+
public IRabbitMqHost RabbitMqHost { get; set; }
36+
}
737
public static class AspNetCoreServiceExtensions
838
{
9-
public static void AddMassTransitOptions(this IServiceCollection services,
10-
Action<MassTransitOptions> configureOptions)
39+
public static void AddRabbitMqMassTransitOptions<TOptions>(
40+
this IServiceCollection services,Action<MassTransitOptions> configureOptions)
41+
where TOptions : MassTransitOptions, new()
42+
{
43+
services.Configure<TOptions>(configureOptions);
44+
}
45+
public static void AddRabbitMqMassTransitOptions<TOptions>(
46+
this IServiceCollection services,IConfiguration config)
47+
where TOptions : MassTransitOptions, new()
1148
{
12-
services.Configure(configureOptions);
49+
services.Configure<TOptions>(config);
1350
}
14-
public static void AddMassTransitOptions(this IServiceCollection services,
15-
IConfiguration config)
51+
52+
53+
public static void AddRabbitMQMassTransit<TOptions, TService, TImplementation, THandlerRegistrants>(this IServiceCollection services)
54+
where TOptions : MassTransitOptions, new()
55+
where TService : class, IRabbitMQContainer
56+
where TImplementation : class, TService, new()
57+
where THandlerRegistrants : class, IRabbitMQReceiveEndpointRegistration
58+
1659
{
17-
services.Configure<MassTransitOptions>(config);
60+
services.AddSingleton<TService>((sp) => {
61+
62+
var options = sp.GetRequiredService<IOptions<TOptions>>();
63+
var opts = options.Value;
64+
IBusControl bus = null;
65+
TImplementation container = null;
66+
switch (opts.TransportType)
67+
{
68+
case MassTransitOptions.Transport.RabbitMQ:
69+
IRabbitMqHost host = null;
70+
bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
71+
{
72+
var uriString = $"rabbitmq://{opts.Host}/";
73+
host = cfg.Host(new Uri(uriString), h =>
74+
{
75+
h.Username(opts.Username);
76+
h.Password(opts.Password);
77+
});
78+
var registrants = sp.GetRequiredService<IEnumerable<THandlerRegistrants>>();
79+
if (registrants != null && registrants.Any())
80+
{
81+
foreach (var registrant in registrants)
82+
{
83+
registrant.AddReceiveEndpoint(cfg, host);
84+
}
85+
}
86+
});
87+
container = new TImplementation { BusControl = bus, RabbitMqHost = host };
88+
89+
break;
90+
default:
91+
throw new ArgumentOutOfRangeException($"MasstransitOptions.Transport:{opts.TransportType} is not supported");
92+
}
93+
// bus.Start();
94+
95+
return container;
96+
});
1897
}
1998
}
2099
}

src/MasstransitAbstractions/MassTransitOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ public enum Transport
99
public Transport TransportType { get; set; }
1010
public string Username { get; set; }
1111
public string Password { get; set; }
12-
12+
public string Host { get; set; }
1313
}
1414
}

src/MasstransitAbstractions/MessageQueueServiceBase.cs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44
using MassTransit;
5+
using MassTransitAbstractions.Extensions;
56
using MassTransit.RabbitMqTransport;
67
using Microsoft.Extensions.Hosting;
78
using Microsoft.Extensions.Options;
@@ -10,40 +11,21 @@ namespace MassTransitAbstractions
1011
{
1112
public abstract class MessageQueueServiceBase : BackgroundService
1213
{
13-
protected abstract void OnAddReceiveEndpoint(IRabbitMqBusFactoryConfigurator cfg, IRabbitMqHost host);
14+
private IBusControlContainer _busControlContainer;
1415

15-
protected MassTransitOptions _options;
16-
protected IBusControl _bus;
17-
18-
public MessageQueueServiceBase(IOptions<MassTransitOptions> options)
16+
public MessageQueueServiceBase(IBusControlContainer busControlContainer)
1917
{
20-
_options = options.Value;
21-
switch (_options.TransportType)
22-
{
23-
case MassTransitOptions.Transport.RabbitMQ:
24-
_bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
25-
{
26-
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
27-
{
28-
h.Username(_options.Username);
29-
h.Password(_options.Password);
30-
});
31-
OnAddReceiveEndpoint(cfg, host);
32-
});
33-
break;
34-
default:
35-
throw new ArgumentOutOfRangeException($"MasstransitOptions.Transport:{_options.TransportType} is not supported");
36-
}
18+
_busControlContainer = busControlContainer;
3719

3820
}
3921
protected override Task ExecuteAsync(CancellationToken stoppingToken)
4022
{
41-
return _bus.StartAsync();
23+
return _busControlContainer.BusControl.StartAsync();
4224
}
4325

4426
public override Task StopAsync(CancellationToken cancellationToken)
4527
{
46-
return Task.WhenAll(base.StopAsync(cancellationToken), _bus.StopAsync());
28+
return Task.WhenAll(base.StopAsync(cancellationToken), _busControlContainer.BusControl.StopAsync());
4729
}
4830
}
4931
}

0 commit comments

Comments
 (0)