Skip to content

Commit ac6a6b6

Browse files
committed
implement new API for network: 'sendWithFeedback'
1 parent 1e0e4e1 commit ac6a6b6

10 files changed

Lines changed: 187 additions & 39 deletions

File tree

rlib-network/src/main/java/com/ss/rlib/network/Connection.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.jetbrains.annotations.NotNull;
77
import reactor.core.publisher.Flux;
88

9+
import java.util.concurrent.CompletableFuture;
910
import java.util.function.BiConsumer;
1011

1112
/**
@@ -22,9 +23,9 @@ class ReceivedPacketEvent<C extends Connection<?, ?>, R extends ReadablePacket>
2223
}
2324

2425
/**
25-
* Get a remote address or 'unkonwn'.
26+
* Get a remote address of this connection.
2627
*
27-
* @return the remote address or 'unkonwn'.
28+
* @return the remote address.
2829
*/
2930
@NotNull String getRemoteAddress();
3031

@@ -54,6 +55,14 @@ class ReceivedPacketEvent<C extends Connection<?, ?>, R extends ReadablePacket>
5455
*/
5556
void send(@NotNull W packet);
5657

58+
/**
59+
* Send a packet to connection's owner with async feedback of this sending.
60+
*
61+
* @param packet the writable packet.
62+
* @return the async result with true if the packet was sent or false if sending was failed.
63+
*/
64+
@NotNull CompletableFuture<Boolean> sendWithFeedback(@NotNull W packet);
65+
5766
/**
5867
* Register a consumer to handle received packets.
5968
*

rlib-network/src/main/java/com/ss/rlib/network/impl/AbstractConnection.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.ss.rlib.network.packet.PacketWriter;
1717
import com.ss.rlib.network.packet.ReadablePacket;
1818
import com.ss.rlib.network.packet.WritablePacket;
19+
import com.ss.rlib.network.packet.impl.WritablePacketWrapper;
1920
import com.ss.rlib.network.util.NetworkUtils;
2021
import lombok.Getter;
2122
import org.jetbrains.annotations.NotNull;
@@ -25,6 +26,7 @@
2526

2627
import java.nio.channels.AsynchronousChannel;
2728
import java.nio.channels.AsynchronousSocketChannel;
29+
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931
import java.util.concurrent.locks.StampedLock;
3032
import java.util.function.BiConsumer;
@@ -39,20 +41,26 @@ public abstract class AbstractConnection<R extends ReadablePacket, W extends Wri
3941

4042
private static final Logger LOGGER = LoggerManager.getLogger(AbstractConnection.class);
4143

44+
private static class WritablePacketWithFeedback<W extends WritablePacket> extends
45+
WritablePacketWrapper<CompletableFuture<Boolean>, W> {
46+
47+
public WritablePacketWithFeedback(@NotNull CompletableFuture<Boolean> attachment, @NotNull W packet) {
48+
super(attachment, packet);
49+
}
50+
}
51+
4252
protected final @Getter NetworkCryptor crypt;
53+
protected final @Getter String remoteAddress;
4354

4455
protected final Network<? extends Connection<R, W>> network;
4556
protected final BufferAllocator bufferAllocator;
46-
4757
protected final AsynchronousSocketChannel channel;
48-
protected final @Getter String remoteAddress;
58+
protected final LinkedList<WritablePacket> pendingPackets;
59+
protected final StampedLock lock;
4960

5061
protected final AtomicBoolean isWriting;
5162
protected final AtomicBoolean closed;
5263

53-
protected final LinkedList<W> pendingPackets;
54-
protected final StampedLock lock;
55-
5664
protected final Array<BiConsumer<? super Connection<R, W>, ? super R>> subscribers;
5765

5866
protected final int maxPacketsByRead;
@@ -125,7 +133,7 @@ protected void registerFluxOnReceivedPackets(@NotNull FluxSink<? super R> sink)
125133
sink.onDispose(() -> subscribers.remove(listener));
126134
}
127135

128-
protected @Nullable W nextPacketToWrite() {
136+
protected @Nullable WritablePacket nextPacketToWrite() {
129137
long stamp = lock.writeLock();
130138
try {
131139
return pendingPackets.poll();
@@ -168,8 +176,20 @@ public boolean isClosed() {
168176
return closed.get();
169177
}
170178

179+
protected void onWrittenPacket(@NotNull WritablePacket packet) { }
180+
181+
protected void onSentPacket(@NotNull WritablePacket packet, @NotNull Boolean result) {
182+
if (packet instanceof WritablePacketWithFeedback) {
183+
((WritablePacketWithFeedback<W>) packet).getAttachment().complete(result);
184+
}
185+
}
186+
171187
@Override
172188
public final void send(@NotNull W packet) {
189+
sendImpl(packet);
190+
}
191+
192+
protected void sendImpl(@NotNull WritablePacket packet) {
173193

174194
if (isClosed()) {
175195
return;
@@ -185,6 +205,20 @@ public final void send(@NotNull W packet) {
185205
getPacketWriter().writeNextPacket();
186206
}
187207

208+
@Override
209+
public @NotNull CompletableFuture<Boolean> sendWithFeedback(@NotNull W packet) {
210+
211+
var asyncResult = new CompletableFuture<Boolean>();
212+
213+
sendImpl(new WritablePacketWithFeedback<>(asyncResult, packet));
214+
215+
if (isClosed()) {
216+
return CompletableFuture.completedFuture(Boolean.FALSE);
217+
}
218+
219+
return asyncResult;
220+
}
221+
188222
/**
189223
* Clear waited packets.
190224
*/
@@ -198,6 +232,11 @@ protected void clearWaitPackets() {
198232
}
199233

