|
25 | 25 | import com.appunite.websocket.rx.messages.RxEventStringMessage; |
26 | 26 | import okhttp3.OkHttpClient; |
27 | 27 | import okhttp3.Request; |
| 28 | +import okhttp3.RequestBody; |
28 | 29 | import okhttp3.Response; |
29 | 30 | import okhttp3.ResponseBody; |
30 | 31 | import okhttp3.ws.WebSocket; |
@@ -62,7 +63,6 @@ public RxWebSockets(@Nonnull OkHttpClient client, @Nonnull Request request) { |
62 | 63 | this.request = request; |
63 | 64 | } |
64 | 65 |
|
65 | | - |
66 | 66 | /** |
67 | 67 | * Returns observable that connected to a websocket and returns {@link RxObjectEvent}'s |
68 | 68 | * |
@@ -91,7 +91,7 @@ public void onOpen(WebSocket webSocket, Response response) { |
91 | 91 | subscriber.onNext(new RxEventDisconnected(e)); |
92 | 92 | } |
93 | 93 | } else { |
94 | | - notifyConnected = webSocket; |
| 94 | + notifyConnected = new LockingWebSocket(webSocket); |
95 | 95 | } |
96 | 96 | webSocketItem = notifyConnected; |
97 | 97 | } |
@@ -178,4 +178,35 @@ public void call() { |
178 | 178 | } |
179 | 179 | }); |
180 | 180 | } |
| 181 | + |
| 182 | + /** |
| 183 | + * Class that synchronizes writes to websocket |
| 184 | + */ |
| 185 | + private static class LockingWebSocket implements WebSocket { |
| 186 | + @Nonnull |
| 187 | + private final WebSocket webSocket; |
| 188 | + |
| 189 | + public LockingWebSocket(@Nonnull WebSocket webSocket) { |
| 190 | + this.webSocket = webSocket; |
| 191 | + } |
| 192 | + |
| 193 | + @Override |
| 194 | + public void sendMessage(RequestBody message) throws IOException { |
| 195 | + synchronized (this) { |
| 196 | + webSocket.sendMessage(message); |
| 197 | + } |
| 198 | + } |
| 199 | + |
| 200 | + @Override |
| 201 | + public void sendPing(Buffer payload) throws IOException { |
| 202 | + synchronized (this) { |
| 203 | + webSocket.sendPing(payload); |
| 204 | + } |
| 205 | + } |
| 206 | + |
| 207 | + @Override |
| 208 | + public void close(int code, String reason) throws IOException { |
| 209 | + webSocket.close(code, reason); |
| 210 | + } |
| 211 | + } |
181 | 212 | } |
0 commit comments