11using Microsoft . Extensions . Logging ;
2- using Microsoft . Extensions . Options ;
32using NetCoreStack . WebSockets . Interfaces ;
43using NetCoreStack . WebSockets . Internal ;
54using Newtonsoft . Json ;
1312using System . Text ;
1413using System . Threading ;
1514using System . Threading . Tasks ;
16- using Microsoft . Extensions . DependencyInjection ;
1715using static NetCoreStack . WebSockets . Internal . SocketsConstants ;
1816
1917namespace NetCoreStack . WebSockets
2018{
2119 public class ConnectionManager : IConnectionManager
2220 {
23- private readonly SemaphoreSlim _sendFrameAsyncLock = new SemaphoreSlim ( 1 , 1 ) ;
2421 private readonly IServiceProvider _serviceProvider ;
2522 private readonly IHandshakeStateTransport _initState ;
2623 private readonly ILoggerFactory _loggerFactory ;
2724 private readonly IStreamCompressor _compressor ;
28- private readonly TransportLifetimeManager _lifetimeManager ;
2925
3026 public ConcurrentDictionary < string , WebSocketTransport > Connections { get ; }
3127
3228 public ConnectionManager ( IServiceProvider serviceProvider ,
3329 IStreamCompressor compressor ,
34- TransportLifetimeManager lifetimeManager ,
3530 IHandshakeStateTransport initState ,
3631 ILoggerFactory loggerFactory )
3732 {
3833 _serviceProvider = serviceProvider ;
3934 _compressor = compressor ;
40- _lifetimeManager = lifetimeManager ;
4135 _initState = initState ;
4236 _loggerFactory = loggerFactory ;
4337 Connections = new ConcurrentDictionary < string , WebSocketTransport > ( StringComparer . OrdinalIgnoreCase ) ;
@@ -51,32 +45,31 @@ private async Task<byte[]> PrepareFramesBytesAsync(byte[] body, IDictionary<stri
5145 }
5246
5347 if ( properties == null )
48+ {
5449 properties = new Dictionary < string , object > ( ) ;
50+ }
5551
5652 bool compressed = GZipHelper . IsGZipBody ( body ) ;
5753
5854 object key = null ;
5955 if ( properties . TryGetValue ( CompressedKey , out key ) )
56+ {
6057 properties [ CompressedKey ] = compressed ;
58+ }
6159 else
60+ {
6261 properties . Add ( CompressedKey , compressed ) ;
62+ }
6363
6464 string props = JsonConvert . SerializeObject ( properties ) ;
6565 byte [ ] header = Encoding . UTF8 . GetBytes ( $ "{ props } ") ;
6666
67- #if DEBUG
68- if ( properties . TryGetValue ( "Key" , out key ) )
69- {
70- int length = body . Length ;
71- Debug . WriteLine ( $ "=====Key: { key ? . ToString ( ) } =====Length: { length } =====") ;
72- }
73- #endif
74-
7567 if ( ! compressed )
68+ {
7669 body = await _compressor . CompressAsync ( body ) ;
70+ }
7771
7872 body = header . Concat ( Splitter ) . Concat ( body ) . ToArray ( ) ;
79-
8073 return body ;
8174 }
8275
@@ -97,19 +90,10 @@ private Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor
9790 return transport . WebSocket . SendAsync ( descriptor . Segments ,
9891 descriptor . MessageType ,
9992 descriptor . EndOfMessage ,
100- CancellationToken . None ) ;
93+ descriptor . CancellationToken ) ;
10194 }
102- else
103- {
104- // Only text messages
105- _lifetimeManager . AddQueue ( transport . ConnectionId , new MessageHolder
106- {
107- Segments = descriptor . Segments ,
108- KeepTime = DateTime . Now . AddMinutes ( 3 )
109- } ) ;
11095
111- return TaskCache . CompletedTask ;
112- }
96+ return Task . CompletedTask ;
11397 }
11498
11599 private Task SendBinaryAsync ( WebSocketTransport transport , byte [ ] chunkedBytes , bool endOfMessage )
@@ -129,7 +113,7 @@ private Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes,
129113 CancellationToken . None ) ;
130114 }
131115
132- return TaskCache . CompletedTask ;
116+ return Task . CompletedTask ;
133117 }
134118
135119 private async Task SendConcurrentBinaryAsync ( byte [ ] bytes )
@@ -174,17 +158,6 @@ public async Task ConnectAsync(WebSocket webSocket, string connectionId, string
174158 if ( Connections . TryGetValue ( connectionId , out transport ) )
175159 {
176160 transport . ReConnect ( webSocket ) ;
177- List < MessageHolder > messages = _lifetimeManager . TryDequeue ( connectionId ) ;
178- foreach ( var message in messages )
179- {
180- await SendAsync ( transport , new WebSocketMessageDescriptor
181- {
182- MessageType = WebSocketMessageType . Text ,
183- Segments = message . Segments ,
184- EndOfMessage = true ,
185- IsQueue = true ,
186- } ) ;
187- }
188161 }
189162 else
190163 {
@@ -226,13 +199,11 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
226199 EndOfMessage = true ,
227200 MessageType = WebSocketMessageType . Text
228201 } ;
229-
230- _sendFrameAsyncLock . Wait ( ) ;
202+
231203 foreach ( var connection in Connections )
232204 {
233205 await SendAsync ( connection . Value , descriptor ) ;
234206 }
235- _sendFrameAsyncLock . Release ( ) ;
236207 }
237208
238209 public async Task BroadcastBinaryAsync ( byte [ ] inputs , IDictionary < string , object > properties )
@@ -242,21 +213,18 @@ public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object
242213 return ;
243214 }
244215
245- _sendFrameAsyncLock . Wait ( ) ;
246216 var bytes = await PrepareFramesBytesAsync ( inputs , properties ) ;
247217 await SendConcurrentBinaryAsync ( bytes ) ;
248- _sendFrameAsyncLock . Release ( ) ;
249218 }
250219
251220 public Task SendAsync ( string connectionId , WebSocketMessageContext context )
252221 {
253222 if ( ! Connections . Any ( ) )
254223 {
255- return TaskCache . CompletedTask ;
224+ return Task . CompletedTask ;
256225 }
257226
258- WebSocketTransport transport = null ;
259- if ( ! Connections . TryGetValue ( connectionId , out transport ) )
227+ if ( ! Connections . TryGetValue ( connectionId , out WebSocketTransport transport ) )
260228 {
261229 throw new ArgumentOutOfRangeException ( nameof ( transport ) ) ;
262230 }
@@ -279,8 +247,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
279247 return ;
280248 }
281249
282- WebSocketTransport transport = null ;
283- if ( ! Connections . TryGetValue ( connectionId , out transport ) )
250+ if ( ! Connections . TryGetValue ( connectionId , out WebSocketTransport transport ) )
284251 {
285252 throw new ArgumentOutOfRangeException ( nameof ( transport ) ) ;
286253 }
0 commit comments