Skip to content

Commit fe40ead

Browse files
committed
fix some new bugs with SSL connections
1 parent ee0c84e commit fe40ead

11 files changed

Lines changed: 220 additions & 53 deletions

File tree

rlib-network/src/main/java/com/ss/rlib/network/client/impl/DefaultClientNetwork.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static com.ss.rlib.common.util.Utils.uncheckedGet;
55
import com.ss.rlib.common.concurrent.util.ThreadUtils;
66
import com.ss.rlib.common.util.AsyncUtils;
7-
import com.ss.rlib.common.util.Utils;
87
import com.ss.rlib.logger.api.Logger;
98
import com.ss.rlib.logger.api.LoggerManager;
109
import com.ss.rlib.network.Connection;
@@ -85,7 +84,7 @@ public DefaultClientNetwork(
8584

8685
@Override
8786
public void completed(@Nullable Void result, @Nullable Void attachment) {
88-
LOGGER.info(channel, ch -> "Connected to server: " + NetworkUtils.getSocketAddress(ch));
87+
LOGGER.info(channel, ch -> "Connected to server: " + NetworkUtils.getRemoteAddress(ch));
8988
asyncResult.complete(channelToConnection.apply(DefaultClientNetwork.this, channel));
9089
}
9190

rlib-network/src/main/java/com/ss/rlib/network/impl/AbstractConnection.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.ss.rlib.network.impl;
22

33
import static com.ss.rlib.common.util.Utils.unchecked;
4-
import static com.ss.rlib.network.util.NetworkUtils.getSocketAddress;
54
import com.ss.rlib.common.function.NotNullBiConsumer;
65
import com.ss.rlib.common.util.array.Array;
76
import com.ss.rlib.common.util.array.ArrayFactory;
@@ -82,7 +81,7 @@ public AbstractConnection(
8281
this.isWriting = new AtomicBoolean(false);
8382
this.closed = new AtomicBoolean(false);
8483
this.subscribers = ArrayFactory.newCopyOnModifyArray(NotNullBiConsumer.class);
85-
this.remoteAddress = String.valueOf(NetworkUtils.getSocketAddress(channel));
84+
this.remoteAddress = String.valueOf(NetworkUtils.getRemoteAddress(channel));
8685
}
8786

8887
@Override
@@ -92,8 +91,12 @@ public void onConnected() {}
9291

9392
protected abstract @NotNull PacketWriter getPacketWriter();
9493

95-
protected void handleReadPacket(@NotNull R packet) {
96-
LOGGER.debug(channel, packet, (ch, pck) -> "Handle read packet: " + pck + " from: " + getSocketAddress(ch));
94+
protected void handleReceivedPacket(@NotNull R packet) {
95+
LOGGER.debug(
96+
channel,
97+
packet,
98+
(ch, pck) -> "Handle received packet: " + pck + " from: " + NetworkUtils.getRemoteAddress(ch)
99+
);
97100
subscribers.forEachR(this, packet, BiConsumer::accept);
98101
}
99102

rlib-network/src/main/java/com/ss/rlib/network/impl/AbstractSSLConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,10 @@ public AbstractSSLConnection(
3434
throw new RuntimeException(e);
3535
}
3636
}
37+
38+
@Override
39+
protected void sendImpl(@NotNull WritablePacket packet) {
40+
super.sendImpl(packet);
41+
getPacketReader().startRead();
42+
}
3743
}

