Skip to content

Commit 3b3c63d

Browse files
author
Paulo Morgado
committed
Enhance memory management and reliability in RTSP server
- Refactored audio and video track handling to utilize `ReadOnlySequence<byte>` for better performance and memory efficiency. - Introduced `AdjustedSizeMemoryOwner` for effective memory allocation. - Updated method signatures for RTP packet creation and sample feeding to accept `ReadOnlySequence<byte>`, ensuring consistency and optimizing memory usage. - Modified `RTPPacketUtil` to read timestamps from `ReadOnlySpan<byte>`, enhancing performance. - Overall code structure refined for improved readability and memory management.
1 parent 75ff42b commit 3b3c63d

14 files changed

Lines changed: 172 additions & 106 deletions

File tree

src/RTSPClientApp/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
using (RTSPClient client = new RTSPClient())
1111
{
12-
client.NewVideoStream += (sender, e) => Console.WriteLine(e.ToString());
13-
client.ReceivedVideoData += (sender, e) => Console.Write("*");
12+
client.NewVideoStream += (sender, e) => Console.WriteLine($"New video stream: PayloadType={e.PayloadType}, StreamType={e.StreamType}");
13+
client.ReceivedVideoData += (sender, e) => Console.WriteLine($"Received video data: Timestamp={e.Timestamp}");
1414
client.NewAudioStream += (sender, e) => Console.WriteLine(e.ToString());
1515
client.ReceivedAudioData += (sender, e) => Console.Write("+");
1616

src/RTSPServerApp/Program.cs

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using SharpMp4;
33
using SharpRTSPServer;
44
using System;
5+
using System.Buffers;
56
using System.Collections.Generic;
67
using System.IO;
78
using System.Linq;
@@ -51,7 +52,7 @@
5152
else
5253
{
5354
var h265VisualSample = videoTrackBox.GetMdia().GetMinf().GetStbl().GetStsd().Children.FirstOrDefault(x => x.Type == VisualSampleEntryBox.TYPE6 || x.Type == VisualSampleEntryBox.TYPE7) as VisualSampleEntryBox;
54-
if(h265VisualSample != null)
55+
if (h265VisualSample != null)
5556
{
5657
rtspVideoTrack = new SharpRTSPServer.H265Track();
5758
server.AddVideoTrack(rtspVideoTrack);
@@ -60,9 +61,9 @@
6061
{
6162
throw new NotSupportedException("No supported video found!");
6263
}
63-
}
64+
}
6465
}
65-
66+
6667
if (audioTrackBox != null)
6768
{
6869
audioTrackId = fmp4.FindAudioTrackID().First();
@@ -76,7 +77,7 @@
7677
server.AddAudioTrack(rtspAudioTrack);
7778
}
7879
else
79-
{
80+
{
8081
// unsupported audio
8182
}
8283
}
@@ -92,7 +93,7 @@
9293
{
9394
var videoSamplingRate = SharpRTSPServer.H264Track.DEFAULT_CLOCK;
9495
var videoSampleDuration = videoSamplingRate / videoFrameRate;
95-
var videoTrack = parsedMDAT[videoTrackId];
96+
var videoTrack = parsedMDAT[videoTrackId];
9697
videoTimer = new Timer(videoSampleDuration * 1000 / videoSamplingRate);
9798
videoTimer.Elapsed += (s, e) =>
9899
{
@@ -109,7 +110,7 @@
109110
videoIndex++;
110111
}
111112

112-
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), (List<byte[]>)videoTrack[videoIndex++ % videoTrack.Count]);
113+
rtspVideoTrack.FeedInRawSamples((uint)(videoIndex * videoSampleDuration), CreateReadOnlySequence(videoTrack[videoIndex++ % videoTrack.Count]));
113114

