3434import java .io .IOException ;
3535
3636import javax .annotation .Nonnull ;
37+ import javax .annotation .Nullable ;
3738
3839import okio .Buffer ;
3940import okio .BufferedSource ;
@@ -84,61 +85,74 @@ public void onOpen(WebSocket webSocket, Response response) {
8485 final WebSocket notifyConnected ;
8586 synchronized (lock ) {
8687 if (requestClose ) {
88+ notifyConnected = null ;
8789 try {
88- webSocketItem .close (0 , "Just disconnect" );
90+ webSocket .close (0 , "Just disconnect" );
8991 } catch (IOException e ) {
9092 subscriber .onNext (new RxEventDisconnected (e ));
9193 }
92- webSocketItem = null ;
9394 } else {
94- webSocketItem = webSocket ;
95+ notifyConnected = webSocket ;
9596 }
96- notifyConnected = webSocketItem ;
97+ webSocketItem = notifyConnected ;
9798 }
9899 if (notifyConnected != null ) {
99100 subscriber .onNext (new RxEventConnected (notifyConnected ));
100101 }
101102 }
102103
103- @ Nonnull
104- WebSocket webSocketOrThrow () {
104+ @ Nullable
105+ WebSocket webSocketOrNull () {
105106 synchronized (lock ) {
106- if (webSocketItem == null ) {
107- throw new IllegalArgumentException ("Web socket should not be null" );
108- }
109107 return webSocketItem ;
110108 }
111109 }
112110
113111 @ Override
114112 public void onFailure (IOException e , Response response ) {
115113 subscriber .onNext (new RxEventDisconnected (e ));
114+ subscriber .onError (e );
115+ synchronized (lock ) {
116+ webSocketItem = null ;
117+ requestClose = false ;
118+ }
116119 }
117120
118121 @ Override
119122 public void onMessage (BufferedSource payload , WebSocket .PayloadType type ) throws IOException {
120123 try {
124+ final WebSocket sender = webSocketOrNull ();
125+ if (sender == null ) {
126+ return ;
127+ }
121128 if (WebSocket .PayloadType .BINARY .equals (type )) {
122- subscriber .onNext (new RxEventBinaryMessage (webSocketOrThrow () , payload .readByteArray ()));
129+ subscriber .onNext (new RxEventBinaryMessage (sender , payload .readByteArray ()));
123130 } else if (WebSocket .PayloadType .TEXT .equals (type )) {
124- subscriber .onNext (new RxEventStringMessage (webSocketOrThrow () , payload .readUtf8 ()));
131+ subscriber .onNext (new RxEventStringMessage (sender , payload .readUtf8 ()));
125132 }
126133 } finally {
127134 payload .close ();
128135 }
129136 }
130137
131138 @ Override
132- public void onPong (Buffer payload ) {
133- subscriber .onNext (new RxEventPong (webSocketOrThrow (), payload .readByteArray ()));
139+ public void onPong (Buffer payload ) {final WebSocket sender = webSocketOrNull ();
140+ if (sender == null ) {
141+ return ;
142+ }
143+ subscriber .onNext (new RxEventPong (sender , payload .readByteArray ()));
134144
135145 }
136146
137147 @ Override
138148 public void onClose (int code , String reason ) {
139- subscriber .onNext (new RxEventServerRequestedClose (webSocketOrThrow (), code , reason ));
149+ final WebSocket sender = webSocketOrNull ();
150+ if (sender != null ) {
151+ subscriber .onNext (new RxEventServerRequestedClose (sender , code , reason ));
152+ }
140153 synchronized (lock ) {
141154 webSocketItem = null ;
155+ requestClose = false ;
142156 }
143157 }
144158
@@ -153,6 +167,7 @@ public void call() {
153167 webSocketItem .close (0 , "Just disconnect" );
154168 } catch (IOException e ) {
155169 subscriber .onNext (new RxEventDisconnected (e ));
170+ subscriber .onError (e );
156171 }
157172 webSocketItem = null ;
158173 } else {
0 commit comments