|
40 | 40 | import java.util.Timer; |
41 | 41 | import java.util.TimerTask; |
42 | 42 | import java.util.concurrent.TimeoutException; |
| 43 | +import java.util.concurrent.atomic.AtomicInteger; |
43 | 44 | import java.util.concurrent.atomic.AtomicLong; |
44 | 45 | import javax.annotation.Nonnull; |
45 | 46 | import javax.annotation.Nullable; |
@@ -188,12 +189,14 @@ public class Channel implements Closeable { |
188 | 189 | @GuardedBy("socketLock") |
189 | 190 | protected InputHandler inputHandler; |
190 | 191 |
|
191 | | - /** |
192 | | - * Counter for producing request numbers |
193 | | - */ |
| 192 | + /** Counter for producing request numbers */ |
194 | 193 | @Nonnull |
195 | 194 | protected final AtomicLong requestCounter = new AtomicLong(new Random().nextInt(65536) + 1L); |
196 | 195 |
|
| 196 | + /** Counter for pending {@link Pong}s */ |
| 197 | + @Nonnull |
| 198 | + protected final AtomicInteger pingCounter = new AtomicInteger(); |
| 199 | + |
197 | 200 | /** |
198 | 201 | * Processors of requests by their identifiers |
199 | 202 | */ |
@@ -441,6 +444,7 @@ public boolean connect() throws IOException, NoSuchAlgorithmException, KeyManage |
441 | 444 | pingTimer.cancel(); |
442 | 445 | } |
443 | 446 | // Start regular pinging |
| 447 | + pingCounter.set(0); |
444 | 448 | PingTask pingTask = new PingTask(); |
445 | 449 | pingTimer = new Timer(remoteName + " PING timer"); |
446 | 450 | pingTimer.schedule(pingTask, 1000, PING_PERIOD); |
@@ -2410,19 +2414,48 @@ public PingTask() { |
2410 | 2414 |
|
2411 | 2415 | @Override |
2412 | 2416 | public void run() { |
| 2417 | + int count = pingCounter.get(); |
| 2418 | + if (count > 0) { |
| 2419 | + LOGGER.warn( |
| 2420 | + CAST_API_MARKER, "No PONG received for the previous PING from {}, disconnecting", remoteName |
| 2421 | + ); |
| 2422 | + try { |
| 2423 | + close(); |
| 2424 | + } catch (IOException e) { |
| 2425 | + LOGGER.debug( |
| 2426 | + CAST_API_MARKER, |
| 2427 | + "An error occurred while closing {} socket: {}", |
| 2428 | + remoteName, |
| 2429 | + e.getMessage() |
| 2430 | + ); |
| 2431 | + } |
| 2432 | + cancel(); |
| 2433 | + } |
2413 | 2434 | if (LOGGER.isTraceEnabled(CAST_API_HEARTBEAT_MARKER)) { |
2414 | 2435 | LOGGER.trace(CAST_API_HEARTBEAT_MARKER, "Pinging {}", remoteName); |
2415 | 2436 | } |
2416 | 2437 | try { |
2417 | 2438 | write(message); |
| 2439 | + pingCounter.incrementAndGet(); |
2418 | 2440 | } catch (IOException e) { |
2419 | 2441 | LOGGER.warn( |
2420 | 2442 | CAST_API_MARKER, |
2421 | | - "An error occurred while sending 'PING' to {}: {}", |
| 2443 | + "Disconnecting, because an error occurred while sending 'PING' to {}: {}", |
2422 | 2444 | remoteName, |
2423 | 2445 | e.getMessage() |
2424 | 2446 | ); |
2425 | 2447 | LOGGER.trace(CAST_API_MARKER, "", e); |
| 2448 | + try { |
| 2449 | + close(); |
| 2450 | + } catch (IOException ioe) { |
| 2451 | + LOGGER.debug( |
| 2452 | + CAST_API_MARKER, |
| 2453 | + "An error occurred while closing {} socket: {}", |
| 2454 | + remoteName, |
| 2455 | + ioe.getMessage() |
| 2456 | + ); |
| 2457 | + } |
| 2458 | + cancel(); |
2426 | 2459 | } |
2427 | 2460 | } |
2428 | 2461 | } |
@@ -2612,6 +2645,21 @@ public void run() { |
2612 | 2645 | write(pongMessage); |
2613 | 2646 | } else if ("PONG".equals(responseType)) { |
2614 | 2647 | LOGGER.trace(CAST_API_HEARTBEAT_MARKER, "Received PONG from {}", remoteName); |
| 2648 | + while (true) { |
| 2649 | + int current = pingCounter.get(); |
| 2650 | + int next = current - 1; |
| 2651 | + if (next < 0) { |
| 2652 | + LOGGER.warn( |
| 2653 | + CAST_API_MARKER, |
| 2654 | + "PING PONG mismatch, received more PONGs than sent PINGs from {}", |
| 2655 | + remoteName |
| 2656 | + ); |
| 2657 | + break; |
| 2658 | + } |
| 2659 | + if (pingCounter.compareAndSet(current, next)) { |
| 2660 | + break; |
| 2661 | + } |
| 2662 | + } |
2615 | 2663 | } else { |
2616 | 2664 | LOGGER.trace( |
2617 | 2665 | CAST_API_HEARTBEAT_MARKER, |
|
0 commit comments