200234
protected void doClearWaitPackets() {
235+
236+
for (var pendingPacket : pendingPackets) {
237+
onSentPacket(pendingPacket, Boolean.FALSE);
238+
}
239+
201240
pendingPackets.clear();
202241
}
203242
}

rlib-network/src/main/java/com/ss/rlib/network/impl/DefaultDataConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public DefaultDataConnection(
6060
bufferAllocator,
6161
this::updateLastActivity,
6262
this::nextPacketToWrite,
63+
this::onWrittenPacket,
64+
this::onSentPacket,
6365
packetLengthHeaderSize
6466
);
6567
}

rlib-network/src/main/java/com/ss/rlib/network/impl/IdBasedPacketConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public IdBasedPacketConnection(
7070
bufferAllocator,
7171
this::updateLastActivity,
7272
this::nextPacketToWrite,
73+
this::onWrittenPacket,
74+
this::onSentPacket,
7375
packetLengthHeaderSize,
7476
packetIdHeaderSize
7577
);

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/AbstractPacketWriter.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import java.nio.channels.AsynchronousSocketChannel;
1515
import java.nio.channels.CompletionHandler;
1616
import java.util.concurrent.atomic.AtomicBoolean;
17+
import java.util.function.BiConsumer;
18+
import java.util.function.Consumer;
1719
import java.util.function.Supplier;
1820

