Skip to content

Commit d8bdc73

Browse files
committed
Address server log PR feedback
1 parent 6d0ef72 commit d8bdc73

7 files changed

Lines changed: 177 additions & 24 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using Elsa.Diagnostics.Models;
2+
3+
namespace Elsa.Diagnostics.Contracts;
4+
5+
public interface IServerLogStreamProvider : IServerLogProvider
6+
{
7+
IAsyncEnumerable<ServerLogStreamItem> SubscribeWithDroppedEventsAsync(ServerLogFilter filter, CancellationToken cancellationToken = default);
8+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace Elsa.Diagnostics.Models;
2+
3+
public record ServerLogStreamItem(
4+
ServerLogEvent? LogEvent = null,
5+
ServerLogDroppedEventSummary? DroppedEvents = null)
6+
{
7+
public static ServerLogStreamItem FromLogEvent(ServerLogEvent logEvent) => new(logEvent);
8+
9+
public static ServerLogStreamItem FromDroppedEvents(ServerLogDroppedEventSummary summary) => new(DroppedEvents: summary);
10+
}

src/modules/Elsa.Diagnostics/Providers/InMemory/InMemoryServerLogProvider.cs

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88

99
namespace Elsa.Diagnostics.Providers.InMemory;
1010

11-
public class InMemoryServerLogProvider : IServerLogProvider
11+
public class InMemoryServerLogProvider : IServerLogStreamProvider
1212
{
1313
private readonly RingBuffer<ServerLogEvent> _recentLogs;
1414
private readonly IServerLogSourceRegistry _sourceRegistry;
1515
private readonly ServerLogStreamingOptions _options;
1616
private readonly object _subscribersLock = new();
17-
private readonly Dictionary<Guid, Channel<ServerLogEvent>> _subscribers = new();
17+
private readonly Dictionary<Guid, ServerLogSubscriber> _subscribers = new();
1818

1919
public InMemoryServerLogProvider(IOptions<ServerLogStreamingOptions> options, IServerLogSourceRegistry sourceRegistry)
2020
{
@@ -28,12 +28,12 @@ public ValueTask PublishAsync(ServerLogEvent logEvent, CancellationToken cancell
2828
_recentLogs.Add(logEvent);
2929
_sourceRegistry.MarkSeen(logEvent.SourceId, logEvent.ReceivedAt);
3030

31-
List<Channel<ServerLogEvent>> subscribers;
31+
List<ServerLogSubscriber> subscribers;
3232
lock (_subscribersLock)
3333
subscribers = _subscribers.Values.ToList();
34-
34+
3535
foreach (var subscriber in subscribers)
36-
subscriber.Writer.TryWrite(logEvent);
36+
subscriber.TryWrite(logEvent, _options.SubscriberChannelCapacity);
3737

3838
return ValueTask.CompletedTask;
3939
}
@@ -57,23 +57,27 @@ public ValueTask<RecentServerLogsResult> GetRecentAsync(ServerLogFilter filter,
5757

5858
public async IAsyncEnumerable<ServerLogEvent> SubscribeAsync(ServerLogFilter filter, [EnumeratorCancellation] CancellationToken cancellationToken = default)
5959
{
60-
var subscriberId = Guid.NewGuid();
61-
var channel = Channel.CreateBounded<ServerLogEvent>(new BoundedChannelOptions(_options.SubscriberChannelCapacity)
60+
await foreach (var item in SubscribeWithDroppedEventsAsync(filter, cancellationToken))
6261
{
63-
FullMode = BoundedChannelFullMode.DropOldest,
64-
SingleReader = true,
65-
SingleWriter = false
66-
});
67-
62+
if (item.LogEvent != null)
63+
yield return item.LogEvent;
64+
}
65+
}
66+
67+
public async IAsyncEnumerable<ServerLogStreamItem> SubscribeWithDroppedEventsAsync(ServerLogFilter filter, [EnumeratorCancellation] CancellationToken cancellationToken = default)
68+
{
69+
var subscriberId = Guid.NewGuid();
70+
var subscriber = new ServerLogSubscriber(filter);
71+
6872
lock (_subscribersLock)
69-
_subscribers[subscriberId] = channel;
70-
73+
_subscribers[subscriberId] = subscriber;
74+
7175
try
7276
{
73-
await foreach (var logEvent in channel.Reader.ReadAllAsync(cancellationToken))
77+
await foreach (var item in subscriber.Channel.Reader.ReadAllAsync(cancellationToken))
7478
{
75-
if (ServerLogFilterEvaluator.Matches(logEvent, filter))
76-
yield return logEvent;
79+
subscriber.MarkConsumed(item);
80+
yield return item;
7781
}
7882
}
7983
finally
@@ -87,4 +91,63 @@ public ValueTask<IReadOnlyCollection<ServerLogSource>> ListSourcesAsync(Cancella
8791
{
8892
return ValueTask.FromResult(_sourceRegistry.List());
8993
}
94+
95+
private sealed class ServerLogSubscriber(ServerLogFilter filter)
96+
{
97+
private readonly object _lock = new();
98+
private int _pendingItemCount;
99+
private long _droppedSinceLastSummary;
100+
private bool _summaryQueued;
101+
102+
public Channel<ServerLogStreamItem> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<ServerLogStreamItem>(new UnboundedChannelOptions
103+
{
104+
SingleReader = true,
105+
SingleWriter = false
106+
});
107+
108+
public void TryWrite(ServerLogEvent logEvent, int capacity)
109+
{
110+
if (!ServerLogFilterEvaluator.Matches(logEvent, filter))
111+
return;
112+
113+
lock (_lock)
114+
{
115+
if (_pendingItemCount >= capacity)
116+
{
117+
_droppedSinceLastSummary++;
118+
QueueDroppedSummaryIfNeeded();
119+
return;
120+
}
121+
122+
Channel.Writer.TryWrite(ServerLogStreamItem.FromLogEvent(logEvent));
123+
_pendingItemCount++;
124+
}
125+
}
126+
127+
public void MarkConsumed(ServerLogStreamItem item)
128+
{
129+
lock (_lock)
130+
{
131+
_pendingItemCount = Math.Max(0, _pendingItemCount - 1);
132+
133+
if (item.DroppedEvents != null)
134+
{
135+
_summaryQueued = false;
136+
QueueDroppedSummaryIfNeeded();
137+
}
138+
}
139+
}
140+
141+
private void QueueDroppedSummaryIfNeeded()
142+
{
143+
if (_summaryQueued || _droppedSinceLastSummary == 0)
144+
return;
145+
146+
var summary = new ServerLogDroppedEventSummary(null, _droppedSinceLastSummary, "SubscriberChannelFull");
147+
_droppedSinceLastSummary = 0;
148+
_summaryQueued = true;
149+
_pendingItemCount++;
150+
Channel.Writer.TryWrite(ServerLogStreamItem.FromDroppedEvents(summary));
151+
}
152+
}
90153
}

src/modules/Elsa.Diagnostics/RealTime/ServerLogSubscriptionManager.cs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,22 @@ private async Task StreamAsync(string connectionId, ServerLogFilter filter, Serv
6666
{
6767
try
6868
{
69-
await foreach (var logEvent in _logProvider.SubscribeAsync(filter, cancellationToken))
70-
await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(logEvent, cancellationToken);
69+
if (_logProvider is IServerLogStreamProvider streamProvider)
70+
await StreamWithDroppedEventsAsync(connectionId, filter, streamProvider, cancellationToken);
71+
else
72+
await StreamLogEventsAsync(connectionId, filter, cancellationToken);
7173
}
72-
catch (OperationCanceledException)
74+
catch (OperationCanceledException e)
7375
{
76+
_logger.LogDebug(e, "Server log subscription for connection {ConnectionId} was canceled", connectionId);
7477
}
75-
catch (Exception e)
78+
catch (HubException e)
7679
{
77-
_logger.LogDebug(e, "Server log subscription for connection {ConnectionId} stopped unexpectedly", connectionId);
80+
_logger.LogWarning(e, "Server log subscription for connection {ConnectionId} stopped unexpectedly", connectionId);
81+
}
82+
catch (InvalidOperationException e)
83+
{
84+
_logger.LogWarning(e, "Server log subscription for connection {ConnectionId} stopped unexpectedly", connectionId);
7885
}
7986
finally
8087
{
@@ -102,13 +109,35 @@ private void OnSourceChanged(ServerLogSource source)
102109
_ = BroadcastSourceChangedAsync(source);
103110
}
104111

112+
private async Task StreamLogEventsAsync(string connectionId, ServerLogFilter filter, CancellationToken cancellationToken)
113+
{
114+
await foreach (var logEvent in _logProvider.SubscribeAsync(filter, cancellationToken))
115+
await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(logEvent, cancellationToken);
116+
}
117+
118+
private async Task StreamWithDroppedEventsAsync(string connectionId, ServerLogFilter filter, IServerLogStreamProvider streamProvider, CancellationToken cancellationToken)
119+
{
120+
await foreach (var item in streamProvider.SubscribeWithDroppedEventsAsync(filter, cancellationToken))
121+
{
122+
if (item.LogEvent != null)
123+
await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(item.LogEvent, cancellationToken);
124+
125+
if (item.DroppedEvents != null)
126+
await _hubContext.Clients.Client(connectionId).ReceiveDroppedEventsAsync(item.DroppedEvents, cancellationToken);
127+
}
128+
}
129+
105130
private async Task BroadcastSourceChangedAsync(ServerLogSource source)
106131
{
107132
try
108133
{
109134
await _hubContext.Clients.All.ReceiveSourceChangedAsync(source);
110135
}
111-
catch (Exception e)
136+
catch (OperationCanceledException e)
137+
{
138+
_logger.LogDebug(e, "Server log source change broadcast for source {SourceId} was canceled", source.Id);
139+
}
140+
catch (Exception e) when (e is not OperationCanceledException)
112141
{
113142
_logger.LogDebug(e, "Failed to broadcast server log source change for source {SourceId}", source.Id);
114143
}

src/modules/Elsa.Diagnostics/Services/ServerLogSourceRegistry.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ public void MarkSeen(string sourceId, DateTimeOffset timestamp)
3838
return;
3939
}
4040

41-
var source = Current with
41+
var source = new ServerLogSource
4242
{
4343
Id = sourceId,
4444
DisplayName = sourceId,
45+
MachineName = sourceId,
46+
ProcessId = 0,
4547
LastSeen = timestamp,
4648
Status = ServerLogSourceStatus.Connected
4749
};

test/unit/Elsa.Diagnostics.UnitTests/InMemory/InMemoryServerLogProviderTests.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,25 @@ public async Task SubscribeAsync_YieldsOnlyMatchingLiveEvents()
6161
Assert.Equal(2, subscription.Current.Sequence);
6262
}
6363

64+
[Fact]
65+
public async Task SubscribeWithDroppedEventsAsync_WhenSubscriberCapacityIsReached_YieldsDroppedSummary()
66+
{
67+
_options.SubscriberChannelCapacity = 1;
68+
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
69+
await using var subscription = _provider.SubscribeWithDroppedEventsAsync(new(), cancellationTokenSource.Token).GetAsyncEnumerator();
70+
var next = subscription.MoveNextAsync().AsTask();
71+
72+
await _provider.PublishAsync(CreateLog(1, ServerLogLevel.Information));
73+
await _provider.PublishAsync(CreateLog(2, ServerLogLevel.Information));
74+
75+
Assert.True(await next.WaitAsync(TimeSpan.FromSeconds(5)));
76+
Assert.Equal(1, subscription.Current.LogEvent!.Sequence);
77+
78+
Assert.True(await subscription.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(5)));
79+
Assert.Equal(1, subscription.Current.DroppedEvents!.DroppedCount);
80+
Assert.Equal("SubscriberChannelFull", subscription.Current.DroppedEvents.Reason);
81+
}
82+
6483
private static ServerLogEvent CreateLog(long sequence, ServerLogLevel level, string category = "Elsa", string sourceId = "source-a") =>
6584
new()
6685
{

test/unit/Elsa.Diagnostics.UnitTests/Sources/ServerLogSourceRegistryTests.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,28 @@ public void MarkSeen_WhenSourceIsUnknown_AddsSourceWithMatchingId()
5252
Assert.Equal(ServerLogSourceStatus.Connected, source.Status);
5353
}
5454

55+
[Fact]
56+
public void MarkSeen_WhenSourceIsUnknown_DoesNotCopyLocalContainerMetadata()
57+
{
58+
SetEnvironment("HOSTNAME", "local-pod");
59+
SetEnvironment("OTEL_SERVICE_NAME", "local-service");
60+
SetEnvironment("POD_NAMESPACE", "local-namespace");
61+
SetEnvironment("CONTAINER_NAME", "local-container");
62+
SetEnvironment("NODE_NAME", "local-node");
63+
var registry = CreateRegistry();
64+
65+
registry.MarkSeen("pod-b", DateTimeOffset.UtcNow);
66+
67+
var source = Assert.Single(registry.List(), x => x.Id == "pod-b");
68+
Assert.Equal("pod-b", source.MachineName);
69+
Assert.Equal(0, source.ProcessId);
70+
Assert.Null(source.ServiceName);
71+
Assert.Null(source.PodName);
72+
Assert.Null(source.Namespace);
73+
Assert.Null(source.ContainerName);
74+
Assert.Null(source.NodeName);
75+
}
76+
5577
[Fact]
5678
public void MarkSeen_WhenSourceIsUnknown_RaisesSourceChanged()
5779
{

0 commit comments

Comments
 (0)