Skip to content

Commit d5ff77d

Browse files
committed
add use case for subscription on key events
1 parent 5755e86 commit d5ff77d

2 files changed

Lines changed: 119 additions & 15 deletions

File tree

src/nl/melp/redis/Redis.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ void write(List<?> list) throws IOException, IllegalArgumentException {
7878

7979
for (Object o : list) {
8080
if (o instanceof byte[]) {
81-
write((byte[])o);
81+
write((byte[]) o);
8282
} else if (o instanceof String) {
8383
write(((String) o).getBytes());
8484
} else if (o instanceof Long) {
@@ -136,12 +136,12 @@ static class ServerError extends IOException {
136136

137137
/**
138138
* Parse incoming data from the stream.
139-
*
139+
* <p>
140140
* Based on each of the markers which will identify the type of data being sent, the parsing
141141
* is delegated to the type-specific methods.
142142
*
143143
* @return The parsed object
144-
* @throws IOException Propagated from the stream
144+
* @throws IOException Propagated from the stream
145145
* @throws ProtocolException In case unexpected bytes are encountered.
146146
*/
147147
Object parse() throws IOException, ProtocolException {
@@ -267,8 +267,8 @@ public Redis(Socket socket) throws IOException {
267267
/**
268268
* Construct the connection with the specified Socket as the server connection with specified buffer sizes.
269269
*
270-
* @param socket Socket to connect to
271-
* @param inputBufferSize buffer size in bytes for the input stream
270+
* @param socket Socket to connect to
271+
* @param inputBufferSize buffer size in bytes for the input stream
272272
* @param outputBufferSize buffer size in bytes for the output stream
273273
* @throws IOException If a socket error occurs.
274274
*/
@@ -282,7 +282,7 @@ public Redis(Socket socket, int inputBufferSize, int outputBufferSize) throws IO
282282
/**
283283
* Construct with the specified streams to respectively read from and write to.
284284
*
285-
* @param inputStream Read from this stream
285+
* @param inputStream Read from this stream
286286
* @param outputStream Write to this stream
287287
*/
288288
public Redis(InputStream inputStream, OutputStream outputStream) {
@@ -294,14 +294,24 @@ public Redis(InputStream inputStream, OutputStream outputStream) {
294294
* Execute a Redis command and return it's result.
295295
*
296296
* @param args Command and arguments to pass into redis.
297-
* @param <T> The expected result type
297+
* @param <T> The expected result type
298298
* @return Result of redis.
299-
*
300299
* @throws IOException All protocol and io errors are IO exceptions.
301300
*/
302301
public <T> T call(Object... args) throws IOException {
303302
writer.write(Arrays.asList((Object[]) args));
304303
writer.flush();
304+
return read();
305+
}
306+
307+
/**
308+
* Does a blocking read to wait for redis to send data.
309+
*
310+
* @param <T> The expected result type.
311+
* @return Result of redis
312+
* @throws IOException Propagated
313+
*/
314+
public <T> T read() throws IOException {
305315
return (T) reader.parse();
306316
}
307317

@@ -363,13 +373,17 @@ public interface FailableConsumer<T, E extends Throwable> {
363373
* Utility method to execute some command with redis and close the connection directly after.
364374
*
365375
* @param callback The callback to perform with redis.
366-
* @param addr Connection IP address
367-
* @param port Connection port
376+
* @param addr Connection IP address
377+
* @param port Connection port
368378
* @throws IOException Propagated
369379
*/
370380
public static void run(FailableConsumer<Redis, IOException> callback, String addr, int port) throws IOException {
371381
try (Socket s = new Socket(addr, port)) {
372-
callback.accept(new Redis(s));
382+
run(callback, s);
373383
}
374384
}
385+
386+
public static void run(FailableConsumer<Redis, IOException> callback, Socket s) throws IOException {
387+
callback.accept(new Redis(s));
388+
}
375389
}

test/nl/melp/redis/RedisTest.java

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import java.io.ByteArrayInputStream;
44
import java.io.IOException;
55
import java.net.Socket;
6+
import java.net.SocketTimeoutException;
67
import java.time.LocalDateTime;
78
import java.time.temporal.ChronoUnit;
8-
import java.util.LinkedHashMap;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.Random;
9+
import java.util.*;
1210
import java.util.concurrent.ExecutorService;
1311
import java.util.concurrent.Executors;
1412
import java.util.concurrent.ScheduledExecutorService;
@@ -28,6 +26,7 @@ public static void main(String[] args) throws IOException, InterruptedException
2826
bufferSizePerformanceTest(16, 16);
2927
socketManagementPerformanceTest();
3028
binaryTest();
29+
subscribeTest();
3130
}
3231

3332
private static void assertEqual(String a, String b) {
@@ -195,6 +194,18 @@ private static void integrationTest() throws InterruptedException {
195194
r.shutdown();
196195
r.awaitTermination(2, TimeUnit.SECONDS);
197196
assertTrue(b.get());
197+
198+
exec.accept((redis) -> {
199+
redis.call("SET", "foo", "val");
200+
redis.call("PEXPIRE", "foo", "100");
201+
try {
202+
Thread.sleep(101);
203+
} catch (InterruptedException e) {
204+
e.printStackTrace();
205+
throw new RuntimeException(e);
206+
}
207+
assertTrue(null == redis.call("GET", "foo"));
208+
});
198209
}
199210

200211
private static void bufferSizePerformanceTest(int numBitsFrom, int numBitsTo) throws IOException, InterruptedException {
@@ -344,6 +355,85 @@ public static void binaryTest() throws IOException {
344355
}
345356

346357

358+
public static void subscribeTest() throws IOException, InterruptedException {
359+
List<String> events = Collections.synchronizedList(new LinkedList<>());
360+
361+
ExecutorService s = Executors.newFixedThreadPool(2);
362+
Redis.run(
363+
redis -> {
364+
redis.call("CONFIG", "SET", "notify-keyspace-events", "AKE");
365+
}, "localhost", 6379
366+
);
367+
AtomicInteger n = new AtomicInteger(0);
368+
369+
s.submit(() -> {
370+
try {
371+
System.out.println("Subscription starting");
372+
Socket timeoutSocket = new Socket("localhost", 6379);
373+
timeoutSocket.setSoTimeout(1500);
374+
try {
375+
Redis.run(redis -> {
376+
redis.call("PSUBSCRIBE", "__keyevent@0__:*", "*");
377+
LinkedList<Object> result;
378+
try {
379+
while ((result = redis.read()) != null) {
380+
try {
381+
System.out.println("Received event " + String.join(", ", result.stream().map(r -> new String((byte[])r)).toArray(String[]::new)));
382+
} catch (ClassCastException e) {
383+
continue;
384+
}
385+
if (
386+
result.get(0) instanceof byte[] && Arrays.equals((byte[])result.get(0), "pmessage".getBytes())
387+
&& result.get(1) instanceof byte[] && Arrays.equals((byte[])result.get(1), "__keyevent@0__:*".getBytes())
388+
) {
389+
events.add(new String((byte[])result.get(2)).replace("__keyevent@0__:", "") + ":" + new String((byte[])result.get(3)));
390+
}
391+
}
392+
} catch (SocketTimeoutException ignored) {
393+
} catch (Exception e) {
394+
e.printStackTrace();
395+
}
396+
}, timeoutSocket);
397+
} catch (SocketTimeoutException ignored) {
398+
}
399+
} catch (IOException e) {
400+
e.printStackTrace();
401+
}
402+
n.incrementAndGet();
403+
});
404+
Thread.sleep(1000);
405+
406+
s.submit(() -> {
407+
try {
408+
Redis.run(redis -> {
409+
redis.call("SET", "a", "b");
410+
redis.call("PEXPIRE", "a", "100");
411+
try {
412+
// so the key will expire
413+
Thread.sleep(500);
414+
} catch (InterruptedException e) {
415+
e.printStackTrace();
416+
}
417+
}, "localhost", 6379);
418+
} catch (IOException e) {
419+
e.printStackTrace();
420+
}
421+
n.incrementAndGet();
422+
});
423+
424+
while (n.get() < 2) {
425+
System.out.println("Waiting for both threads to finish");
426+
Thread.sleep(100);
427+
}
428+
s.shutdown();
429+
s.awaitTermination(1000, TimeUnit.MILLISECONDS);
430+
431+
assertTrue(events.get(0).equals("set:a"));
432+
assertTrue(events.get(1).equals("expire:a"));
433+
assertTrue(events.get(2).equals("expired:a"));
434+
}
435+
436+
347437
private static String msg = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse in purus in dui cursus dignissim id at neque. Duis porta ullamcorper aliquam. Suspendisse hendrerit urna id felis aliquet rutrum. Fusce ultricies magna elit, id volutpat risus dictum et. Sed pretium elementum arcu, vitae aliquet ligula. Phasellus viverra vel arcu vel dictum. Fusce ac purus fringilla neque dapibus sollicitudin sit amet et felis. Nulla gravida fringilla ex sit amet faucibus. Etiam sit amet nisl id est dictum porttitor eget nec risus. Vivamus et ultrices arcu, vitae accumsan lectus. Phasellus tempus tortor lectus, vitae consequat enim dictum auctor. Ut elementum sapien eu diam tempus condimentum.\n" +
348438
"\n" +
349439
"Vestibulum ultricies bibendum arcu ut commodo. Morbi tristique dui quis commodo consectetur. Praesent venenatis augue justo, sed placerat lectus aliquam eget. Duis malesuada lobortis quam id congue. Fusce mollis faucibus arcu. Aliquam consectetur leo eu luctus accumsan. Nulla nec diam non ex eleifend fringilla sit amet et ante. Mauris posuere est ut turpis pellentesque hendrerit.\n" +

0 commit comments

Comments
 (0)