1921
/**
@@ -25,15 +27,15 @@ public abstract class AbstractPacketWriter<W extends WritablePacket, C extends C
2527

2628
private static final Logger LOGGER = LoggerManager.getLogger(AbstractPacketWriter.class);
2729

28-
private final CompletionHandler<Integer, W> writeHandler = new CompletionHandler<>() {
30+
private final CompletionHandler<Integer, WritablePacket> writeHandler = new CompletionHandler<>() {
2931

3032
@Override
31-
public void completed(@NotNull Integer result, @NotNull W packet) {
33+
public void completed(@NotNull Integer result, @NotNull WritablePacket packet) {
3234
handleSuccessfulWriting(result, packet);
3335
}
3436

3537
@Override
36-
public void failed(@NotNull Throwable exc, @NotNull W packet) {
38+
public void failed(@NotNull Throwable exc, @NotNull WritablePacket packet) {
3739
handleFailedWriting(exc, packet);
3840
}
3941
};
@@ -49,23 +51,29 @@ public void failed(@NotNull Throwable exc, @NotNull W packet) {
4951
protected volatile ByteBuffer firstWriteTempBuffer;
5052
protected volatile ByteBuffer secondWriteTempBuffer;
5153

52-
protected final Runnable updateActivityFunction;
53-
protected final Supplier<@Nullable W> nextWritePacketSupplier;
54+
protected final @NotNull Runnable updateActivityFunction;
55+
protected final @NotNull Supplier<@Nullable WritablePacket> nextWritePacketSupplier;
56+
protected final @NotNull Consumer<@NotNull WritablePacket> writtenPacketHandler;
57+
protected final @NotNull BiConsumer<@NotNull WritablePacket, Boolean> sentPacketHandler;
5458

5559
public AbstractPacketWriter(
5660
@NotNull C connection,
5761
@NotNull AsynchronousSocketChannel channel,
5862
@NotNull BufferAllocator bufferAllocator,
5963
@NotNull Runnable updateActivityFunction,
60-
@NotNull Supplier<@Nullable W> nextWritePacketSupplier
64+
@NotNull Supplier<@Nullable WritablePacket> packetProvider,
65+
@NotNull Consumer<@NotNull WritablePacket> writtenPacketHandler,
66+
@NotNull BiConsumer<@NotNull WritablePacket, Boolean> sentPacketHandler
6167
) {
6268
this.connection = connection;
6369
this.channel = channel;
6470
this.bufferAllocator = bufferAllocator;
6571
this.firstWriteBuffer = bufferAllocator.takeWriteBuffer();
6672
this.secondWriteBuffer = bufferAllocator.takeWriteBuffer();
6773
this.updateActivityFunction = updateActivityFunction;
68-
this.nextWritePacketSupplier = nextWritePacketSupplier;
74+
this.nextWritePacketSupplier = packetProvider;
75+
this.writtenPacketHandler = writtenPacketHandler;
76+
this.sentPacketHandler = sentPacketHandler;
6977
}
7078

7179
@Override
@@ -90,10 +98,16 @@ public void writeNextPacket() {
9098
isWriting.set(false);
9199
}
92100

93-
completed(waitPacket);
101+
writtenPacketHandler.accept(waitPacket);
94102
}
95103

96-
protected @NotNull ByteBuffer serialize(@NotNull W packet) {
104+
protected @NotNull ByteBuffer serialize(@NotNull WritablePacket packet) {
105+
106+
if (packet instanceof WritablePacketWrapper) {
107+
packet = ((WritablePacketWrapper) packet).getPacket();
108+
}
109+
110+
W resultPacket = (W) packet;
97111

98112
var expectedLength = packet.getExpectedLength();
99113
var totalSize = getTotalSize(packet, expectedLength);
@@ -102,9 +116,9 @@ public void writeNextPacket() {
102116
if (expectedLength != -1 && totalSize > firstWriteBuffer.capacity()) {
103117
firstWriteTempBuffer = bufferAllocator.takeBuffer(totalSize);
104118
secondWriteTempBuffer = bufferAllocator.takeBuffer(totalSize);
105-
return serialize(packet, expectedLength, totalSize, firstWriteTempBuffer, secondWriteTempBuffer);
119+
return serialize(resultPacket, expectedLength, totalSize, firstWriteTempBuffer, secondWriteTempBuffer);
106120
} else {
107-
return serialize(packet, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer);
121+
return serialize(resultPacket, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer);
108122
}
109123
}
110124

@@ -115,7 +129,7 @@ public void writeNextPacket() {
115129
* @param expectedLength the expected size.
116130
* @return the total size or -1.
117131
*/
118-
protected abstract int getTotalSize(@NotNull W packet, int expectedLength);
132+
protected abstract int getTotalSize(@NotNull WritablePacket packet, int expectedLength);
119133

