11package com .ss .rlib .network .test ;
22
3- import static com .ss .rlib .network .NetworkFactory .newStringDataClientNetwork ;
4- import static com .ss .rlib .network .NetworkFactory .newStringDataServerNetwork ;
3+ import static com .ss .rlib .network .NetworkFactory .*;
54import static com .ss .rlib .network .ServerNetworkConfig .DEFAULT_SERVER ;
65import static java .util .stream .Collectors .toList ;
76import com .ss .rlib .common .concurrent .atomic .AtomicInteger ;
1615import com .ss .rlib .network .annotation .PacketDescription ;
1716import com .ss .rlib .network .client .ClientNetwork ;
1817import com .ss .rlib .network .impl .DefaultBufferAllocator ;
18+ import com .ss .rlib .network .impl .DefaultConnection ;
1919import com .ss .rlib .network .impl .ReuseBufferAllocator ;
20+ import com .ss .rlib .network .packet .impl .DefaultReadablePacket ;
2021import com .ss .rlib .network .packet .impl .DefaultWritablePacket ;
2122import com .ss .rlib .network .packet .impl .StringWritablePacket ;
23+ import com .ss .rlib .network .packet .registry .ReadablePacketRegistry ;
24+ import lombok .Getter ;
2225import lombok .RequiredArgsConstructor ;
2326import lombok .SneakyThrows ;
27+ import lombok .ToString ;
2428import org .jetbrains .annotations .NotNull ;
2529import org .junit .jupiter .api .Assertions ;
2630import org .junit .jupiter .api .Test ;
2731import reactor .core .publisher .Flux ;
2832
2933import java .nio .ByteBuffer ;
30- import java .time .Duration ;
34+ import java .time .* ;
3135import java .util .Objects ;
3236import java .util .concurrent .CountDownLatch ;
3337import java .util .concurrent .ThreadLocalRandom ;
@@ -48,7 +52,7 @@ interface ClientPackets {
4852
4953 @ RequiredArgsConstructor
5054 @ PacketDescription (id = 1 )
51- class GetEchoMessage extends DefaultWritablePacket {
55+ class RequestEchoMessage extends DefaultWritablePacket {
5256
5357 private final String message ;
5458
@@ -60,48 +64,125 @@ protected void writeImpl(@NotNull ByteBuffer buffer) {
6064 }
6165
6266 @ PacketDescription (id = 2 )
63- class GetActiveConnections extends DefaultWritablePacket {
67+ class RequestServerTime extends DefaultWritablePacket {
6468 }
6569
70+ @ ToString
6671 @ RequiredArgsConstructor
6772 @ PacketDescription (id = 3 )
68- class BroadcastMessage extends DefaultWritablePacket {
73+ class ResponseEchoMessage extends DefaultReadablePacket {
74+
75+ @ Getter
76+ private volatile String message ;
77+
78+ @ Override
79+ protected void readImpl (@ NotNull DefaultConnection connection , @ NotNull ByteBuffer buffer ) {
80+ super .readImpl (connection , buffer );
81+ message = readString (buffer );
82+ }
83+ }
84+
85+ @ ToString
86+ @ PacketDescription (id = 4 )
87+ class ResponseServerTime extends DefaultReadablePacket {
88+
89+ @ Getter
90+ private volatile LocalDateTime localDateTime ;
91+
92+ @ Override
93+ protected void readImpl (@ NotNull DefaultConnection connection , @ NotNull ByteBuffer buffer ) {
94+ super .readImpl (connection , buffer );
95+ localDateTime = LocalDateTime .ofEpochSecond (
96+ readLong (buffer ),
97+ 0 ,
98+ ZoneOffset .ofTotalSeconds (readInt (buffer ))
99+ );
100+ }
101+ }
102+ }
103+
104+ // server packets
105+ interface ServerPackets {
106+
107+ @ ToString
108+ @ PacketDescription (id = 1 )
109+ class RequestEchoMessage extends DefaultReadablePacket {
110+
111+ @ Override
112+ protected void readImpl (@ NotNull DefaultConnection connection , @ NotNull ByteBuffer buffer ) {
113+ super .readImpl (connection , buffer );
114+ connection .send (new ResponseEchoMessage (readString (buffer )));
115+ }
116+ }
117+
118+ @ ToString
119+ @ PacketDescription (id = 2 )
120+ class RequestServerTime extends DefaultReadablePacket {
121+
122+ @ Override
123+ protected void readImpl (@ NotNull DefaultConnection connection , @ NotNull ByteBuffer buffer ) {
124+ super .readImpl (connection , buffer );
125+ connection .send (new ResponseServerTime ());
126+ }
127+ }
128+
129+ @ RequiredArgsConstructor
130+ @ PacketDescription (id = 3 )
131+ class ResponseEchoMessage extends DefaultWritablePacket {
69132
70133 private final String message ;
71134
72135 @ Override
73136 protected void writeImpl (@ NotNull ByteBuffer buffer ) {
74137 super .writeImpl (buffer );
75- writeString (buffer , message );
138+ writeString (buffer , "Echo: " + message );
76139 }
77140 }
78- }
79141
80- // server packets
142+ @ PacketDescription (id = 4 )
143+ class ResponseServerTime extends DefaultWritablePacket {
144+
145+ @ Override
146+ protected void writeImpl (@ NotNull ByteBuffer buffer ) {
147+ super .writeImpl (buffer );
148+ var dateTime = ZonedDateTime .now ();
149+ writeLong (buffer , dateTime .toEpochSecond ());
150+ writeInt (buffer , dateTime .getOffset ().getTotalSeconds ());
151+ }
152+ }
153+ }
81154
82155 @ Test
83156 @ SneakyThrows
84157 void echoNetworkTest () {
85158
86- var serverNetwork = newStringDataServerNetwork ();
159+ var serverNetwork = newDefaultServerNetwork (ReadablePacketRegistry .of (DefaultReadablePacket .class ,
160+ ServerPackets .RequestEchoMessage .class ,
161+ ServerPackets .RequestServerTime .class
162+ ));
87163 var serverAddress = serverNetwork .start ();
88164 var counter = new CountDownLatch (90 );
89165
90166 serverNetwork .accepted ()
91167 .flatMap (Connection ::receivedEvents )
92- .subscribe (event -> {
93- var message = event .packet .getData ();
94- LOGGER .info ("Received from client: " + message );
95- event .connection .send (new StringWritablePacket ("Echo: " + message ));
96- });
168+ .subscribe (event -> LOGGER .info ("Received from client: " + event .packet ));
97169
98- var clientNetwork = newStringDataClientNetwork ();
170+ var clientNetwork = newDefaultClientNetwork (ReadablePacketRegistry .of (DefaultReadablePacket .class ,
171+ ClientPackets .ResponseEchoMessage .class ,
172+ ClientPackets .ResponseServerTime .class
173+ ));
99174 clientNetwork .connected (serverAddress )
100175 .doOnNext (connection -> IntStream .range (10 , 100 )
101- .forEach (length -> connection .send (new StringWritablePacket (StringUtils .generate (length )))))
176+ .forEach (length -> {
177+ if (length % 2 == 0 ) {
178+ connection .send (new ClientPackets .RequestServerTime ());
179+ } else {
180+ connection .send (new ClientPackets .RequestEchoMessage (StringUtils .generate (length )));
181+ }
182+ }))
102183 .flatMapMany (Connection ::receivedEvents )
103184 .subscribe (event -> {
104- LOGGER .info ("Received from server: " + event .packet . getData () );
185+ LOGGER .info ("Received from server: " + event .packet );
105186 counter .countDown ();
106187 });
107188
0 commit comments