114115
if (videoIndex % videoTrack.Count == 0)
115116
{
@@ -125,7 +126,7 @@
125126
audioTimer = new Timer(audioSampleDuration * 1000 / (rtspAudioTrack as SharpRTSPServer.AACTrack).SamplingRate);
126127
audioTimer.Elapsed += (s, e) =>
127128
{
128-
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new List<byte[]>() { audioTrack[0][audioIndex++ % audioTrack[0].Count] });
129+
rtspAudioTrack.FeedInRawSamples((uint)(audioIndex * audioSampleDuration), new ReadOnlySequence<byte>(audioTrack[0][audioIndex++ % audioTrack[0].Count]));
129130

130131
if (audioIndex % audioTrack[0].Count == 0)
131132
{
@@ -151,7 +152,7 @@
151152
Console.WriteLine("Press any key to exit");
152153
while (!Console.KeyAvailable)
153154
{
154-
System.Threading.Thread.Sleep(250);
155+
System.Threading.Thread.Sleep(250);
155156
}
156157
}
157158

@@ -164,3 +165,42 @@ static void Reset(ref int videoIndex, Timer videoTimer, ref int audioIndex, Time
164165
videoTimer?.Start();
165166
audioTimer?.Start();
166167
}
168+
169+
170+
static ReadOnlySequence<byte> CreateReadOnlySequence(IList<byte[]> byteArrayList)
171+
{
172+
if (byteArrayList.Count == 0)
173+
{
174+
return ReadOnlySequence<byte>.Empty;
175+
}
176+
177+
var firstSegment = new ReadOnlyMemory<byte>(byteArrayList[0]);
178+
var lastSegment = firstSegment;
179+
180+
SequenceSegment startSegment = new SequenceSegment(firstSegment);
181+
SequenceSegment currentSegment = startSegment;
182+
183+
for (int i = 1; i < byteArrayList.Count; i++)
184+
{
185+
var nextSegment = new SequenceSegment(new ReadOnlyMemory<byte>(byteArrayList[i]));
186+
currentSegment.SetNext(nextSegment);
187+
currentSegment = nextSegment;
188+
lastSegment = nextSegment.Memory;
189+
}
190+
191+
return new ReadOnlySequence<byte>(startSegment, 0, currentSegment, currentSegment.Memory.Length);
192+
}
193+
194+
public class SequenceSegment : ReadOnlySequenceSegment<byte>
195+
{
196+
public SequenceSegment(ReadOnlyMemory<byte> memory)
197+
{
198+
Memory = memory;
199+
}
200+
201+
public void SetNext(SequenceSegment next)
202+
{
203+
Next = next;
204+
RunningIndex += Memory.Length;
205+
}
206+
}

src/RTSPServerFFmpeg/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Task RunUdpClient(ProxyTrack track, Uri uri, CancellationToken cancellationToken
130130
{
131131
byte[] rtp = udpClient.Receive(ref remoteEndPoint);
132132
uint rtpTimestamp = RTPPacketUtil.ReadTS(rtp);
133-
track.FeedInRawSamples(rtpTimestamp, new List<byte[]>() { rtp });
133+
track.FeedInRawSamples(rtpTimestamp, new(rtp));
134134
}
135135
catch (Exception e)
136136
{

src/RTSPServerPcap/Program.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Configuration;
55
using SharpRTSPServer;
66
using System;
7+
using System.Buffers;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
@@ -87,7 +88,7 @@ void Reader_OnReadPacketEvent(object context, IPacket packet)
8788
{
8889
var ipHeader = ParseIPHeader(packet);
8990

90-
if(ipHeader.Protocol == 6) // TCP - RTSP
91+
if (ipHeader.Protocol == 6) // TCP - RTSP
9192
{
9293
var tcpHeader = ParseTCPHeader(packet, 4 + ipHeader.HeaderLength);
9394
Debug.WriteLine($"Source: {ipHeader.SourceIP}:{tcpHeader.SourcePort}, Dest: {ipHeader.DestintationIP}:{tcpHeader.DestinationPort}, Ver: {ipHeader.Version}");
@@ -113,7 +114,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
113114

114115
if (udp != null && data.Length > 1) // TODO
115116
{
116-
if(data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117+
if (data[0] == 0x80 && data[1] != 0xc8) // 0xc8 sender report -> ignore rtcp
117118
{
118119
long messageTime = seconds * 1000 + (microseconds / 1000);
119120
long realTime = (uint)_stopwatch.ElapsedMilliseconds;
@@ -132,9 +133,9 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
132133
{
133134
Thread.Sleep(sleep);
134135
}
135-
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
136+
videoTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
136137
}
137-
else if(rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138+
else if (rtspProtocolParser.Ports.Count > 1 && rtspProtocolParser.Ports[1].Contains(udp.SourcePort) && rtspProtocolParser.Ports[1].Contains(udp.DestinationPort))
138139
{
139140
if (lastAudioMessageTime == -1)
140141
lastAudioMessageTime = messageTime;
@@ -149,7 +150,7 @@ void ParseData(byte[] data, object header, uint seconds, uint microseconds)
149150
Thread.Sleep(sleep);
150151
}
151152

152-
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new List<byte[]> { data });
153+
audioTrack?.FeedInRawSamples(RTPPacketUtil.ReadTS(data), new ReadOnlySequence<byte>(data));
153154
}
154155
}
155156
}
@@ -305,19 +306,19 @@ public bool Parse(string rtsp)
305306
{
306307
int.TryParse(line.Substring(CONTENT_LENGTH.Length).Trim(), out contentLength);
307308
}
308-
else if(line.StartsWith(TRANSPORT)) // SETUP response
309+
else if (line.StartsWith(TRANSPORT)) // SETUP response
309310
{
310311
int[] clientPorts = null;
311312
int[] serverPorts = null;
312313
string[] split = line.Substring(TRANSPORT.Length).Trim().Split(';');
313-
foreach(var s in split)
314+
foreach (var s in split)
314315
{
315316
string str = s.Trim();
316-
if(str.StartsWith(CLIENT_PORT))
317+
if (str.StartsWith(CLIENT_PORT))
317318
{
318319
clientPorts = str.Substring(CLIENT_PORT.Length).Split('-').Select(int.Parse).ToArray();
319320
}
320-
else if(str.StartsWith(SERVER_PORT))
321+
else if (str.StartsWith(SERVER_PORT))
321322
{
322323
serverPorts = str.Substring(SERVER_PORT.Length).Split('-').Select(int.Parse).ToArray();
323324
}
@@ -337,7 +338,7 @@ public bool Parse(string rtsp)
337338
if (ms.Position == ms.Length && contentLength > (ms.Length - ms.Position))
338339
{
339340
return true;
340-
}
341+
}
341342
}
342343
}
343344
}
@@ -364,7 +365,7 @@ public UDPHeader(ushort sourcePort, ushort destinationPort, ushort length, ushor
364365

365366
public class TCPHeader
366367
{
367-
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368+
public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber, uint acknowledgementNumber, int tcpHeaderLength, int flags, ushort window, ushort checksum, ushort urgentPointer)
368369
{
369370
SourcePort = sourcePort;
370371
DestinationPort = destinationPort;
@@ -391,18 +392,18 @@ public TCPHeader(ushort sourcePort, ushort destinationPort, uint sequenceNumber,
391392
public class IPHeader
392393
{
393394
public IPHeader(
394-
int family,
395-
int version,
395+
int family,
396+
int version,
396397
int headerLength,
397-
byte differentiatedServicesField,
398-
ushort totalLength,
399-
ushort identification,
400-
byte flags,
401-
ushort fragmentOffset,
402-
byte ttl,
403-
byte protocol,
404-
ushort headerCheckSum,
405-
IPAddress sourceIP,
398+
byte differentiatedServicesField,
399+
ushort totalLength,
400+
ushort identification,
401+
byte flags,
402+
ushort fragmentOffset,
403+
byte ttl,
404+
byte protocol,
405+
ushort headerCheckSum,
406+
IPAddress sourceIP,
406407
IPAddress destintationIP)
407408
{
408409
Family = family;

src/SharpRTSPServer/AACTrack.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,20 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
117117
/// <param name="samples">An array of AAC fragments. By default single fragment is expected.</param>
118118
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
119119
/// <returns>RTP packets.</returns>
120-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
120+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
121121
{
122122
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
123123
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
124124

125-
for (int i = 0; i < samples.Count; i++)
125+
foreach (var sample in samples)
126126
{
127127
// append AU header (required for AAC)
128-
var audioPacket = AppendAUHeader(samples[i]);
128+
var audioPacket = AppendAUHeader(sample.Span);
129129

130130
// Put the whole Audio Packet into one RTP packet.
131131
// 12 is header size when there are no CSRCs or extensions
132132
var size = 12 + audioPacket.Length;
133-
var owner = MemoryPool<byte>.Shared.Rent(size);
133+
var owner = AdjustedSizeMemoryOwner.Rent(size);
134134
memoryOwners.Add(owner);
135135

136136
var rtpPacket = owner.Memory.Slice(0, size);
@@ -152,18 +152,19 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
152152
rtpPackets.Add(rtpPacket);
153153
}
154154

155-
return (rtpPackets, memoryOwners);
155+
return memoryOwners;
156156
}
157157

158-
private static byte[] AppendAUHeader(byte[] frame)
158+
private static byte[] AppendAUHeader(ReadOnlySpan<byte> frame)
159159
{
160160
short frameLen = (short)(frame.Length << 3);
161-
byte[] header = new byte[4];
161+
byte[] header = new byte[4 + frame.Length];
162162
header[0] = 0x00;
163163
header[1] = 0x10; // 16 bits size of the header
164164
header[2] = (byte)((frameLen >> 8) & 0xFF);
165165
header[3] = (byte)(frameLen & 0xFF);
166-
return header.Concat(frame).ToArray();
166+
frame.CopyTo(header.AsSpan(4));
167+
return header;
167168
}
168169

169170
private static int GetAACLevel(int samplingFrequency, int channelConfiguration)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System;
2+
using System.Buffers;
3+
4+
namespace SharpRTSPServer
5+
{
6+
internal class AdjustedSizeMemoryOwner : IMemoryOwner<byte>
7+
{
8+
private readonly IMemoryOwner<byte> _wrapped;
9+
10+
private AdjustedSizeMemoryOwner(IMemoryOwner<byte> wrapped, int size)
11+
{
12+
_wrapped = wrapped;
13+
Memory = _wrapped.Memory.Slice(0, size);
14+
}
15+
16+
public Memory<byte> Memory { get; }
17+
18+
public void Dispose() => _wrapped.Dispose();
19+
20+
public static IMemoryOwner<byte> Rent(int size) => new AdjustedSizeMemoryOwner(MemoryPool<byte>.Shared.Rent(size), size);
21+
}
22+
}

src/SharpRTSPServer/G711Track.cs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,15 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
5959
/// <param name="samples">An array of PCMU fragments. By default single fragment is expected.</param>
6060
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
6161
/// <returns>RTP packets.</returns>
62-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
62+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
6363
{
6464
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
6565
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
6666

67-
for (int i = 0; i < samples.Count; i++)
67+
foreach (var audioPacket in samples)
6868
{
69-
var audioPacket = samples[i];
7069
var size = 12 + audioPacket.Length;
71-
var owner = MemoryPool<byte>.Shared.Rent(size);
70+
var owner = AdjustedSizeMemoryOwner.Rent(size);
7271
memoryOwners.Add(owner);
7372

7473
var rtpPacket = owner.Memory.Slice(0, size);
@@ -86,7 +85,7 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
8685
rtpPackets.Add(rtpPacket);
8786
}
8887

89-
return (rtpPackets, memoryOwners);
88+
return memoryOwners;
9089
}
9190
}
9291

@@ -144,19 +143,17 @@ public override StringBuilder BuildSDP(StringBuilder sdp)
144143
/// <param name="samples">An array of PCMA fragments. By default single fragment is expected.</param>
145144
/// <param name="rtpTimestamp">RTP timestamp in the timescale of the track.</param>
146145
/// <returns>RTP packets.</returns>
147-
public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(List<byte[]> samples, uint rtpTimestamp)
146+
public override List<IMemoryOwner<byte>> CreateRtpPackets(ReadOnlySequence<byte> samples, uint rtpTimestamp)
148147
{
149-
List<Memory<byte>> rtpPackets = new List<Memory<byte>>();
150148
List<IMemoryOwner<byte>> memoryOwners = new List<IMemoryOwner<byte>>();
151149

152-
for (int i = 0; i < samples.Count; i++)
150+
foreach (var audioPacket in samples)
153151
{
154-
var audioPacket = samples[i];
155152
var size = 12 + audioPacket.Length;
156-
var owner = MemoryPool<byte>.Shared.Rent(size);
153+
var owner = AdjustedSizeMemoryOwner.Rent(size);
157154
memoryOwners.Add(owner);
158155

159-
var rtpPacket = owner.Memory.Slice(0, size);
156+
var rtpPacket = owner.Memory;
160157

161158
const bool rtpPadding = false;
162159
const bool rtpHasExtension = false;
@@ -168,10 +165,9 @@ public override (List<Memory<byte>>, List<IMemoryOwner<byte>>) CreateRtpPackets(
168165

169166
RTPPacketUtil.WriteTS(rtpPacket.Span, rtpTimestamp);
170167
audioPacket.CopyTo(rtpPacket.Slice(12));
171-
rtpPackets.Add(rtpPacket);
172168
}
173169

174-
return (rtpPackets, memoryOwners);
170+
return memoryOwners;
175171
}
176172
}
177173
}

0 commit comments

Comments
 (0)