Skip to content

Commit 7bdd55b

Browse files
author
Paulo Morgado
committed
Refactor RTP handling to use ReadOnlySequence<byte> for improved memory management and performance. Introduced BufferSegment and PooledByteBuffer classes for efficient memory management. Updated methods across various classes, including RTSPServerWorker, ProxyTrack, and track classes, to accept ReadOnlySequence<byte> and return IByteBuffer. Enhanced the FeedInRawRTP method in RTSPServer and IRtpSender for better raw RTP data handling. Added MemoryExtensions for utility functions. Overall improvements to code structure for readability and maintainability.
1 parent 0a651fe commit 7bdd55b

18 files changed

Lines changed: 789 additions & 376 deletions

src/RTSPServerApp/RTSPServerWorker.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using SharpMp4;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.IO;
910
using System.Linq;
@@ -156,9 +157,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
156157

157158
try
158159
{
159-
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
160+
using (var buffer = new PooledByteBuffer(initialBufferSize: 0))
161+
{
162+
foreach (var trackBytes in videoTrack[videoIndex++ % videoTrack.Count])
163+
{
164+
buffer.Write(trackBytes);
165+
}
166+
167+
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), buffer.GetReadOnlySequence());
168+
}
160169
}
161-
catch(Exception ex)
170+
catch (Exception ex)
162171
{
163172
_logger.LogError(ex, $"FeedInRawSamples failed: {ex.Message}");
164173
}
@@ -167,7 +176,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
167176
{
168177
Reset(ref videoIndex, videoTimer, ref audioIndex, audioTimer);
169178
}
170-
};
179+
}
180+
;
171181
}
172182

173183
if (rtspAudioTrack != null)
@@ -177,7 +187,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
177187
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
178188
audioTimer.Elapsed += (s, e) =>
179189
{
180-
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
190+
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));
181191

182192
if (audioIndex % audioTrack[0].Count == 0)
183193
{

src/RTSPServerFFmpeg/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
130130
{
131131
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
132132
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
133-
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
133+
track.FeedInRawSamples(rtpTimestamp, new(rtp));
134134
}
135135
catch (Exception e)
136136
{

src/RTSPServerPcap/Program.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Configuration;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
@@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
8788
{
8889
var ipHeader = ParseIPHeader(packet);
8990

90-
if(ipHeader.Protocol == 6) // TCP - RTSP
91+
if (ipHeader.Protocol == 6) // TCP - RTSP
9192
{
9293
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
9394
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
@@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
113114

114115
if (udp != null && data.Length > 1) // TODO
115116
{
116-
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117+
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117118
{
118119
long messageTime = seconds * 1000 + (microseconds / 1000);
119120
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
@@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
132133
{
133134
Thread.Sleep(sleep);
134135
}
135-
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
136+
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
136137
}
137-
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138+
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138139
{
139140
if (lastAudioMessageTime == -1)
140141
lastAudioMessageTime = messageTime;
@@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
149150
Thread.Sleep(sleep);
150151
}
151152

152-
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
153+
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
153154
}
154155
}
155156
}
@@ -305,19 +306,19 @@ public bool Parse(string rtsp)
305306
{
306307
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
307308
}
308-
else if(line.StartsWith(TRANSPORT)) // SETUP response
309+
else if (line.StartsWith(TRANSPORT)) // SETUP response
309310
{
310311
int[] clientPorts = null;
311312
int[] serverPorts = null;
312313
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
313-
foreach(var s in split)
314+
foreach (var s in split)
314315
{
315316
string str = s.Trim();
316-
if(str.StartsWith(CLIENT_PORT))
317+
if (str.StartsWith(CLIENT_PORT))
317318
{
318319
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
319320
}
320-
else if(str.StartsWith(SERVER_PORT))
321+
else if (str.StartsWith(SERVER_PORT))
321322
{
322323
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
323324
}
@@ -337,7 +338,7 @@ public bool Parse(string rtsp)
337338
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
338339
{
339340
return true;
340-
}
341+
}
341342
}
342343
}
343344
}
@@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor
364365

