Skip to content

Commit 3b99ee3

Browse files
committed
update network API to support MQTT implementation
1 parent 397ab37 commit 3b99ee3

6 files changed

Lines changed: 101 additions & 51 deletions

File tree

rlib-common/src/main/java/com/ss/rlib/common/concurrent/GroupThreadFactory.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,13 @@ public interface ThreadConstructor {
2020
@NotNull Thread create(@NotNull ThreadGroup group, @NotNull Runnable runnable, @NotNull String name);
2121
}
2222

23-
/**
24-
* The order of the next thread.
25-
*/
2623
private final AtomicInteger ordinal;
27-
28-
/**
29-
* The name of this group.
30-
*/
3124
private final String name;
32-
33-
/**
34-
* The group of threads.
35-
*/
3625
private final ThreadGroup group;
37-
38-
/**
39-
* The thread's constructor.
40-
*/
4126
private final ThreadConstructor constructor;
4227

43-
/**
44-
* The priority of these threads.
45-
*/
4628
private final int priority;
29+
private final boolean daemon;
4730

4831
public GroupThreadFactory(@NotNull String name) {
4932
this(name, Thread::new, Thread.NORM_PRIORITY);
@@ -54,11 +37,21 @@ public GroupThreadFactory(@NotNull String name, int priority) {
5437
}
5538

5639
public GroupThreadFactory(@NotNull String name, @NotNull ThreadConstructor constructor, int priority) {
40+
this(name, constructor, priority, false);
41+
}
42+
43+
public GroupThreadFactory(
44+
@NotNull String name,
45+
@NotNull ThreadConstructor constructor,
46+
int priority,
47+
boolean daemon
48+
) {
5749
this.constructor = constructor;
5850
this.priority = priority;
5951
this.name = name;
6052
this.group = new ThreadGroup(name);
6153
this.ordinal = new AtomicInteger();
54+
this.daemon = daemon;
6255
}
6356

6457
@Deprecated
@@ -67,6 +60,7 @@ public GroupThreadFactory(@NotNull String name, @NotNull Class<? extends Thread>
6760
this.name = name;
6861
this.group = new ThreadGroup(name);
6962
this.ordinal = new AtomicInteger();
63+
this.daemon = false;
7064
this.constructor = new ThreadConstructor() {
7165

7266
Constructor<? extends Thread> constructor =
@@ -87,6 +81,7 @@ public GroupThreadFactory(@NotNull String name, @NotNull Class<? extends Thread>
8781
public @NotNull Thread newThread(@NotNull Runnable runnable) {
8882
var thread = constructor.create(group, runnable, name + "-" + ordinal.incrementAndGet());
8983
thread.setPriority(priority);
84+
thread.setDaemon(daemon);
9085
return thread;
9186
}
9287
}

rlib-network/src/main/java/com/ss/rlib/network/impl/IdBasedPacketConnection.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@
2020
/**
2121
* @author JavaSaBr
2222
*/
23+
@Getter(AccessLevel.PROTECTED)
2324
public class IdBasedPacketConnection<R extends IdBasedReadablePacket<R>, W extends IdBasedWritablePacket> extends
2425
AbstractConnection<R, W> {
2526

26-
@Getter(AccessLevel.PROTECTED)
2727
private final PacketReader packetReader;
28-
29-
@Getter(AccessLevel.PROTECTED)
3028
private final PacketWriter packetWriter;
31-
3229
private final ReadablePacketRegistry<R> packetRegistry;
30+
3331
private final int packetIdHeaderSize;
3432

3533
public IdBasedPacketConnection(

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/AbstractPacketReader.java

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public void failed(@NotNull Throwable exc, @Nullable Void attachment) {
6464
@Setter(AccessLevel.PROTECTED)
6565
protected volatile ByteBuffer decryptedTempBuffer;
6666

67-
protected final int packetLengthHeaderSize;
6867
protected final int maxPacketsByRead;
6968

7069
protected AbstractPacketReader(
@@ -73,7 +72,6 @@ protected AbstractPacketReader(
7372
@NotNull BufferAllocator bufferAllocator,
7473
@NotNull Runnable updateActivityFunction,
7574
@NotNull Consumer<? super R> readPacketHandler,
76-
int packetLengthHeaderSize,
7775
int maxPacketsByRead
7876
) {
7977
this.connection = connection;
@@ -84,7 +82,6 @@ protected AbstractPacketReader(
8482
this.decryptedBuffer = bufferAllocator.takePendingBuffer();
8583
this.updateActivityFunction = updateActivityFunction;
8684
this.readPacketHandler = readPacketHandler;
87-
this.packetLengthHeaderSize = packetLengthHeaderSize;
8885
this.maxPacketsByRead = maxPacketsByRead;
8986
}
9087

@@ -165,24 +162,25 @@ else if (waitedBytes > 0) {
165162
var readPackets = 0;
166163
var endPosition = 0;
167164

168-
while (bufferToRead.remaining() >= packetLengthHeaderSize && readPackets < maxPacketsByRead) {
165+
while (canStartReadPacket(bufferToRead) && readPackets < maxPacketsByRead) {
169166

170167
// set position of start a next packet
171168
bufferToRead.position(endPosition);
172169

170+
var positionBeforeRead = endPosition;
173171
var packetLength = readPacketLength(bufferToRead);
174-
var dataLength = packetLength - packetLengthHeaderSize;
172+
var dataLength = calcDataLength(packetLength, bufferToRead.position() - endPosition, bufferToRead);
175173

176-
LOGGER.debug(packetLength, bufferToRead.position() - packetLengthHeaderSize,
174+
LOGGER.debug(packetLength, positionBeforeRead,
177175
(length, pos) -> "Find next packet from position: " + pos + " with length: " + length);
178176

179177
// calculate position of end the next packet
180178
endPosition += packetLength;
181179

182180
// if the packet isn't full presented in this buffer
183-
if (endPosition > bufferToRead.limit()) {
181+
if (packetLength == -1 || endPosition > bufferToRead.limit()) {
184182

185-
bufferToRead.position(bufferToRead.position() - packetLengthHeaderSize);
183+
bufferToRead.position(positionBeforeRead);
186184

187185
// if we read the received buffer we need to put
188186
// not read data to the pending buffer or big mapped byte buffer
@@ -247,9 +245,9 @@ else if (packetLength > readTempBuffer.capacity()) {
247245
R packet;
248246

249247
if (decryptedData != null) {
250-
packet = createPacketFor(decryptedData, dataLength);
248+
packet = createPacketFor(decryptedData, packetLength, dataLength);
251249
} else {
252-
packet = createPacketFor(bufferToRead, dataLength);
250+
packet = createPacketFor(bufferToRead, packetLength, dataLength);
253251
}
254252

255253
if (packet != null) {
@@ -284,6 +282,32 @@ else if (packetLength > readTempBuffer.capacity()) {
284282
return readPackets;
285283
}
286284

285+
/**
286+
* Check buffer's data.
287+
*
288+
* @param buffer the buffer to read.
289+
* @return true if this buffer has enough data to start initial reading.
290+
*/
291+
protected abstract boolean canStartReadPacket(@NotNull ByteBuffer buffer);
292+
293+
/**
294+
* Calculate size of packet data.
295+
*
296+
* @param packetLength the full packet length.
297+
* @param readBytes the count of already read bytes from buffer to get packet length.
298+
* @param buffer the buffer.
299+
* @return the length of packet data part.
300+
*/
301+
protected abstract int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer);
302+
303+
/**
304+
* Get the packet's data length of next packet in the buffer.
305+
*
306+
* @param buffer the buffer with received data.
307+
* @return the packet length or -1 if we have no enough data to read length.
308+
*/
309+
protected abstract int readPacketLength(@NotNull ByteBuffer buffer);
310+
287311
protected void reAllocTempBuffers(
288312
@NotNull ByteBuffer sourceBuffer,
289313
int packetLength
@@ -383,16 +407,6 @@ protected int getMaxPacketsByRead() {
383407
return maxPacketsByRead;
384408
}
385409

386-
/**
387-
* Get the packet's data length of next packet in the buffer.
388-
*
389-
* @param buffer the buffer with received data.
390-
* @return the packet length.
391-
*/
392-
protected int readPacketLength(@NotNull ByteBuffer buffer) {
393-
return readHeader(buffer, packetLengthHeaderSize);
394-
}
395-
396410
protected int readHeader(@NotNull ByteBuffer buffer, int headerSize) {
397411
switch (headerSize) {
398412
case 1:
@@ -409,11 +423,12 @@ protected int readHeader(@NotNull ByteBuffer buffer, int headerSize) {
409423
/**
410424
* Create a packet to read received data.
411425
*
412-
* @param buffer the buffer with received data.
413-
* @param length length of packet's data.
426+
* @param buffer the buffer with received data.
427+
* @param packetLength the length of packet.
428+
* @param dataLength length of packet's data.
414429
* @return the readable packet.
415430
*/
416-
protected abstract @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int length);
431+
protected abstract @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength);
417432

418433
@Override
419434
public void close() {

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/DefaultPacketReader.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class DefaultPacketReader<R extends ReadablePacket, C extends Connection<
2020
AbstractPacketReader<R, C> {
2121

2222
private final IntFunction<R> readPacketFactory;
23+
private final int packetLengthHeaderSize;
2324

2425
public DefaultPacketReader(
2526
@NotNull C connection,
@@ -37,14 +38,29 @@ public DefaultPacketReader(
3738
bufferAllocator,
3839
updateActivityFunction,
3940
readPacketHandler,
40-
packetLengthHeaderSize,
4141
maxPacketsByRead
4242
);
4343
this.readPacketFactory = readPacketFactory;
44+
this.packetLengthHeaderSize = packetLengthHeaderSize;
4445
}
4546

4647
@Override
47-
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int length) {
48-
return readPacketFactory.apply(length);
48+
protected boolean canStartReadPacket(@NotNull ByteBuffer buffer) {
49+
return buffer.remaining() >= packetLengthHeaderSize;
50+
}
51+
52+
@Override
53+
protected int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer) {
54+
return packetLength - packetLengthHeaderSize;
55+
}
56+
57+
@Override
58+
protected int readPacketLength(@NotNull ByteBuffer buffer) {
59+
return readHeader(buffer, packetLengthHeaderSize);
60+
}
61+
62+
@Override
63+
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength) {
64+
return readPacketFactory.apply(dataLength);
4965
}
5066
}

rlib-network/src/main/java/com/ss/rlib/network/packet/impl/IdBasedPacketReader.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class IdBasedPacketReader<R extends IdBasedReadablePacket<R>, C extends C
2020
AbstractPacketReader<R, C> {
2121

2222
private final ReadablePacketRegistry<R> packetRegistry;
23+
private final int packetLengthHeaderSize;
2324
private final int packetIdHeaderSize;
2425

2526
public IdBasedPacketReader(
@@ -39,15 +40,30 @@ public IdBasedPacketReader(
3940
bufferAllocator,
4041
updateActivityFunction,
4142
readPacketHandler,
42-
packetLengthHeaderSize,
4343
maxPacketsByRead
4444
);
45+
this.packetLengthHeaderSize = packetLengthHeaderSize;
4546
this.packetIdHeaderSize = packetIdHeaderSize;
4647
this.packetRegistry = packetRegistry;
4748
}
4849

4950
@Override
50-
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int length) {
51+
protected boolean canStartReadPacket(@NotNull ByteBuffer buffer) {
52+
return buffer.remaining() > packetLengthHeaderSize;
53+
}
54+
55+
@Override
56+
protected int calcDataLength(int packetLength, int readBytes, @NotNull ByteBuffer buffer) {
57+
return packetLength - packetLengthHeaderSize;
58+
}
59+
60+
@Override
61+
protected int readPacketLength(@NotNull ByteBuffer buffer) {
62+
return readHeader(buffer, packetLengthHeaderSize);
63+
}
64+
65+
@Override
66+
protected @Nullable R createPacketFor(@NotNull ByteBuffer buffer, int packetLength, int dataLength) {
5167
return packetRegistry.findById(readHeader(buffer, packetIdHeaderSize)).newInstance();
5268
}
5369
}

rlib-network/src/main/java/com/ss/rlib/network/server/impl/DefaultServerNetwork.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public DefaultServerNetwork(
7474
var threadFactory = new GroupThreadFactory(
7575
config.getGroupName(),
7676
config.getThreadConstructor(),
77-
config.getThreadPriority()
77+
config.getThreadPriority(),
78+
true
7879
);
7980

8081
var executor = config.getGroupSize() < config.getGroupMaxSize() ? new ThreadPoolExecutor(
@@ -87,6 +88,9 @@ public DefaultServerNetwork(
8788
new ThreadPoolExecutor.CallerRunsPolicy()
8889
) : Executors.newFixedThreadPool(config.getGroupSize(), threadFactory);
8990

91+
// activate the executor
92+
executor.submit(() -> {});
93+
9094
LOGGER.info("Executor configuration:");
9195
LOGGER.info(config, conf -> "Min threads: " + conf.getGroupSize());
9296
LOGGER.info(config, conf -> "Max threads: " + conf.getGroupMaxSize());
@@ -123,7 +127,13 @@ public DefaultServerNetwork(
123127
@Override
124128
public <S extends ServerNetwork<C>> @NotNull S start(@NotNull InetSocketAddress serverAddress) {
125129
Utils.unchecked(channel, serverAddress, AsynchronousServerSocketChannel::bind);
130+
126131
LOGGER.info(serverAddress, adr -> "Started server on address: " + adr);
132+
133+
if (!subscribers.isEmpty()) {
134+
acceptNext();
135+
}
136+
127137
return ClassUtils.unsafeCast(this);
128138
}
129139

0 commit comments

Comments
 (0)