Skip to content

Commit 9c8a6ce

Browse files
Merge pull request #13 from jacek-marchwicki/headers-and-sub-protocols
Ability to send subprotocols and own headers
2 parents 344d59e + 9ddc092 commit 9c8a6ce

12 files changed

Lines changed: 186 additions & 33 deletions

File tree

websockets-rxjava-example/src/main/java/com/appunite/socket/MainActivity.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131
import com.example.SocketConnectionImpl;
3232
import com.example.model.Message;
3333
import com.example.model.MessageType;
34+
import com.google.common.collect.ImmutableList;
3435
import com.google.gson.Gson;
3536
import com.google.gson.GsonBuilder;
3637

38+
import org.apache.http.Header;
39+
3740
import java.net.URI;
3841
import java.net.URISyntaxException;
3942

@@ -156,7 +159,10 @@ public RetentionFragment() {
156159
.create();
157160

158161
final NewWebSocket newWebSocket = new NewWebSocket();
159-
final RxWebSockets webSockets = new RxWebSockets(newWebSocket, ADDRESS);
162+
final RxWebSockets webSockets = new RxWebSockets(newWebSocket,
163+
ADDRESS,
164+
ImmutableList.of("chat"),
165+
ImmutableList.<Header>of());
160166
final RxJsonWebSockets jsonWebSockets = new RxJsonWebSockets(webSockets, gson, Message.class);
161167
final SocketConnection socketConnection = new SocketConnectionImpl(jsonWebSockets, Schedulers.io());
162168
presenter = new MainPresenter(new Socket(socketConnection, Schedulers.io()), Schedulers.io(), AndroidSchedulers.mainThread());

websockets-rxjava-example/src/test/java/com/example/RxJsonWebSocketsRealTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,23 @@
1717
package com.example;
1818

1919
import com.appunite.websocket.NewWebSocket;
20-
import com.appunite.websocket.rx.RxMoreObservables;
2120
import com.appunite.websocket.rx.RxWebSockets;
2221
import com.appunite.websocket.rx.json.RxJsonWebSockets;
23-
import com.appunite.websocket.rx.messages.RxEvent;
24-
import com.appunite.websocket.rx.messages.RxEventConnected;
2522
import com.example.model.Message;
2623
import com.example.model.MessageType;
24+
import com.google.common.collect.ImmutableList;
2725
import com.google.gson.Gson;
2826
import com.google.gson.GsonBuilder;
2927

28+
import org.apache.http.Header;
3029
import org.junit.Before;
3130
import org.junit.Ignore;
3231
import org.junit.Test;
3332

3433
import java.net.URI;
3534
import java.net.URISyntaxException;
3635

37-
import rx.Observable;
3836
import rx.Subscription;
39-
import rx.functions.Action1;
4037
import rx.schedulers.Schedulers;
4138

4239
public class RxJsonWebSocketsRealTest {
@@ -62,7 +59,11 @@ public void setUp() throws Exception {
6259
.create();
6360

6461
final NewWebSocket newWebSocket = new NewWebSocket();
65-
socket = new RxJsonWebSockets(new RxWebSockets(newWebSocket, SERVER_URI), gson, Message.class);;
62+
final RxWebSockets rxWebSockets = new RxWebSockets(newWebSocket,
63+
SERVER_URI,
64+
ImmutableList.of("chat"),
65+
ImmutableList.<Header>of());
66+
socket = new RxJsonWebSockets(rxWebSockets, gson, Message.class);;
6667
}
6768

6869
@Test

websockets-rxjava-example/src/test/java/com/example/RxWebSocketsRealTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818

1919
import com.appunite.websocket.NewWebSocket;
2020
import com.appunite.websocket.rx.RxWebSockets;
21+
import com.appunite.websocket.rx.messages.RxEvent;
22+
import com.google.common.collect.ImmutableList;
2123

24+
import org.apache.http.Header;
25+
import org.apache.http.message.BasicHeader;
2226
import org.junit.Before;
2327
import org.junit.Ignore;
2428
import org.junit.Test;
@@ -27,6 +31,7 @@
2731
import java.net.URISyntaxException;
2832

2933
import rx.Subscription;
34+
import rx.functions.Action1;
3035
import rx.schedulers.Schedulers;
3136

3237
public class RxWebSocketsRealTest {
@@ -47,7 +52,10 @@ public class RxWebSocketsRealTest {
4752
@Before
4853
public void setUp() throws Exception {
4954
final NewWebSocket newWebSocket = new NewWebSocket();
50-
socket = new RxWebSockets(newWebSocket, SERVER_URI);
55+
socket = new RxWebSockets(newWebSocket,
56+
SERVER_URI,
57+
ImmutableList.of("chat"),
58+
ImmutableList.<Header>of());
5159

5260
}
5361

@@ -56,6 +64,12 @@ public void setUp() throws Exception {
5664
public void testName() throws Exception {
5765
final Subscription subscribe = socket.webSocketObservable()
5866
.subscribeOn(Schedulers.io())
67+
.doOnNext(new Action1<RxEvent>() {
68+
@Override
69+
public void call(RxEvent rxEvent) {
70+
System.out.println("Event: " + rxEvent);
71+
}
72+
})
5973
.subscribe();
6074
Thread.sleep(5000);
6175
subscribe.unsubscribe();

websockets-rxjava-example/src/test/java/com/example/SocketRealTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import com.example.model.DataMessage;
2323
import com.example.model.MessageType;
2424
import com.example.model.Message;
25+
import com.google.common.collect.ImmutableList;
2526
import com.google.gson.Gson;
2627
import com.google.gson.GsonBuilder;
2728

29+
import org.apache.http.Header;
2830
import org.junit.Before;
2931
import org.junit.Ignore;
3032
import org.junit.Test;
@@ -61,7 +63,10 @@ public void setUp() throws Exception {
6163
.create();
6264

6365
final NewWebSocket newWebSocket = new NewWebSocket();
64-
final RxWebSockets webSockets = new RxWebSockets(newWebSocket, SERVER_URI);
66+
final RxWebSockets webSockets = new RxWebSockets(newWebSocket,
67+
SERVER_URI,
68+
ImmutableList.of("chat"),
69+
ImmutableList.<Header>of());
6570
final RxJsonWebSockets jsonWebSockets = new RxJsonWebSockets(webSockets, gson, Message.class);
6671
final SocketConnection socketConnection = new SocketConnectionImpl(jsonWebSockets, Schedulers.computation());
6772
socket = new Socket(socketConnection, Schedulers.computation());

websockets-rxjava/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ signing {
4747
}
4848
group = "com.appunite"
4949
archivesBaseName = "websockets-rxjava"
50-
version = "2.0.0"
50+
version = "2.1.0"
5151

5252
if (!project.hasProperty("ossrhUsername")) {
5353
project.ext.setProperty("ossrhUsername", null)

websockets-rxjava/src/main/java/com/appunite/websocket/rx/RxWebSockets.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@
3131
import com.appunite.websocket.rx.messages.RxEventStringMessage;
3232
import com.appunite.websocket.rx.messages.RxEventUnknownMessage;
3333

34+
import org.apache.http.Header;
35+
3436
import java.io.IOException;
3537
import java.net.URI;
38+
import java.util.List;
3639

3740
import javax.annotation.Nonnull;
3841

@@ -50,16 +53,27 @@ public class RxWebSockets {
5053
* Create instance of {@link RxWebSockets}
5154
* @param newWebSocket {@link NewWebSocket} instance
5255
* @param uri uri for websocket (eg. "wss://some.server/endpoint")
56+
* @param subProtocols application-level protocols layered over the WebSocket Protocol
57+
* @param headers headers sent to server
5358
*/
54-
public RxWebSockets(@Nonnull NewWebSocket newWebSocket, @Nonnull final URI uri) {
59+
public RxWebSockets(@Nonnull NewWebSocket newWebSocket,
60+
@Nonnull final URI uri,
61+
@Nonnull List<String> subProtocols,
62+
@Nonnull List<Header> headers) {
5563
this.newWebSocket = newWebSocket;
5664
this.uri = uri;
65+
this.subProtocols = subProtocols;
66+
this.headers = headers;
5767
}
5868

5969
@Nonnull
6070
private final NewWebSocket newWebSocket;
6171
@Nonnull
6272
private final URI uri;
73+
@Nonnull
74+
private final List<String> subProtocols;
75+
@Nonnull
76+
private final List<Header> headers;
6377

6478
/**
6579
* Returns observable that connected to a websocket and returns {@link RxJsonEvent}'s
@@ -111,7 +125,7 @@ public void onUnknownMessage(@Nonnull byte[] data) throws IOException, Interrupt
111125
}
112126
};
113127
try {
114-
connection = newWebSocket.create(uri, listener);
128+
connection = newWebSocket.create(uri, subProtocols, headers, listener);
115129
} catch (IOException e) {
116130
subscriber.onNext(new RxEventDisconnected(e));
117131
subscriber.onError(e);

websockets/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ signing {
5151

5252
group = "com.appunite"
5353
archivesBaseName = "websockets-java"
54-
version = "2.0.0"
54+
version = "2.1.0"
5555

5656
if (!project.hasProperty("ossrhUsername")) {
5757
project.ext.setProperty("ossrhUsername", null)

websockets/src/main/java/com/appunite/websocket/NewWebSocket.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121
import com.appunite.websocket.internal.SocketProvider;
2222
import com.appunite.websocket.internal.SocketProviderImpl;
2323

24+
import org.apache.http.Header;
25+
2426
import java.io.IOException;
2527
import java.net.InetSocketAddress;
2628
import java.net.Socket;
2729
import java.net.URI;
2830
import java.net.UnknownHostException;
31+
import java.util.List;
2932

3033
import javax.annotation.Nonnull;
3134

35+
import static com.appunite.websocket.tools.Preconditions.checkArgument;
3236
import static com.appunite.websocket.tools.Preconditions.checkNotNull;
3337

3438
/**
@@ -73,17 +77,34 @@ public NewWebSocket(@Nonnull SecureRandomProvider secureRandomProvider,
7377
*
7478
* @param uri
7579
* uri of websocket
80+
* @param subProtocols
81+
* application-level protocols layered over the WebSocket Protocol
82+
* @param headers
83+
* headers sent to server
84+
* @param listener
85+
* websocket listener
7686
* @throws UnknownHostException
7787
* when could not connect to selected host
7888
* @throws IOException
7989
* thrown when I/O exception occur
8090
*/
8191
public WebSocketConnection create(@Nonnull URI uri,
92+
@Nonnull List<String> subProtocols,
93+
@Nonnull List<Header> headers,
8294
@Nonnull WebSocketListener listener) throws IOException {
8395
checkNotNull(uri, "Uri cannot be null");
96+
checkNotNull(subProtocols, "subProtocols can not be null");
97+
checkArgument(subProtocols.size() >= 0, "You have to provide at least one subProtocol");
98+
checkNotNull(headers, "headers can not be null");
99+
checkNotNull(listener, "listener can not be null");
84100

85101
final Socket socket = socketProvider.getSocket(uri);
86-
return new WebSocketConnection(socket, listener, uri, secureRandomProvider);
102+
return new WebSocketConnection(socket,
103+
listener,
104+
uri,
105+
subProtocols,
106+
headers,
107+
secureRandomProvider);
87108
}
88109

89110

websockets/src/main/java/com/appunite/websocket/WebSocket.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package com.appunite.websocket;
1818

19+
import org.apache.http.Header;
20+
1921
import java.io.IOException;
2022
import java.net.URI;
2123
import java.net.UnknownHostException;
24+
import java.util.ArrayList;
2225

2326
import javax.annotation.Nonnull;
2427

@@ -80,7 +83,12 @@ public void connect(@Nonnull URI uri) throws IOException,
8083
WebSocketConnection connect;
8184
synchronized (connectLock) {
8285
checkState(this.connect == null);
83-
connect = newWebSocket.create(uri, listener);
86+
final ArrayList<String> subProtocols = new ArrayList<>(1);
87+
subProtocols.add("chat");
88+
connect = newWebSocket.create(uri,
89+
subProtocols,
90+
new ArrayList<Header>(),
91+
listener);
8492
this.connect = connect;
8593
}
8694
connect.connect();

websockets/src/main/java/com/appunite/websocket/WebSocketConnection.java

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@
2323
import org.apache.http.Header;
2424
import org.apache.http.HttpStatus;
2525
import org.apache.http.ParseException;
26+
import org.apache.http.ProtocolVersion;
2627
import org.apache.http.StatusLine;
28+
import org.apache.http.message.BasicHeader;
29+
import org.apache.http.message.BasicLineFormatter;
2730
import org.apache.http.message.BasicLineParser;
31+
import org.apache.http.message.BasicRequestLine;
32+
import org.apache.http.message.LineFormatter;
33+
import org.apache.http.util.CharArrayBuffer;
2834
import org.apache.http.util.EncodingUtils;
2935

3036
import java.io.IOException;
@@ -83,6 +89,10 @@ private enum State {
8389
@Nonnull
8490
private final URI uri;
8591
@Nonnull
92+
private final List<String> subProtocols;
93+
@Nonnull
94+
private final List<Header> headers;
95+
@Nonnull
8696
private final SecureRandomProvider secureRandomProvider;
8797

8898

@@ -98,15 +108,19 @@ private enum State {
98108
private int writing = 0;
99109

100110
/**
101-
* @see NewWebSocket#create(URI, WebSocketListener)
102-
*/
103-
WebSocketConnection(@Nonnull Socket socket,
104-
@Nonnull WebSocketListener listener,
105-
@Nonnull URI uri,
106-
@Nonnull SecureRandomProvider secureRandomProvider) {
111+
* @see NewWebSocket#create(URI, List, List, WebSocketListener)
112+
*/
113+
WebSocketConnection(@Nonnull Socket socket,
114+
@Nonnull WebSocketListener listener,
115+
@Nonnull URI uri,
116+
@Nonnull List<String> subProtocols,
117+
@Nonnull List<Header> headers,
118+
@Nonnull SecureRandomProvider secureRandomProvider) {
107119
this.socket = socket;
108120
this.listener = listener;
109121
this.uri = uri;
122+
this.subProtocols = subProtocols;
123+
this.headers = headers;
110124
this.secureRandomProvider = secureRandomProvider;
111125
}
112126

@@ -138,7 +152,7 @@ public void connect() throws WrongWebsocketResponse,
138152
outputStream = new WebSocketWriter(socket.getOutputStream());
139153
}
140154
final String secret = generateHandshakeSecret();
141-
writeHeaders(uri, secret);
155+
writeHeaders(uri, secret, subProtocols, headers);
142156
readHandshakeHeaders(inputStream, secret);
143157
} catch (IOException e) {
144158
synchronized (stateLock) {
@@ -210,21 +224,43 @@ private String generateHandshakeSecret() {
210224
*
211225
* @param uri uri
212226
* @param secret secret that is written to headers
227+
* @param subProtocols application-level protocols layered over the WebSocket Protocol
228+
* @param headers headers sent to server
213229
* @throws IOException
214230
*/
215-
private void writeHeaders(@Nonnull URI uri, @Nonnull String secret) throws IOException {
231+
private void writeHeaders(@Nonnull URI uri,
232+
@Nonnull String secret,
233+
@Nonnull List<String> subProtocols,
234+
@Nonnull List<Header> headers)
235+
throws IOException {
216236
checkNotNull(uri);
217237
checkNotNull(secret);
238+
checkNotNull(subProtocols);
239+
checkArgument(subProtocols.size() >= 0, "You have to provide at least one subProtocol");
240+
checkNotNull(headers);
218241
final String portPart = uri.getPort() < 0 || uri.getPort() == 80 ? "" : ":"+String.valueOf(uri.getPort());
219242

220-
outputStream.writeLine("GET " + uri.getPath() + " HTTP/1.1");
221-
outputStream.writeLine("Upgrade: websocket");
222-
outputStream.writeLine("Connection: Upgrade");
223-
outputStream.writeLine("Host: " + uri.getHost()+portPart);
224-
outputStream.writeLine("Origin: " + uri);
225-
outputStream.writeLine("Sec-WebSocket-Key: " + secret);
226-
outputStream.writeLine("Sec-WebSocket-Protocol: chat");
227-
outputStream.writeLine("Sec-WebSocket-Version: 13");
243+
final BasicRequestLine requestLine = new BasicRequestLine("GET", uri.getPath(), new ProtocolVersion("HTTP", 1, 1));
244+
245+
final ArrayList<Header> sentHeaders = new ArrayList<>();
246+
sentHeaders.add(new BasicHeader("Upgrade", "websocket"));
247+
sentHeaders.add(new BasicHeader("Connection", "Upgrade"));
248+
sentHeaders.add(new BasicHeader("Host", uri.getHost() + portPart));
249+
sentHeaders.add(new BasicHeader("Origin", uri.toString()));
250+
sentHeaders.add(new BasicHeader("Sec-WebSocket-Key", secret));
251+
for (final String subProtocol : subProtocols) {
252+
sentHeaders.add(new BasicHeader("Sec-WebSocket-Protocol", subProtocol));
253+
}
254+
sentHeaders.add(new BasicHeader("Sec-WebSocket-Version", "13"));
255+
sentHeaders.addAll(headers);
256+
257+
final LineFormatter lineFormatter = BasicLineFormatter.INSTANCE;
258+
final CharArrayBuffer buffer = new CharArrayBuffer(100);
259+
outputStream.writeLine(lineFormatter.formatRequestLine(buffer, requestLine).toString());
260+
for (final Header header : sentHeaders) {
261+
outputStream.writeLine(lineFormatter.formatHeader(buffer, header).toString());
262+
}
263+
228264
outputStream.writeNewLine();
229265
outputStream.flush();
230266
}

0 commit comments

Comments
 (0)