365366
public class TCPHeader
366367
{
367-
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368+
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368369
{
369370
SourcePort = sourcePort;
370371
DestinationPort = destinationPort;
@@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
391392
public class IPHeader
392393
{
393394
public IPHeader(
394-
int family,
395-
int version,
395+
int family,
396+
int version,
396397
int headerLength,
397-
byte differentiatedServicesField,
398-
ushort totalLength,
399-
ushort identification,
400-
byte flags,
401-
ushort fragmentOffset,
402-
byte ttl,
403-
byte protocol,
404-
ushort headerCheckSum,
405-
IPAddress sourceIP,
398+
byte differentiatedServicesField,
399+
ushort totalLength,
400+
ushort identification,
401+
byte flags,
402+
ushort fragmentOffset,
403+
byte ttl,
404+
byte protocol,
405+
ushort headerCheckSum,
406+
IPAddress sourceIP,
406407
IPAddress destintationIP)
407408
{
408409
Family = family;
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
5+
namespace SharpRTSPClient
6+
{
7+
/// <summary>
8+
/// A pooled buffer writer that implements <see cref="IBufferWriter{Byte}"/> using <see cref="ArrayPool{Byte}.Shared"/> for efficient writing of byte data and
9+
/// allows reading the written content through a <see cref="ReadOnlySequence{Byte}"/> using the <see cref="GetReadOnlySequence"/> method.
10+
/// </summary>
11+
public sealed class PooledByteBuffer : IBufferWriter<byte>, IDisposable
12+
{
13+
private const int DefaultBufferSize = 4096;
14+
15+
private readonly List<byte[]> _buffers = new List<byte[]>();
16+
private readonly ArrayPool<byte> _pool;
17+
private int _currentIndex;
18+
private int _currentOffset;
19+
private bool _disposed;
20+
21+
/// <summary>
22+
/// Initializes a new instance of the <see cref="PooledByteBuffer"/> class with an optional initial buffer size and array pool.
23+
/// </summary>
24+
/// <param name="initialBufferSize">The initial size of the buffer to rent from the pool. Defaults to 4096 bytes.</param>
25+
/// <param name="pool">The array pool to use. If null, <see cref="ArrayPool{Byte}.Shared"/> is used.</param>
26+
public PooledByteBuffer(int initialBufferSize = DefaultBufferSize, ArrayPool<byte> pool = null)
27+
{
28+
_pool = pool ?? ArrayPool<byte>.Shared;
29+
AddNewBuffer(initialBufferSize);
30+
}
31+
32+
/// <summary>
33+
/// Notifies the buffer writer that <paramref name="count"/> bytes were written.
34+
/// </summary>
35+
/// <param name="count">The number of bytes written.</param>
36+
/// <exception cref="ArgumentOutOfRangeException">Thrown if count is negative or exceeds the current buffer capacity.</exception>
37+
public void Advance(int count)
38+
{
39+
if (count < 0 || _currentOffset + count > _buffers[_currentIndex].Length)
40+
{
41+
throw new ArgumentOutOfRangeException(nameof(count));
42+
}
43+
44+
_currentOffset += count;
45+
}
46+
47+
/// <summary>
48+
/// Returns a <see cref="Memory{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
49+
/// </summary>
50+
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
51+
/// <returns>A writable memory buffer.</returns>
52+
public Memory<byte> GetMemory(int sizeHint = 0)
53+
{
54+
EnsureCapacity(sizeHint);
55+
return _buffers[_currentIndex].AsMemory(_currentOffset);
56+
}
57+
58+
/// <summary>
59+
/// Returns a <see cref="Span{Byte}"/> buffer to write to, ensuring at least <paramref name="sizeHint"/> bytes are available.
60+
/// </summary>
61+
/// <param name="sizeHint">The minimum number of bytes required. May be 0.</param>
62+
/// <returns>A writable span buffer.</returns>
63+
public Span<byte> GetSpan(int sizeHint = 0)
64+
{
65+
EnsureCapacity(sizeHint);
66+
return _buffers[_currentIndex].AsSpan(_currentOffset);
67+
}
68+
69+
/// <summary>
70+
/// Returns a <see cref="ReadOnlySequence{Byte}"/> representing the written data across all buffers.
71+
/// </summary>
72+
/// <returns>A read-only sequence of bytes.</returns>
73+
public ReadOnlySequence<byte> GetReadOnlySequence()
74+
{
75+
SequenceSegment first = null;
76+
SequenceSegment last = null;
77+
78+
for (var i = 0; i < _buffers.Count; i++)
79+
{
80+
var buffer = _buffers[i];
81+
var length = (i == _currentIndex) ? _currentOffset : buffer.Length;
82+
83+
if (length == 0)
84+
{
85+
continue;
86+
}
87+
88+
var segment = new SequenceSegment(buffer.AsMemory(0, length));
89+
90+
if (first == null)
91+
{
92+
first = segment;
93+
}
94+
95+
if (last != null)
96+
{
97+
last.SetNext(segment);
98+
}
99+
100+
last = segment;
101+
}
102+
103+
if (first == null || last == null)
104+
{
105+
return ReadOnlySequence<byte>.Empty;
106+
}
107+
108+
return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
109+
}
110+
111+
/// <summary>
112+
/// Releases all buffers back to the pool and clears internal state.
113+
/// </summary>
114+
public void Dispose()
115+
{
116+
if (_disposed)
117+
{
118+
return;
119+
}
120+
121+
foreach (var buffer in _buffers)
122+
{
123+
_pool.Return(buffer);
124+
}
125+
126+
_buffers.Clear();
127+
_disposed = true;
128+
}
129+
130+
private void EnsureCapacity(int sizeHint)
131+
{
132+
if (_currentOffset + sizeHint > _buffers[_currentIndex].Length)
133+
{
134+
var newSize = Math.Max(sizeHint, DefaultBufferSize);
135+
AddNewBuffer(newSize);
136+
}
137+
}
138+
139+
private void AddNewBuffer(int size)
140+
{
141+
var buffer = _pool.Rent(size);
142+
_buffers.Add(buffer);
143+
_currentIndex = _buffers.Count - 1;
144+
_currentOffset = 0;
145+
}
146+
147+
private class SequenceSegment : ReadOnlySequenceSegment<byte>
148+
{
149+
public SequenceSegment(ReadOnlyMemory<byte> memory)
150+
{
151+
Memory = memory;
152+
}
153+
154+
public void SetNext(SequenceSegment next)
155+
{
156+
Next = next;
157+
next.RunningIndex = RunningIndex + Memory.Length;
158+
}
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)