Skip to content

Commit 90cd907

Browse files
committed
continue updating network API to support MQTT implementation
1 parent 3b99ee3 commit 90cd907

6 files changed

Lines changed: 40 additions & 29 deletions

File tree

rlib-network/src/main/java/com/ss/rlib/network/impl/AbstractConnection.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public abstract class AbstractConnection<R extends ReadablePacket, W extends Wri
5555
protected final Array<BiConsumer<? super Connection<R, W>, ? super R>> subscribers;
5656

5757
protected final int maxPacketsByRead;
58-
protected final int packetLengthHeaderSize;
5958

6059
@Getter
6160
protected volatile long lastActivity;
@@ -65,12 +64,10 @@ public AbstractConnection(
6564
@NotNull AsynchronousSocketChannel channel,
6665
@NotNull NetworkCryptor crypt,
6766
@NotNull BufferAllocator bufferAllocator,
68-
int maxPacketsByRead,
69-
int packetLengthHeaderSize
67+
int maxPacketsByRead
7068
) {
7169
this.bufferAllocator = bufferAllocator;
7270
this.maxPacketsByRead = maxPacketsByRead;
73-
this.packetLengthHeaderSize = packetLengthHeaderSize;
7471
this.lock = new StampedLock();
7572
this.crypt = crypt;
7673
this.channel = channel;

rlib-network/src/main/java/com/ss/rlib/network/impl/DefaultDataConnection.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
/**
2020
* @author JavaSaBr
2121
*/
22+
@Getter(AccessLevel.PROTECTED)
2223
public abstract class DefaultDataConnection<R extends ReadablePacket, W extends WritablePacket> extends
2324
AbstractConnection<R, W> {
2425

25-
@Getter(AccessLevel.PROTECTED)
2626
private final PacketReader packetReader;
27-
28-
@Getter(AccessLevel.PROTECTED)
2927
private final PacketWriter packetWriter;
3028

29+
private final int packetLengthHeaderSize;
30+
3131
public DefaultDataConnection(
3232
@NotNull Network<? extends Connection<R, W>> network,
3333
@NotNull AsynchronousSocketChannel channel,
@@ -36,7 +36,8 @@ public DefaultDataConnection(
3636
int maxPacketsByRead,
3737
int packetLengthHeaderSize
3838
) {
39-
super(network, channel, crypt, bufferAllocator, maxPacketsByRead, packetLengthHeaderSize);
39+
super(network, channel, crypt, bufferAllocator, maxPacketsByRead);
40+
this.packetLengthHeaderSize = packetLengthHeaderSize;
4041
this.packetReader = createPacketReader();
4142
this.packetWriter = createPacketWriter();
4243
}

rlib-network/src/main/java/com/ss/rlib/network/impl/IdBasedPacketConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class IdBasedPacketConnection<R extends IdBasedReadablePacket<R>, W exten
2828
private final PacketWriter packetWriter;
2929
private final ReadablePacketRegistry<R> packetRegistry;
3030

31+
private final int packetLengthHeaderSize;
3132
private final int packetIdHeaderSize;
3233

3334
public IdBasedPacketConnection(
@@ -40,8 +41,9 @@ public IdBasedPacketConnection(
4041
int packetLengthHeaderSize,
4142
int packetIdHeaderSize
4243
) {
43-
super(network, channel, crypt, bufferAllocator, maxPacketsByRead, packetLengthHeaderSize);
44+
super(network, channel, crypt, bufferAllocator, maxPacketsByRead);
4445
this.packetRegistry = packetRegistry;
46+
this.packetLengthHeaderSize = packetLengthHeaderSize;
4547
this.packetIdHeaderSize = packetIdHeaderSize;
4648
this.packetReader = createPacketReader();
4749
this.packetWriter = createPacketWriter();

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/AbstractPacketReader.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,9 @@ else if (packetLength > readTempBuffer.capacity()) {
245245
R packet;
246246

247247
if (decryptedData != null) {
248-
packet = createPacketFor(decryptedData, packetLength, dataLength);
248+
packet = createPacketFor(decryptedData, positionBeforeRead, packetLength, dataLength);
249249
} else {
250-
packet = createPacketFor(bufferToRead, packetLength, dataLength);
250+
packet = createPacketFor(bufferToRead, positionBeforeRead, packetLength, dataLength);
251251
}
252252

253253
if (packet != null) {
@@ -259,6 +259,8 @@ else if (packetLength > readTempBuffer.capacity()) {
259259
} else {
260260
LOGGER.warning("Cannot create any instance of packet to read data.");
261261
}
262+
263+
bufferToRead.position(endPosition);
262264
}
263265

264266
if (bufferToRead.hasRemaining()) {
@@ -298,7 +300,9 @@ else if (packetLength > readTempBuffer.capacity()) {
298300
* @param buffer the buffer.
299301
* @return the length of packet data part.
300302
*/
301-
protected abstract int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer);
303+
protected int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer) {
304+
return packetLength - readBytes;
305+
}
302306

303307
/**
304308
* Get the packet's data length of next packet in the buffer.
@@ -423,12 +427,18 @@ protected int readHeader(@NotNull ByteBuffer buffer, int headerSize) {
423427
/**
424428
* Create a packet to read received data.
425429
*
426-
* @param buffer the buffer with received data.
427-
* @param packetLength the length of packet.
428-
* @param dataLength length of packet's data.
430+
* @param buffer the buffer with received data.
431+
* @param startPacketPosition the start position of the packet in the buffer.
432+
* @param packetLength the length of packet.
433+
* @param dataLength length of packet's data.
429434
* @return the readable packet.
430435
*/
431-
protected abstract @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength);
436+
protected abstract @Nullable R createPacketFor(
437+
@NotNull ByteBuffer buffer,
438+
int startPacketPosition,
439+
int packetLength,
440+
int dataLength
441+
);
432442

433443
@Override
434444
public void close() {

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/DefaultPacketReader.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,18 @@ protected boolean canStartReadPacket(@NotNull ByteBuffer buffer) {
4949
return buffer.remaining() >= packetLengthHeaderSize;
5050
}
5151

52-
@Override
53-
protected int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer) {
54-
return packetLength - packetLengthHeaderSize;
55-
}
56-
5752
@Override
5853
protected int readPacketLength(@NotNull ByteBuffer buffer) {
5954
return readHeader(buffer, packetLengthHeaderSize);
6055
}
6156

6257
@Override
63-
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength) {
58+
protected @Nullable R createPacketFor(
59+
@NotNull ByteBuffer buffer,
60+
int startPacketPosition,
61+
int packetLength,
62+
int dataLength
63+
) {
6464
return readPacketFactory.apply(dataLength);
6565
}
6666
}

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/IdBasedPacketReader.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,19 @@ protected boolean canStartReadPacket(@NotNull ByteBuffer buffer) {
5252
return buffer.remaining() > packetLengthHeaderSize;
5353
}
5454

55-
@Override
56-
protected int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer) {
57-
return packetLength - packetLengthHeaderSize;
58-
}
59-
6055
@Override
6156
protected int readPacketLength(@NotNull ByteBuffer buffer) {
6257
return readHeader(buffer, packetLengthHeaderSize);
6358
}
6459

6560
@Override
66-
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength) {
67-
return packetRegistry.findById(readHeader(buffer, packetIdHeaderSize)).newInstance();
61+
protected @Nullable R createPacketFor(
62+
@NotNull ByteBuffer buffer,
63+
int startPacketPosition,
64+
int packetLength,
65+
int dataLength
66+
) {
67+
return packetRegistry.findById(readHeader(buffer, packetIdHeaderSize))
68+
.newInstance();
6869
}
6970
}

0 commit comments

Comments
 (0)