rlib-network/src/main/java/com/ss/rlib/network/impl/DefaultDataConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public DefaultDataConnection(
4646
channel,
4747
bufferAllocator,
4848
this::updateLastActivity,
49-
this::handleReadPacket,
49+
this::handleReceivedPacket,
5050
value -> createReadablePacket(),
5151
packetLengthHeaderSize,
5252
maxPacketsByRead

rlib-network/src/main/java/com/ss/rlib/network/impl/DefaultDataSSLConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public DefaultDataSSLConnection(
4949
channel,
5050
bufferAllocator,
5151
this::updateLastActivity,
52-
this::handleReadPacket,
52+
this::handleReceivedPacket,
5353
value -> createReadablePacket(),
5454
sslEngine,
5555
this::sendImpl,

rlib-network/src/main/java/com/ss/rlib/network/impl/IdBasedPacketConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public IdBasedPacketConnection(
5353
channel,
5454
bufferAllocator,
5555
this::updateLastActivity,
56-
this::handleReadPacket,
56+
this::handleReceivedPacket,
5757
packetLengthHeaderSize,
5858
maxPacketsByRead,
5959
packetIdHeaderSize,

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/AbstractPacketReader.java

Lines changed: 77 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package com.ss.rlib.network.packet.impl;
22

33
import static com.ss.rlib.common.util.ObjectUtils.notNull;
4-
import static com.ss.rlib.network.util.NetworkUtils.getSocketAddress;
5-
import static java.util.Objects.requireNonNull;
4+
import static com.ss.rlib.network.util.NetworkUtils.getRemoteAddress;
65
import com.ss.rlib.common.function.NotNullConsumer;
76
import com.ss.rlib.common.util.BufferUtils;
8-
import com.ss.rlib.common.util.ObjectUtils;
97
import com.ss.rlib.logger.api.Logger;
108
import com.ss.rlib.logger.api.LoggerManager;
119
import com.ss.rlib.network.BufferAllocator;
1210
import com.ss.rlib.network.Connection;
1311
import com.ss.rlib.network.packet.PacketReader;
1412
import com.ss.rlib.network.packet.ReadablePacket;
13+
import com.ss.rlib.network.util.NetworkUtils;
1514
import lombok.AccessLevel;
1615
import lombok.Getter;
1716
import lombok.Setter;
@@ -22,7 +21,6 @@
2221
import java.nio.channels.AsynchronousCloseException;
2322
import java.nio.channels.AsynchronousSocketChannel;
2423
import java.nio.channels.CompletionHandler;
25-
import java.util.Objects;
2624
import java.util.concurrent.atomic.AtomicBoolean;
2725
import java.util.function.Consumer;
2826

@@ -91,9 +89,14 @@ protected AbstractPacketReader(
9189

9290
@Override
9391
public void startRead() {
94-
if (isReading.compareAndSet(false, true)) {
95-
channel.read(getBufferToReadFromChannel(), null, readHandler);
92+
93+
if (!isReading.compareAndSet(false, true)) {
94+
return;
9695
}
96+
97+
LOGGER.debug(channel, ch -> "Start waiting for new data from channel \"" + getRemoteAddress(ch) + "\"");
98+
99+
channel.read(getBufferToReadFromChannel(), null, readHandler);
97100
}
98101

99102
/**
@@ -129,8 +132,11 @@ protected int readPackets(@NotNull ByteBuffer receivedBuffer, @NotNull ByteBuffe
129132
tempPendingBuffer = notNull(getTempPendingBuffer());
130133
}
131134

132-
LOGGER.debugNullable(receivedBuffer, tempPendingBuffer,
133-
(buf, mappedBuf) -> "Put received buffer: " + buf + " to read mapped buffer: " + mappedBuf);
135+
LOGGER.debugNullable(
136+
receivedBuffer,
137+
tempPendingBuffer,
138+
(buf, mappedBuf) -> "Put received buffer: " + buf + " to read mapped buffer: " + mappedBuf
139+
);
134140

135141
bufferToRead = BufferUtils.putToAndFlip(tempPendingBuffer, receivedBuffer);
136142
}
@@ -140,9 +146,12 @@ else if (waitedBytes > 0) {
140146

141147
if (pendingBuffer.remaining() < receivedBuffer.remaining()) {
142148

143-
LOGGER.debug(pendingBuffer, receivedBuffer,
149+
LOGGER.debug(
150+
pendingBuffer,
151+
receivedBuffer,
144152
(penBuf, buf) -> "Pending buffer: " + penBuf + " is too small to append received buffer: " +
145-
buf + ", allocate mapped buffer for this");
153+
buf + ", allocate mapped buffer for this"
154+
);
146155

147156
allocTempBuffers(pendingBuffer.flip(), pendingBuffer.capacity());
148157

@@ -152,15 +161,21 @@ else if (waitedBytes > 0) {
152161

153162
tempPendingBuffer = notNull(getTempPendingBuffer());
154163

155-
LOGGER.debugNullable(receivedBuffer, tempPendingBuffer,
156-
(buf, mappedBuf) -> "Put received buffer: " + buf + " to mapped buffer: " + mappedBuf);
164+
LOGGER.debugNullable(
165+
receivedBuffer,
166+
tempPendingBuffer,
167+
(buf, mappedBuf) -> "Put received buffer: " + buf + " to mapped buffer: " + mappedBuf
168+
);
157169

158170
bufferToRead = BufferUtils.putToAndFlip(tempPendingBuffer, receivedBuffer);
159171

160172
} else {
161173

162-
LOGGER.debug(receivedBuffer, pendingBuffer,
163-
(buf, penBuf) -> "Put received buffer: " + buf + " to pending buffer: " + penBuf);
174+
LOGGER.debug(
175+
receivedBuffer,
176+
pendingBuffer,
177+
(buf, penBuf) -> "Put received buffer: " + buf + " to pending buffer: " + penBuf
178+
);
164179

165180
bufferToRead = BufferUtils.putToAndFlip(pendingBuffer, receivedBuffer);
166181
}
@@ -180,8 +195,11 @@ else if (waitedBytes > 0) {
180195
var packetLength = readPacketLength(bufferToRead);
181196
var dataLength = getDataLength(packetLength, bufferToRead.position() - endPosition, bufferToRead);
182197

183-
LOGGER.debug(packetLength, positionBeforeRead,
184-
(length, pos) -> "Find next packet from position: " + pos + " with length: " + length);
198+
LOGGER.debug(
199+
packetLength,
200+
positionBeforeRead,
201+
(length, pos) -> "Find next packet from position: " + pos + " with length: " + length
202+
);
185203

186204
// calculate position of end the next packet
187205
endPosition += packetLength;
@@ -196,8 +214,10 @@ else if (waitedBytes > 0) {
196214
if (bufferToRead == receivedBuffer) {
197215
if (packetLength <= pendingBuffer.capacity()) {
198216
pendingBuffer.put(receivedBuffer);
199-
LOGGER.debug(pendingBuffer,
200-
buf -> "Put pending data form received buffer to pending buffer: " + buf);
217+
LOGGER.debug(
218+
pendingBuffer,
219+
buf -> "Put pending data form received buffer to pending buffer: " + buf
220+
);
201221
} else {
202222
allocTempBuffers(receivedBuffer, packetLength);
203223
}
@@ -219,11 +239,14 @@ else if (bufferToRead == pendingBuffer) {
219239
// if not read data is less than pending buffer then we can switch to use the pending buffer
220240
if (Math.max(packetLength, tempPendingBuffer.remaining()) <= pendingBuffer.capacity()) {
221241

222-
pendingBuffer.clear()
242+
pendingBuffer
243+
.clear()
223244
.put(tempPendingBuffer);
224245

225-
LOGGER.debug(pendingBuffer,
226-
buf -> "Moved pending data from mapped buffer to pending buffer: " + buf);
246+
LOGGER.debug(
247+
pendingBuffer,
248+
buf -> "Moved pending data from mapped buffer to pending buffer: " + buf
249+
);
227250

228251
freeTempBuffers();
229252
}
@@ -238,9 +261,12 @@ else if (packetLength > tempPendingBuffer.capacity()) {
238261
}
239262
}
240263

241-
LOGGER.debug(channel, readPackets,
242-
(ch, count) -> "Read " + count + " packet(s) from buffered data of " + getSocketAddress(ch) + ", " +
243-
"but 1 packet is still waiting for receiving additional data.");
264+
LOGGER.debug(
265+
channel,
266+
readPackets,
267+
(ch, count) -> "Read " + count + " packet(s) from buffered data of " + getRemoteAddress(ch) + ", " +
268+
"but 1 packet is still waiting for receiving additional data."
269+
);
244270

245271
return readPackets;
246272
}
@@ -276,7 +302,7 @@ else if (packetLength > tempPendingBuffer.capacity()) {
276302
}
277303

278304
LOGGER.debug(channel, readPackets,
279-
(ch, count) -> "Read " + count + " packet(s) from buffered data of " + getSocketAddress(ch) + ".");
305+
(ch, count) -> "Read " + count + " packet(s) from buffered data of " + getRemoteAddress(ch) + ".");
280306

281307
return readPackets;
282308
}
@@ -314,13 +340,19 @@ protected void reAllocTempBuffers(
314340
int packetLength
315341
) {
316342

317-
LOGGER.debug(sourceBuffer.capacity(), packetLength, (currentSize, newSize) ->
318-
"Resize read temp buffer from: " + currentSize + " to: " + newSize);
343+
LOGGER.debug(
344+
sourceBuffer.capacity(),
345+
packetLength,
346+
(currentSize, newSize) -> "Resize read temp buffer from " + currentSize + " to " + newSize
347+
);
319348

320349
var newReadTempBuffer = bufferAllocator.takeBuffer(packetLength + readBuffer.capacity());
321350

322-
LOGGER.debug(sourceBuffer, newReadTempBuffer,
323-
(old, buf) -> "Moved pending data from old temp buffer: " + old + " to new temp buffer: " + buf);
351+
LOGGER.debug(
352+
sourceBuffer,
353+
newReadTempBuffer,
354+
(old, buf) -> "Moved pending data from old temp buffer " + old + " to new temp buffer " + buf
355+
);
324356

325357
newReadTempBuffer.put(sourceBuffer);
326358

@@ -331,13 +363,19 @@ protected void reAllocTempBuffers(
331363

332364
protected void allocTempBuffers(@NotNull ByteBuffer sourceBuffer, int packetLength) {
333365

334-
LOGGER.debug(packetLength, sourceBuffer.remaining(), (length, part) ->
335-
"Request temp buffer to store a part: " + part + " of big packet with length: " + length);
366+
LOGGER.debug(
367+
packetLength,
368+
sourceBuffer.remaining(),
369+
(length, part) -> "Request temp buffer to store a part: " + part + " of big packet with length: " + length
370+
);
336371

337372
var readTempBuffer = bufferAllocator.takeBuffer(packetLength + readBuffer.capacity());
338373

339-
LOGGER.debug(sourceBuffer, readTempBuffer,
340-
(recBuf, buf) -> "Put the part of packet: " + recBuf + " to mapped buffer: " + buf);
374+
LOGGER.debug(
375+
sourceBuffer,
376+
readTempBuffer,
377+
(recBuf, buf) -> "Put the part of packet: " + recBuf + " to mapped buffer: " + buf
378+
);
341379

342380
readTempBuffer.put(sourceBuffer);
343381

@@ -367,6 +405,12 @@ protected void handleReadData(@NotNull Integer result) {
367405
return;
368406
}
369407

408+
LOGGER.debug(
409+
result,
410+
channel,
411+
(bytes, ch) -> "Received " + bytes + " bytes from channel \"" + NetworkUtils.getRemoteAddress(ch) + "\""
412+
);
413+
370414
var readBuffer = getBufferToReadFromChannel();
371415
readBuffer.flip();
372416
try {

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/AbstractPacketWriter.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.ss.rlib.network.Connection;
1111
import com.ss.rlib.network.packet.PacketWriter;
1212
import com.ss.rlib.network.packet.WritablePacket;
13+
import com.ss.rlib.network.util.NetworkUtils;
1314
import lombok.RequiredArgsConstructor;
1415
import org.jetbrains.annotations.NotNull;
1516
import org.jetbrains.annotations.Nullable;
@@ -100,7 +101,7 @@ public void writeNextPacket() {
100101

101102
LOGGER.debug(channel,
102103
resultBuffer,
103-
(ch, buf) -> "Write to channel " + getSocketAddress(ch) + " byte buffer:\n" + hexDump(buf)
104+
(ch, buf) -> "Write to channel \"" + getRemoteAddress(ch) + "\" data:\n" + hexDump(buf)
104105
);
105106

106107
channel.write(resultBuffer, waitPacket, writeHandler);
@@ -305,24 +306,27 @@ protected void handleSuccessfulWriting(@NotNull Integer result, @NotNull Writabl
305306
return;
306307
}
307308

308-
var writeTempBuffer = this.firstWriteTempBuffer;
309309
var writingBuffer = this.writingBuffer;
310310

311311
if (writingBuffer.remaining() > 0) {
312312
LOGGER.debug(
313-
writeTempBuffer,
314-
buf -> "Buffer was not consumed fully, try to write not consumed bytes again: " + buf.remaining()
313+
writingBuffer,
314+
channel,
315+
(buf, ch) -> "Buffer was not consumed fully, " +
316+
"try to write else " + buf.remaining() + " bytes to channel " + NetworkUtils.getRemoteAddress(ch)
315317
);
316318
channel.write(writingBuffer, packet, writeHandler);
317319
return;
320+
} else {
321+
LOGGER.debug(result, bytes -> "Done writing " + bytes + " bytes");
318322
}
319323

320324
sentPacketHandler.accept(packet, Boolean.TRUE);
321325

322326
if (isWriting.compareAndSet(true, false)) {
323327

324328
// if we have temp buffers, we can remove it after finishing writing a packet
325-
if (writeTempBuffer != null) {
329+
if (firstWriteTempBuffer != null) {
326330
clearTempBuffers();
327331
}
328332

rlib-network/src/main/java/com/ss/rlib/network/util/NetworkUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void checkClientTrusted(@NotNull X509Certificate[] certificates, @NotNull
3838
public void checkServerTrusted(@NotNull X509Certificate[] certificates, @NotNull String arg1) { }
3939
}
4040

41-
public static @NotNull SocketAddress getSocketAddress(@NotNull AsynchronousSocketChannel socketChannel) {
41+
public static @NotNull SocketAddress getRemoteAddress(@NotNull AsynchronousSocketChannel socketChannel) {
4242
return Utils.uncheckedGet(socketChannel, AsynchronousSocketChannel::getRemoteAddress);
4343
}
4444

0 commit comments

Comments
 (0)