120134
/**
121135
* Serialize packet to byte buffer.
@@ -280,24 +294,17 @@ protected boolean onAfterWrite(
280294
return buffer;
281295
}
282296

283-
/**
284-
* Handle a completed packet.
285-
*
286-
* @param packet the writable packet.
287-
*/
288-
protected void completed(@NotNull W packet) {
289-
}
290-
291297
/**
292298
* Handle successful wrote data.
293299
*
294300
* @param result the count of wrote bytes.
295301
* @param packet the sent packet.
296302
*/
297-
protected void handleSuccessfulWriting(@NotNull Integer result, @NotNull W packet) {
303+
protected void handleSuccessfulWriting(@NotNull Integer result, @NotNull WritablePacket packet) {
298304
updateActivityFunction.run();
299305

300306
if (result == -1) {
307+
sentPacketHandler.accept(packet, Boolean.FALSE);
301308
connection.close();
302309
return;
303310
}
@@ -323,6 +330,8 @@ protected void handleSuccessfulWriting(@NotNull Integer result, @NotNull W packe
323330
}
324331
}
325332

333+
sentPacketHandler.accept(packet, Boolean.TRUE);
334+
326335
if (isWriting.compareAndSet(true, false)) {
327336
writeNextPacket();
328337
}
@@ -334,7 +343,7 @@ protected void handleSuccessfulWriting(@NotNull Integer result, @NotNull W packe
334343
* @param exception the exception.
335344
* @param packet the packet.
336345
*/
337-
protected void handleFailedWriting(@NotNull Throwable exception, @NotNull W packet) {
346+
protected void handleFailedWriting(@NotNull Throwable exception, @NotNull WritablePacket packet) {
338347
LOGGER.error(new RuntimeException("Failed writing packet: " + packet, exception));
339348

340349
if (!connection.isClosed()) {

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/DefaultPacketWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import java.nio.ByteBuffer;
1010
import java.nio.channels.AsynchronousSocketChannel;
11+
import java.util.function.BiConsumer;
12+
import java.util.function.Consumer;
1113
import java.util.function.Supplier;
1214

1315
/**
@@ -23,21 +25,25 @@ public DefaultPacketWriter(
2325
@NotNull AsynchronousSocketChannel channel,
2426
@NotNull BufferAllocator bufferAllocator,
2527
@NotNull Runnable updateActivityFunction,
26-
@NotNull Supplier<@Nullable W> nextWritePacketSupplier,
28+
@NotNull Supplier<@Nullable WritablePacket> nextWritePacketSupplier,
29+
@NotNull Consumer<@NotNull WritablePacket> writtenPacketHandler,
30+
@NotNull BiConsumer<@NotNull WritablePacket, Boolean> sentPacketHandler,
2731
int packetLengthHeaderSize
2832
) {
2933
super(
3034
connection,
3135
channel,
3236
bufferAllocator,
3337
updateActivityFunction,
34-
nextWritePacketSupplier
38+
nextWritePacketSupplier,
39+
writtenPacketHandler,
40+
sentPacketHandler
3541
);
3642
this.packetLengthHeaderSize = packetLengthHeaderSize;
3743
}
3844

3945
@Override
40-
protected int getTotalSize(@NotNull W packet, int expectedLength) {
46+
protected int getTotalSize(@NotNull WritablePacket packet, int expectedLength) {
4147
return expectedLength + packetLengthHeaderSize;
4248
}
4349

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/IdBasedPacketWriter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import com.ss.rlib.network.BufferAllocator;
44
import com.ss.rlib.network.Connection;
55
import com.ss.rlib.network.packet.IdBasedWritablePacket;
6+
import com.ss.rlib.network.packet.WritablePacket;
67
import org.jetbrains.annotations.NotNull;
78
import org.jetbrains.annotations.Nullable;
89

910
import java.nio.ByteBuffer;
1011
import java.nio.channels.AsynchronousSocketChannel;
12+
import java.util.function.BiConsumer;
13+
import java.util.function.Consumer;
1114
import java.util.function.Supplier;
1215

1316
/**
@@ -23,7 +26,9 @@ public IdBasedPacketWriter(
2326
@NotNull AsynchronousSocketChannel channel,
2427
@NotNull BufferAllocator bufferAllocator,
2528
@NotNull Runnable updateActivityFunction,
26-
@NotNull Supplier<@Nullable W> nextWritePacketSupplier,
29+
@NotNull Supplier<@Nullable WritablePacket> nextWritePacketSupplier,
30+
@NotNull Consumer<@NotNull WritablePacket> writtenPacketHandler,
31+
@NotNull BiConsumer<@NotNull WritablePacket, Boolean> sentPacketHandler,
2732
int packetLengthHeaderSize,
2833
int packetIdHeaderSize
2934
) {
@@ -33,6 +38,8 @@ public IdBasedPacketWriter(
3338
bufferAllocator,
3439
updateActivityFunction,
3540
nextWritePacketSupplier,
41+
writtenPacketHandler,
42+
sentPacketHandler,
3643
packetLengthHeaderSize
3744
);
3845
this.packetIdHeaderSize = packetIdHeaderSize;

0 commit comments

Comments
 (0)