Skip to content

Commit 17cc25e

Browse files
Add command line parsing and rabbitmq classic queue support (#338)
* Add command line parsing and quorum queue support * Acceptance tests * Add queueType parameter to RabbitMQ transport configuration in acceptance tests * Fix misleading assert.ignore * Refactor transport configuration to support asynchronous cleanup delegation in acceptance tests * Simplify pattern Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com> * Also cleanup consumer queues in acceptance tests * Null value handling --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com> Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>
1 parent 2053197 commit 17cc25e

24 files changed

Lines changed: 531 additions & 82 deletions

File tree

src/ServiceControl.Connector.MassTransit.AcceptanceTesting/IConfigureTransportTestExecution.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using MassTransit;
@@ -11,7 +12,7 @@ public interface IConfigureTransportTestExecution
1112
{
1213
Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata);
1314

14-
void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator);
15+
Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator);
1516

16-
void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration);
17+
Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration);
1718
}

src/ServiceControl.Connector.MassTransit.AcceptanceTesting/TestAttributes/RabbitMQTestAttribute.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public void ApplyToContext(TestExecutionContext context)
1111
var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString");
1212
if (string.IsNullOrWhiteSpace(connectionString))
1313
{
14-
Assert.Ignore("Ignoring because environment variable RabbitMQConnectionString is not available");
14+
Assert.Ignore("Ignoring because environment variable RabbitMQTransport_ConnectionString is not available");
1515
}
1616
}
1717
}

src/ServiceControl.Connector.MassTransit.AcceptanceTests.AmazonSQS/ConfigureAmazonSQSTransportTestExecution.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
1414
return _ => Task.CompletedTask;
1515
}
1616

17-
public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
17+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
1818
{
1919
var region = FallbackRegionFactory.GetRegionEndpoint().SystemName;
2020

@@ -29,15 +29,17 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato
2929

3030
cfg.ConfigureEndpoints(context, new DefaultEndpointNameFormatter(NamePrefixGenerator.GetNamePrefix(), false));
3131
});
32+
return (_, _) => Task.CompletedTask;
3233
}
3334

34-
public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
35+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
3536
{
3637
services.UsingAmazonSqs(transport =>
3738
{
3839
transport.QueueNamePrefix = NamePrefixGenerator.GetNamePrefix();
3940
transport.TopicNamePrefix = NamePrefixGenerator.GetNamePrefix();
4041
transport.QueueNameGenerator = TestNameHelper.GetSqsQueueName;
4142
});
43+
return (_, _) => Task.CompletedTask;
4244
}
4345
}

src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBus/ConfigureAzureServiceBusTransportTestExecution.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,20 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
1414
return Cleanup;
1515
}
1616

17-
public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
17+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
1818
{
1919
configurator.UsingAzureServiceBus((context, cfg) =>
2020
{
2121
cfg.Host(connectionString);
2222
cfg.ConfigureEndpoints(context);
2323
});
24+
return (_, _) => Task.CompletedTask;
2425
}
2526

26-
public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
27+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
2728
{
2829
services.UsingAzureServiceBus(configuration, connectionString);
30+
return (_, _) => Task.CompletedTask;
2931
}
3032

3133
Task Cleanup(CancellationToken cancellationToken)

src/ServiceControl.Connector.MassTransit.AcceptanceTests.AzureServiceBusDeadLetter/ConfigureAzureServiceBusTransportTestExecution.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfi
1414
return Cleanup;
1515
}
1616

17-
public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
17+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
1818
{
1919
configurator.UsingAzureServiceBus((context, cfg) =>
2020
{
@@ -29,11 +29,13 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato
2929
sb.ConfigureDeadLetterQueueErrorTransport();
3030
}
3131
});
32+
return (_, _) => Task.CompletedTask;
3233
}
3334

34-
public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
35+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
3536
{
3637
services.UsingAzureServiceBus(configuration, connectionString, true);
38+
return (_, _) => Task.CompletedTask;
3739
}
3840

3941
Task Cleanup(CancellationToken cancellationToken)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[*.cs]
2+
3+
# Justification: Test project
4+
dotnet_diagnostic.CA2007.severity = none
5+
dotnet_diagnostic.PS0004.severity = none # A parameter of type CancellationToken on a private delegate or method should be required
6+
dotnet_diagnostic.PS0018.severity = none # Add a CancellationToken parameter
7+
8+
# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
9+
dotnet_diagnostic.NSB0002.severity = suggestion
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<ProjectReference Include="..\ServiceControl.Connector.MassTransit.AcceptanceTesting\ServiceControl.Connector.MassTransit.AcceptanceTesting.csproj" />
11+
<ProjectReference Include="..\ServiceControl.Connector.MassTransit.RabbitMQ\ServiceControl.Connector.MassTransit.RabbitMQ.csproj" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<PackageReference Include="GitHubActionsTestLogger" Version="3.0.1" />
16+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.3.0" />
17+
<PackageReference Include="NUnit" Version="4.5.1" />
18+
<PackageReference Include="NUnit.Analyzers" Version="4.12.0" />
19+
<PackageReference Include="NUnit3TestAdapter" Version="6.1.0" />
20+
</ItemGroup>
21+
22+
<ItemGroup>
23+
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.1" />
24+
</ItemGroup>
25+
26+
<ItemGroup>
27+
<Compile Include="..\ServiceControl.Connector.MassTransit.AcceptanceTests\Shared\**\*.cs" LinkBase="Shared" />
28+
<Compile Include="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\**\*.cs" Exclude="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\obj\**\*"/>
29+
<Compile Remove="..\ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ\TestSuiteConfiguration.cs" />
30+
</ItemGroup>
31+
32+
</Project>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[assembly: RabbitMQTest]
2+
3+
public partial class TestSuiteConfiguration
4+
{
5+
public IConfigureTransportTestExecution CreateTransportConfiguration() => new ConfigureRabbitMQTransportTestExecution(QueueType.Classic);
6+
public Task Cleanup() => Task.CompletedTask;
7+
}

