11using Microsoft . Extensions . Options ;
22using System ;
3+ using System . IO ;
4+ using System . Linq ;
35using System . Net . WebSockets ;
46using System . Threading ;
57using System . Threading . Tasks ;
@@ -13,38 +15,82 @@ internal class ClientWebSocketConnector : IWebSocketConnector
1315 private readonly ConnectorOptions _options ;
1416 private readonly InvocatorRegistry _invocatorRegistry ;
1517
16- public ClientWebSocketConnector ( IOptions < ConnectorOptions > options , InvocatorRegistry invocatorRegistry )
18+ public ClientWebSocketConnector ( IOptions < ConnectorOptions > options ,
19+ InvocatorRegistry invocatorRegistry )
1720 {
1821 _options = options . Value ;
1922 _invocatorRegistry = invocatorRegistry ;
2023 }
2124
22- public async Task InitializeAsync ( )
25+ private async Task Receive ( )
2326 {
24- var uri = new Uri ( $ "ws://{ _options . WebSocketHostAddress } ") ;
25- _webSocket = new ClientWebSocket ( ) ;
26- await _webSocket . ConnectAsync ( uri , CancellationToken . None ) ;
27- var buffer = new byte [ 1024 * 4 ] ;
27+ var buffer = new byte [ SocketsConstants . ChunkSize ] ;
2828 var result = await _webSocket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , CancellationToken . None ) ;
2929 while ( ! result . CloseStatus . HasValue )
3030 {
31- var context = result . ToContext ( buffer ) ;
32- if ( context . Command == WebSocketCommands . Handshake )
33- _connectionId = context . Value . ToString ( ) ;
31+ if ( result . MessageType == WebSocketMessageType . Text )
32+ {
33+ var context = result . ToContext ( buffer ) ;
34+ if ( context . Command == WebSocketCommands . Handshake )
35+ _connectionId = context . Value . ToString ( ) ;
36+
37+ var _invocators = _invocatorRegistry . GetInvocators ( context ) ;
38+ _invocators . ForEach ( async x => await x . InvokeAsync ( context ) ) ;
39+ result = await _webSocket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , CancellationToken . None ) ;
40+ }
3441
35- var _invocators = _invocatorRegistry . GetInvocators ( context ) ;
36- _invocators . ForEach ( async x => await x . InvokeAsync ( context ) ) ;
37- result = await _webSocket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , CancellationToken . None ) ;
42+ if ( result . MessageType == WebSocketMessageType . Binary )
43+ {
44+ byte [ ] binaryResult = null ;
45+ using ( var ms = new MemoryStream ( ) )
46+ {
47+ while ( ! result . EndOfMessage )
48+ {
49+ if ( ! result . CloseStatus . HasValue )
50+ {
51+ await ms . WriteAsync ( buffer , 0 , result . Count ) ;
52+ }
53+ result = await _webSocket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , CancellationToken . None ) ;
54+ }
55+ if ( result . EndOfMessage )
56+ {
57+ if ( ! result . CloseStatus . HasValue )
58+ {
59+ await ms . WriteAsync ( buffer , 0 , result . Count ) ;
60+ }
61+ }
62+ binaryResult = ms . ToArray ( ) ;
63+ }
64+ var context = result . ToBinaryContext ( binaryResult ) ;
65+ var _invocators = _invocatorRegistry . GetInvocators ( context ) ;
66+ _invocators . ForEach ( async x => await x . InvokeAsync ( context ) ) ;
67+ result = await _webSocket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , CancellationToken . None ) ;
68+ }
3869 }
3970 await _webSocket . CloseAsync ( result . CloseStatus . Value , result . CloseStatusDescription , CancellationToken . None ) ;
4071 }
4172
73+ public async Task ConnectAsync ( )
74+ {
75+ var uri = new Uri ( $ "ws://{ _options . WebSocketHostAddress } ") ;
76+ _webSocket = new ClientWebSocket ( ) ;
77+ await _webSocket . ConnectAsync ( uri , CancellationToken . None ) ;
78+ await Task . WhenAll ( Receive ( ) ) ;
79+ }
80+
4281 public async Task SendAsync ( WebSocketMessageContext context )
4382 {
4483 var segments = context . ToSegment ( ) ;
4584 await _webSocket . SendAsync ( segments , WebSocketMessageType . Text , true , CancellationToken . None ) ;
4685 }
4786
87+ public async Task SendBinaryAsync ( byte [ ] bytes )
88+ {
89+ // TODO Chunked
90+ var segments = new ArraySegment < byte > ( bytes , 0 , bytes . Count ( ) ) ;
91+ await _webSocket . SendAsync ( segments , WebSocketMessageType . Binary , true , CancellationToken . None ) ;
92+ }
93+
4894 internal void Close ( string statusDescription )
4995 {
5096 _webSocket . CloseAsync ( WebSocketCloseStatus . NormalClosure , statusDescription , CancellationToken . None ) ;
0 commit comments