@@ -34,16 +34,16 @@ public abstract class AbstractPacketReader<R extends ReadablePacket, C extends C
3434
3535 private static final Logger LOGGER = LoggerManager .getLogger (AbstractPacketReader .class );
3636
37- private final CompletionHandler <Integer , Void > readHandler = new CompletionHandler <>() {
37+ private final CompletionHandler <Integer , ByteBuffer > readHandler = new CompletionHandler <>() {
3838
3939 @ Override
40- public void completed (@ NotNull Integer result , @ Nullable Void attachment ) {
41- handleReadData ( result );
40+ public void completed (@ NotNull Integer receivedBytes , @ NotNull ByteBuffer readingBuffer ) {
41+ handleReceivedData ( receivedBytes , readingBuffer );
4242 }
4343
4444 @ Override
45- public void failed (@ NotNull Throwable exc , @ Nullable Void attachment ) {
46- handleFailedRead (exc );
45+ public void failed (@ NotNull Throwable exc , @ NotNull ByteBuffer readingBuffer ) {
46+ handleFailedReceiving (exc , readingBuffer );
4747 }
4848 };
4949
@@ -96,7 +96,9 @@ public void startRead() {
9696
9797 LOGGER .debug (channel , ch -> "Start waiting for new data from channel \" " + getRemoteAddress (ch ) + "\" " );
9898
99- channel .read (getBufferToReadFromChannel (), null , readHandler );
99+ var buffer = getBufferToReadFromChannel ();
100+
101+ channel .read (buffer , buffer , readHandler );
100102 }
101103
102104 /**
@@ -118,24 +120,24 @@ protected int readPackets(@NotNull ByteBuffer receivedBuffer) {
118120 */
119121 protected int readPackets (@ NotNull ByteBuffer receivedBuffer , @ NotNull ByteBuffer pendingBuffer ) {
120122
121- LOGGER .debug (receivedBuffer , buf -> "Start reading packets from received buffer: " + buf );
123+ LOGGER .debug (receivedBuffer , buf -> "Start reading packets from received buffer " + buf );
122124
123125 var waitedBytes = pendingBuffer .position ();
124126 var bufferToRead = receivedBuffer ;
125127 var tempPendingBuffer = getTempPendingBuffer ();
126128
127- // if we have read mapped buffer it means that we are reading a really big packet now
129+ // if we have a temp buffer it means that we are reading a really big packet now
128130 if (tempPendingBuffer != null ) {
129131
130132 if (tempPendingBuffer .remaining () < receivedBuffer .remaining ()) {
131133 reAllocTempBuffers (tempPendingBuffer .flip (), tempPendingBuffer .capacity ());
132134 tempPendingBuffer = notNull (getTempPendingBuffer ());
133135 }
134136
135- LOGGER .debugNullable (
137+ LOGGER .debug (
136138 receivedBuffer ,
137139 tempPendingBuffer ,
138- (buf , mappedBuf ) -> "Put received buffer: " + buf + " to read mapped buffer: " + mappedBuf
140+ (buf , mappedBuf ) -> "Put received buffer " + buf + " to read mapped buffer " + mappedBuf
139141 );
140142
141143 bufferToRead = BufferUtils .putToAndFlip (tempPendingBuffer , receivedBuffer );
@@ -149,8 +151,8 @@ else if (waitedBytes > 0) {
149151 LOGGER .debug (
150152 pendingBuffer ,
151153 receivedBuffer ,
152- (penBuf , buf ) -> "Pending buffer: " + penBuf + " is too small to append received buffer: " +
153- buf + ", allocate mapped buffer for this"
154+ (penBuf , buf ) -> "Pending buffer " + penBuf + " is too small to append received buffer " +
155+ buf + ", will allocate new temp buffer for this"
154156 );
155157
156158 allocTempBuffers (pendingBuffer .flip (), pendingBuffer .capacity ());
@@ -264,23 +266,29 @@ else if (packetLength > tempPendingBuffer.capacity()) {
264266 LOGGER .debug (
265267 channel ,
266268 readPackets ,
267- (ch , count ) -> "Read " + count + " packet(s) from buffered data of " + getRemoteAddress (ch ) + ", " +
269+ (ch , count ) -> "Read " + count + " packets from received buffer of " + getRemoteAddress (ch ) + ", " +
268270 "but 1 packet is still waiting for receiving additional data."
269271 );
270272
273+ receivedBuffer .clear ();
271274 return readPackets ;
272275 }
273276
274277 R packet = createPacketFor (bufferToRead , positionBeforeRead , packetLength , dataLength );
275278
276279 if (packet != null ) {
277280 LOGGER .debug (packet , pck -> "Created instance of packet to read data: " + pck );
278- packet .read (connection , bufferToRead , dataLength );
279- readPacketHandler .accept (packet );
280- LOGGER .debug (packet , pck -> "Read data of packet: " + pck );
281+
282+ if (packet .read (connection , bufferToRead , dataLength )) {
283+ readPacketHandler .accept (packet );
284+ } else {
285+ LOGGER .error ("Packet " + packet + " was read incorrectly" );
286+ }
287+
288+ LOGGER .debug (packet , pck -> "Finished reading data of packet: " + pck );
281289 readPackets ++;
282290 } else {
283- LOGGER .warning ("Cannot create any instance of packet to read data. " );
291+ LOGGER .warning ("Cannot create any instance of packet to read data" );
284292 }
285293
286294 bufferToRead .position (endPosition );
@@ -301,9 +309,13 @@ else if (packetLength > tempPendingBuffer.capacity()) {
301309 freeTempBuffers ();
302310 }
303311
304- LOGGER .debug (channel , readPackets ,
305- (ch , count ) -> "Read " + count + " packet(s) from buffered data of " + getRemoteAddress (ch ) + "." );
312+ LOGGER .debug (
313+ channel ,
314+ readPackets ,
315+ (ch , count ) -> "Read " + count + " packets from received buffer of " + getRemoteAddress (ch ) + "."
316+ );
306317
318+ receivedBuffer .clear ();
307319 return readPackets ;
308320 }
309321
@@ -393,32 +405,30 @@ protected void freeTempBuffers() {
393405 }
394406
395407 /**
396- * Handle read data.
408+ * Handle received data.
397409 *
398- * @param result the count of read bytes.
410+ * @param receivedBytes the count of received bytes.
411+ * @param readingBuffer the currently reading buffer.
399412 */
400- protected void handleReadData (@ NotNull Integer result ) {
413+ protected void handleReceivedData (@ NotNull Integer receivedBytes , @ NotNull ByteBuffer readingBuffer ) {
401414 updateActivityFunction .run ();
402415
403- if (result == -1 ) {
416+ if (receivedBytes == -1 ) {
404417 connection .close ();
405418 return ;
406419 }
407420
408421 LOGGER .debug (
409- result ,
422+ receivedBytes ,
410423 channel ,
411424 (bytes , ch ) -> "Received " + bytes + " bytes from channel \" " + NetworkUtils .getRemoteAddress (ch ) + "\" "
412425 );
413426
414- var readBuffer = getBufferToReadFromChannel ();
415- readBuffer .flip ();
427+ readingBuffer .flip ();
416428 try {
417- readPackets (readBuffer );
429+ readPackets (readingBuffer );
418430 } catch (Exception e ) {
419431 LOGGER .error (e );
420- } finally {
421- readBuffer .clear ();
422432 }
423433
424434 if (isReading .compareAndSet (true , false )) {
@@ -427,13 +437,14 @@ protected void handleReadData(@NotNull Integer result) {
427437 }
428438
429439 /**
430- * Handle the exception during reading data.
440+ * Handle the exception during receiving data.
431441 *
432- * @param exception the exception.
442+ * @param exception the exception.
443+ * @param readingBuffer the currently reading buffer.
433444 */
434- protected void handleFailedRead (@ NotNull Throwable exception ) {
445+ protected void handleFailedReceiving (@ NotNull Throwable exception , @ NotNull ByteBuffer readingBuffer ) {
435446 if (exception instanceof AsynchronousCloseException ) {
436- LOGGER .warning (connection , cn -> "Connection " + cn .getRemoteAddress () + " was closed." );
447+ LOGGER .info (connection , cn -> "Connection " + cn .getRemoteAddress () + " was closed." );
437448 } else {
438449 LOGGER .error (exception );
439450 connection .close ();
0 commit comments