src/ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ/ConfigureRabbitMQTransportTestExecution.cs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@
22
using Microsoft.Extensions.Configuration;
33
using Microsoft.Extensions.DependencyInjection;
44
using NServiceBus.AcceptanceTesting.Support;
5+
using NServiceBus.Transport;
6+
using ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ;
57

6-
class ConfigureRabbitMQTransportTestExecution : IConfigureTransportTestExecution
8+
class ConfigureRabbitMQTransportTestExecution(QueueType queueType = QueueType.Quorum) : IConfigureTransportTestExecution
79
{
10+
TestRabbitMQTransport? transport;
11+
812
public Func<CancellationToken, Task> ConfigureTransportForEndpoint(EndpointConfiguration endpointConfiguration, PublisherMetadata publisherMetadata)
913
{
10-
var transport = new RabbitMQTransport(
11-
RoutingTopology.Conventional(QueueType.Quorum), "host=localhost", false);
14+
transport = new TestRabbitMQTransport(RoutingTopology.Conventional(queueType), "host=localhost", false);
1215
endpointConfiguration.UseTransport(transport);
1316
return Cleanup;
1417
}
1518

16-
public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
19+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurator configurator)
1720
{
1821
configurator.UsingRabbitMq((context, cfg) =>
1922
{
@@ -32,19 +35,72 @@ public void ConfigureTransportForMassTransitEndpoint(IBusRegistrationConfigurato
3235
{
3336
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
3437
{
35-
rmq.SetQuorumQueue();
38+
if (queueType == QueueType.Quorum)
39+
{
40+
rmq.SetQuorumQueue();
41+
}
3642
}
3743
});
44+
45+
return (queuesToDelete, _) =>
46+
{
47+
DeleteQueues(queuesToDelete);
48+
return Task.CompletedTask;
49+
};
3850
}
3951

40-
public void ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
52+
public Func<IReadOnlyCollection<string>, CancellationToken, Task> ConfigureTransportForConnector(IServiceCollection services, IConfiguration configuration)
4153
{
42-
services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest");
54+
services.UsingRabbitMQ("host=localhost", new Uri("http://localhost:15672/"), "guest", "guest", queueType);
55+
return (queuesToDelete, _) =>
56+
{
57+
DeleteQueues(queuesToDelete);
58+
return Task.CompletedTask;
59+
};
4360
}
4461

4562
Task Cleanup(CancellationToken cancellationToken)
4663
{
47-
//TODO?
64+
PurgeQueues();
4865
return Task.CompletedTask;
4966
}
67+
68+
void PurgeQueues()
69+
{
70+
if (transport == null)
71+
{
72+
return;
73+
}
74+
75+
DeleteQueues(transport.QueuesToCleanup.ToHashSet());
76+
}
77+
78+
static void DeleteQueues(IReadOnlyCollection<string> queues)
79+
{
80+
using var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger");
81+
using var channel = connection.CreateModel();
82+
foreach (var queue in queues)
83+
{
84+
try
85+
{
86+
channel.QueueDelete(queue, false, false);
87+
}
88+
catch (Exception ex)
89+
{
90+
Console.WriteLine("Unable to clear queue {0}: {1}", queue, ex);
91+
}
92+
}
93+
}
94+
95+
class TestRabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) : RabbitMQTransport(routingTopology, connectionString, enableDelayedDelivery)
96+
{
97+
public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken = default)
98+
{
99+
var infrastructure = await base.Initialize(hostSettings, receivers, sendingAddresses, cancellationToken);
100+
QueuesToCleanup.AddRange(infrastructure.Receivers.Select(x => x.Value.ReceiveAddress).Concat(sendingAddresses).Distinct());
101+
return infrastructure;
102+
}
103+
104+
public List<string> QueuesToCleanup { get; } = [];
105+
}
50106
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace ServiceControl.Connector.MassTransit.AcceptanceTests.RabbitMQ;
2+
3+
using System;
4+
using System.Security.Authentication;
5+
using global::RabbitMQ.Client;
6+
7+
public static class ConnectionHelper
8+
{
9+
static Lazy<ConnectionFactory> connectionFactory = new(() =>
10+
{
11+
var connectionConfiguration = AdapterRabbitMqConfiguration.ConnectionConfiguration.Create("host=localhost", "AcceptanceTests");
12+
13+
var factory = new ConnectionFactory
14+
{
15+
AutomaticRecoveryEnabled = true,
16+
HostName = connectionConfiguration.Host,
17+
Port = connectionConfiguration.Port,
18+
VirtualHost = connectionConfiguration.VirtualHost,
19+
UserName = connectionConfiguration.UserName ?? "guest",
20+
Password = connectionConfiguration.Password ?? "guest"
21+
};
22+
23+
factory.Ssl.ServerName = factory.HostName;
24+
factory.Ssl.Certs = null;
25+
factory.Ssl.Version = SslProtocols.Tls12;
26+
factory.Ssl.Enabled = connectionConfiguration.UseTls;
27+
28+
return factory;
29+
});
30+
31+
public static ConnectionFactory ConnectionFactory => connectionFactory.Value;
32+
}

0 commit comments

Comments
 (0)