Skip to content

Commit c4cef43

Browse files
committed
more efficient way of cleanup of SerializedMappedSet
1 parent 52fb330 commit c4cef43

4 files changed

Lines changed: 90 additions & 19 deletions

File tree

src/nl/melp/redis/collections/ScanIterator.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,30 @@
44

55
import java.io.IOException;
66
import java.util.Iterator;
7+
import java.util.LinkedList;
78
import java.util.List;
89

910
public class ScanIterator implements Iterator<byte[]> {
1011
private final Redis redis;
1112
private final byte[] operation;
1213
private final byte[] keyName;
14+
private final byte[] match;
1315
private int cursor;
1416
protected int localCursor;
1517
protected List<byte[]> buffer;
1618

1719
public ScanIterator(Redis redis, byte[]operation, byte[] keyName) {
20+
this(redis, operation, null, keyName);
21+
}
22+
23+
24+
public ScanIterator(Redis redis, byte[]operation, byte[] match, byte[] keyName) {
1825
this.redis = redis;
1926
this.operation = operation;
2027
this.keyName = keyName;
2128
this.cursor = 0;
2229
this.localCursor = 0;
30+
this.match = match;
2331
}
2432

2533

@@ -32,8 +40,18 @@ public boolean hasNext() {
3240
}
3341
try {
3442
List<Object> result;
43+
List<Object> args = new LinkedList<>();
44+
args.add(this.operation);
45+
if (keyName != null) {
46+
args.add(this.keyName);
47+
}
48+
args.add(Integer.toString(this.cursor));
49+
if (match != null) {
50+
args.add("MATCH");
51+
args.add(match);
52+
}
3553
synchronized (redis) {
36-
result = redis.call(this.operation, this.keyName, Integer.toString(this.cursor));
54+
result = redis.call(args.toArray());
3755
}
3856
this.cursor = Integer.valueOf(new String((byte[]) result.get(0)));
3957
this.buffer = (List<byte[]>) result.get(1);

src/nl/melp/redis/collections/SerializedMappedSet.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,27 @@ public SerializedMemberSet(K key, ISerializer<V> serializer, Redis redis, String
1919
}
2020
}
2121

22+
@Override
23+
public boolean addAll(Collection<? extends V> collection) {
24+
keys.add(key);
25+
return super.addAll(collection);
26+
}
27+
28+
@Override
29+
public boolean removeAll(Collection<?> collection) {
30+
boolean ret = super.removeAll(collection);
31+
if (ret && size() == 0) {
32+
keys.remove(key);
33+
}
34+
return ret;
35+
}
36+
37+
@Override
38+
public boolean add(V o) {
39+
keys.add(key);
40+
return super.add(o);
41+
}
42+
2243
@Override
2344
public void clear() {
2445
super.clear();
@@ -68,17 +89,6 @@ private byte[] withPrefix(byte[] value) {
6889
return prefixedValue;
6990
}
7091

71-
private <T> T call(String command, byte[]... args) {
72-
byte[][] rawArgs = new byte[args.length + 1][];
73-
rawArgs[0] = command.getBytes();
74-
System.arraycopy(args, 0, rawArgs, 1, args.length);
75-
try {
76-
return redis.call((Object[])rawArgs);
77-
} catch (IOException e) {
78-
throw new UnsupportedOperationException(e);
79-
}
80-
}
81-
8292
@Override
8393
public int size() {
8494
return keys.size();
@@ -140,10 +150,19 @@ public void putAll(Map<? extends K, ? extends Set<V>> map) {
140150
@Override
141151
public void clear() {
142152
synchronized (redis) {
143-
for (K key : keys) {
144-
this.get(key).clear();
153+
byte[] copy = new byte[prefixLength + 2];
154+
System.arraycopy(prefix, 0, copy, 0, prefixLength);
155+
copy[prefixLength] = ':';
156+
copy[prefixLength + 1] = '*';
157+
Iterator<byte[]> i = new ScanIterator(redis, "SCAN".getBytes(), copy, null);
158+
159+
while (i.hasNext()) {
160+
try {
161+
redis.call("DEL", i.next());
162+
} catch (IOException e) {
163+
throw new RuntimeException(e);
164+
}
145165
}
146-
keys.clear();
147166
}
148167
}
149168

src/nl/melp/redis/collections/SerializedSet.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import java.io.Serializable;
66
import java.util.Arrays;
77
import java.util.Collection;
8+
import java.util.HashSet;
89
import java.util.Iterator;
910
import java.util.List;
1011
import java.util.Set;
12+
import java.util.concurrent.atomic.AtomicBoolean;
1113
import java.util.stream.Collectors;
1214

1315
public class SerializedSet<V> implements Set<V>, Collection<V> {
@@ -40,7 +42,7 @@ public boolean add(V o) {
4042

4143
@Override
4244
public boolean addAll(java.util.Collection<? extends V> collection) {
43-
return innerSet.addAll(collection.stream().map(serializer::serialize).collect(Collectors.toList()));
45+
return innerSet.addAll(collection.stream().map(serializer::serialize).collect(Collectors.toSet()));
4446
}
4547

4648
@Override
@@ -81,7 +83,12 @@ public V next() {
8183

8284
@Override
8385
public boolean containsAll(Collection<?> collection) {
84-
throw new UnsupportedOperationException("Not implemented");
86+
for (Object item : collection) {
87+
if (!contains(item)) {
88+
return false;
89+
}
90+
}
91+
return true;
8592
}
8693

8794
@Override
@@ -91,7 +98,13 @@ public boolean retainAll(Collection<?> collection) {
9198

9299
@Override
93100
public boolean removeAll(Collection<?> collection) {
94-
throw new UnsupportedOperationException("Not implemented");
101+
AtomicBoolean b = new AtomicBoolean(false);
102+
collection.forEach(item -> {
103+
if (this.remove(item)) {
104+
b.set(true);
105+
}
106+
});
107+
return b.get();
95108
}
96109

97110
@Override

test/nl/melp/redis/collections/IntegrationTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,30 @@ public void testSerializedMappedSetForInForEach() throws IOException {
359359
@Test
360360
public void testSerializedMappedSetKeySynchronization() throws IOException {
361361
Map<String, Set<String>> map = new SerializedMappedSet<>(Serializers.of(String.class), Serializers.of(String.class), redis, keyName);
362+
Map<String, Set<String>> secondary = new SerializedMappedSet<>(Serializers.of(String.class), Serializers.of(String.class), redis, keyName);
363+
362364
Assert.assertEquals(0, map.get("A").size());
365+
Assert.assertEquals(0, secondary.get("A").size());
366+
Assert.assertEquals(0, map.size());
367+
Assert.assertEquals(0, secondary.size());
368+
map.get("A").add("foo");
369+
Assert.assertEquals(1, map.size());
370+
Assert.assertEquals(1, secondary.size());
371+
map.get("A").remove("foo");
372+
Assert.assertEquals(0, map.size());
373+
Assert.assertEquals(0, secondary.size());
374+
map.get("A").addAll(new HashSet<String>(){{ add ("foo"); add("bar"); }});
375+
Assert.assertEquals(1, map.size());
376+
Assert.assertEquals(1, secondary.size());
377+
map.get("A").removeAll(new HashSet<String>(){{ add ("foo"); add("bar"); }});
363378
Assert.assertEquals(0, map.size());
364-
map.remove("A");
379+
Assert.assertEquals(0, secondary.size());
380+
381+
map.get("A").addAll(new HashSet<String>(){{ add ("foo"); add("bar"); }});
382+
map.clear();
383+
365384
Assert.assertEquals(0, map.size());
385+
Assert.assertEquals(0, secondary.size());
386+
366387
}
367388
}

0 commit comments

Comments
 (0)