Skip to content

Commit 5645c18

Browse files
author
Paulo Morgado
committed
Refactor RTP handling and memory management
- Replaced `List<byte[]>` with `ReadOnlySequence<byte>` for RTP packets and raw samples, enhancing memory management and performance. - Introduced `PooledByteBuffer` class implementing `IBufferWriter<byte>` for efficient byte data writing using an array pool. - Updated `CreateRtpPackets` method signatures to return `IByteBuffer`, streamlining the API and reducing memory overhead. - Modified `FeedInRawSamples` to accept `ReadOnlySequence<byte>`, improving flexibility in sample feeding. - Added `MemoryExtensions` class for utility methods related to `ReadOnlySequence<byte>`. - Improved error handling and logging consistency. - Refactored code for better readability, maintainability, and alignment with modern C# practices, including standardized `using` statements and the use of `Span<byte>`.
1 parent 0a651fe commit 5645c18

18 files changed

Lines changed: 789 additions & 376 deletions

src/RTSPServerApp/RTSPServerWorker.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using SharpMp4;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.IO;
910
using System.Linq;
@@ -156,9 +157,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
156157

157158
try
158159
{
159-
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
160+
using (var buffer = new PooledByteBuffer(initialBufferSize: 0))
161+
{
162+
foreach (var trackBytes in videoTrack[videoIndex++ % videoTrack.Count])
163+
{
164+
buffer.Write(trackBytes);
165+
}
166+
167+
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), buffer.GetReadOnlySequence());
168+
}
160169
}
161-
catch(Exception ex)
170+
catch (Exception ex)
162171
{
163172
_logger.LogError(ex, $"FeedInRawSamples failed: {ex.Message}");
164173
}
@@ -167,7 +176,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
167176
{
168177
Reset(ref videoIndex, videoTimer, ref audioIndex, audioTimer);
169178
}
170-
};
179+
}
180+
;
171181
}
172182

173183
if (rtspAudioTrack != null)
@@ -177,7 +187,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
177187
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
178188
audioTimer.Elapsed += (s, e) =>
179189
{
180-
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
190+
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));
181191

182192
if (audioIndex % audioTrack[0].Count == 0)
183193
{

src/RTSPServerFFmpeg/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
130130
{
131131
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
132132
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
133-
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
133+
track.FeedInRawSamples(rtpTimestamp, new(rtp));
134134
}
135135
catch (Exception e)
136136
{

src/RTSPServerPcap/Program.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Configuration;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
@@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
8788
{
8889
var ipHeader = ParseIPHeader(packet);
8990

90-
if(ipHeader.Protocol == 6) // TCP - RTSP
91+
if (ipHeader.Protocol == 6) // TCP - RTSP
9192
{
9293
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
9394
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
@@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
113114

114115
if (udp != null && data.Length > 1) // TODO
115116
{
116-
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117+
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117118
{
118119
long messageTime = seconds * 1000 + (microseconds / 1000);
119120
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
@@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
132133
{
133134
Thread.Sleep(sleep);
134135
}
135-
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
136+
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
136137
}
137-
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138+
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138139
{
139140
if (lastAudioMessageTime == -1)
140141
lastAudioMessageTime = messageTime;
@@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
149150
Thread.Sleep(sleep);
150151
}
151152

152-
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
153+
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
153154
}
154155
}
155156
}
@@ -305,19 +306,19 @@ public bool Parse(string rtsp)
305306
{
306307
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
307308
}
308-
else if(line.StartsWith(TRANSPORT)) // SETUP response
309+
else if (line.StartsWith(TRANSPORT)) // SETUP response
309310
{
310311
int[] clientPorts = null;
311312
int[] serverPorts = null;
312313
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
313-
foreach(var s in split)
314+
foreach (var s in split)
314315
{
315316
string str = s.Trim();
316-
if(str.StartsWith(CLIENT_PORT))
317+
if (str.StartsWith(CLIENT_PORT))
317318
{
318319
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
319320
}
320-
else if(str.StartsWith(SERVER_PORT))
321+
else if (str.StartsWith(SERVER_PORT))
321322
{
322323
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
323324
}
@@ -337,7 +338,7 @@ public bool Parse(string rtsp)
337338
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
338339
{
339340
return true;
340-
}
341+
}
341342
}
342343
}
343344
}
@@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor
364365

365366
public class TCPHeader
366367
{
367-
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368+
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368369
{
369370
SourcePort = sourcePort;
370371
DestinationPort = destinationPort;
@@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
391392
public class IPHeader
392393
{
393394
public IPHeader(
394-
int family,
395-
int version,
395+
int family,
396+
int version,
396397
int headerLength,
397-
byte differentiatedServicesField,
398-
ushort totalLength,
399-
ushort identification,
400-
byte flags,
401-
ushort fragmentOffset,
402-
byte ttl,
403-
byte protocol,
404-
ushort headerCheckSum,
405-
IPAddress sourceIP,
398+
byte differentiatedServicesField,
399+
ushort totalLength,
400+
ushort identification,
401+
byte flags,
402+
ushort fragmentOffset,
403+
byte ttl,
404+
byte protocol,
405+
ushort headerCheckSum,
406+
IPAddress sourceIP,
406407
IPAddress destintationIP)
407408
{
408409
Family = family;
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
5+
namespace SharpRTSPClient
6+
{
7+
/// <summary>
8+
/// A pooled buffer writer that implements <see cref="IBufferWriter{Byte}"/> using <see cref="ArrayPool{Byte}.Shared"/> for efficient writing of byte data and
9+
/// allows reading the written content through a <see cref="ReadOnlySequence{Byte}"/> using the <see cref="GetReadOnlySequence"/> method.
10+
/// </summary>
11+
public sealed class PooledByteBuffer : IBufferWriter<byte>, IDisposable
12+
{
13+
private const int DefaultBufferSize = 4096;
14+
15+
private readonly List<byte[]> _buffers = new List<byte[]>();
16+
private readonly ArrayPool<byte> _pool;
17+
private int _currentIndex;
18+
private int _currentOffset;
19+
private bool _disposed;
20+
21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="PooledByteBuffer"/> class with an optional initial buffer size and array pool.
23+
/// </summary>
24+
/// <param name="initialBufferSize">The initial size of the buffer to rent from the pool. Defaults to 4096 bytes.</param>
25+
/// <param name="pool">The array pool to use. If null, <see cref="ArrayPool{Byte}.Shared"/> is used.</param>
26+
public PooledByteBuffer(int initialBufferSize = DefaultBufferSize, ArrayPool<byte> pool = null)
27+
{
28+
_pool = pool ?? ArrayPool<byte>.Shared;
29+
AddNewBuffer(initialBufferSize);
30+
}
31+
32+
/// <summary>
33+
/// Notifies the buffer writer that <paramref name="count"/> bytes were written.
34+
/// </summary>
35+
/// <param name="count">The number of bytes written.</param>
36+
/// <exception cref="ArgumentOutOfRangeException">Thrown if count is negative or exceeds the current buffer capacity.</exception>
37+
public void Advance(int count)
38+
{
39+
if (count < 0 || _currentOffset + count > _buffers[_currentIndex].Length)
40+
{
41+
throw new ArgumentOutOfRangeException(nameof(count));
42+
}
43+
44+
_currentOffset += count;
45+
}
46+
47+
/// <summary>
48+
/// Returns a <see cref="Memory{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
49+
/// </summary>
50+
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
51+
/// <returns>A writable memory buffer.</returns>
52+
public Memory<byte> GetMemory(int sizeHint = 0)
53+
{
54+
EnsureCapacity(sizeHint);
55+
return _buffers[_currentIndex].AsMemory(_currentOffset);
56+
}
57+
58+
/// <summary>
59+
/// Returns a <see cref="Span{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
60+
/// </summary>
61+
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
62+
/// <returns>A writable span buffer.</returns>
63+
public Span<byte> GetSpan(int sizeHint = 0)
64+
{
65+
EnsureCapacity(sizeHint);
66+
return _buffers[_currentIndex].AsSpan(_currentOffset);
67+
}
68+
69+
/// <summary>
70+
/// Returns a <see cref="ReadOnlySequence{Byte}"/> representing the written data across all buffers.
71+
/// </summary>
72+
/// <returns>A read-only sequence of bytes.</returns>
73+
public ReadOnlySequence<byte> GetReadOnlySequence()
74+
{
75+
SequenceSegment first = null;
76+
SequenceSegment last = null;
77+
78+
for (var i = 0; i < _buffers.Count; i++)
79+
{
80+
var buffer = _buffers[i];
81+
var length = (i == _currentIndex) ? _currentOffset : buffer.Length;
82+
83+
if (length == 0)
84+
{
85+
continue;
86+
}
87+
88+
var segment = new SequenceSegment(buffer.AsMemory(0, length));
89+
90+
if (first == null)
91+
{
92+
first = segment;
93+
}
94+
95+
if (last != null)
96+
{
97+
last.SetNext(segment);
98+
}
99+
100+
last = segment;
101+
}
102+
103+
if (first == null || last == null)
104+
{
105+
return ReadOnlySequence<byte>.Empty;
106+
}
107+
108+
return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
109+
}
110+
111+
/// <summary>
112+
/// Releases all buffers back to the pool and clears internal state.
113+
/// </summary>
114+
public void Dispose()
115+
{
116+
if (_disposed)
117+
{
118+
return;
119+
}
120+
121+
foreach (var buffer in _buffers)
122+
{
123+
_pool.Return(buffer);
124+
}
125+
126+
_buffers.Clear();
127+
_disposed = true;
128+
}
129+
130+
private void EnsureCapacity(int sizeHint)
131+
{
132+
if (_currentOffset + sizeHint > _buffers[_currentIndex].Length)
133+
{
134+
var newSize = Math.Max(sizeHint, DefaultBufferSize);
135+
AddNewBuffer(newSize);
136+
}
137+
}
138+
139+
private void AddNewBuffer(int size)
140+
{
141+
var buffer = _pool.Rent(size);
142+
_buffers.Add(buffer);
143+
_currentIndex = _buffers.Count - 1;
144+
_currentOffset = 0;
145+
}
146+
147+
private class SequenceSegment : ReadOnlySequenceSegment<byte>
148+
{
149+
public SequenceSegment(ReadOnlyMemory<byte> memory)
150+
{
151+
Memory = memory;
152+
}
153+
154+
public void SetNext(SequenceSegment next)
155+
{
156+
Next = next;
157+
next.RunningIndex = RunningIndex + Memory.Length;
158+
}
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)