|
40 | 40 | import java.util.Timer; |
41 | 41 | import java.util.TimerTask; |
42 | 42 | import java.util.concurrent.TimeoutException; |
| 43 | +import java.util.concurrent.atomic.AtomicInteger; |
43 | 44 | import java.util.concurrent.atomic.AtomicLong; |
44 | 45 | import javax.annotation.Nonnull; |
45 | 46 | import javax.annotation.Nullable; |
@@ -188,12 +189,14 @@ public class Channel implements Closeable { |
188 | 189 | @GuardedBy("socketLock") |
189 | 190 | protected InputHandler inputHandler; |
190 | 191 |
|
191 | | - /** |
192 | | - * Counter for producing request numbers |
193 | | - */ |
| 192 | + /** Counter for producing request numbers */ |
194 | 193 | @Nonnull |
195 | 194 | protected final AtomicLong requestCounter = new AtomicLong(new Random().nextInt(65536) + 1L); |
196 | 195 |
|
| 196 | + /** Counter for pending {@link Pong}s */ |
| 197 | + @Nonnull |
| 198 | + protected final AtomicInteger pingCounter = new AtomicInteger(); |
| 199 | + |
197 | 200 | /** |
198 | 201 | * Processors of requests by their identifiers |
199 | 202 | */ |
@@ -441,6 +444,7 @@ public boolean connect() throws IOException, NoSuchAlgorithmException, KeyManage |
441 | 444 | pingTimer.cancel(); |
442 | 445 | } |
443 | 446 | // Start regular pinging |
| 447 | + pingCounter.set(0); |
444 | 448 | PingTask pingTask = new PingTask(); |
445 | 449 | pingTimer = new Timer(remoteName + " PING timer"); |
446 | 450 | pingTimer.schedule(pingTask, 1000, PING_PERIOD); |
@@ -2362,19 +2366,48 @@ public PingTask() { |
2362 | 2366 |
|
2363 | 2367 | @Override |
2364 | 2368 | public void run() { |
| 2369 | + int count = pingCounter.get(); |
| 2370 | + if (count > 0) { |
| 2371 | + LOGGER.warn( |
| 2372 | + CAST_API_MARKER, "No PONG received for the previous PING from {}, disconnecting", remoteName |
| 2373 | + ); |
| 2374 | + try { |
| 2375 | + close(); |
| 2376 | + } catch (IOException e) { |
| 2377 | + LOGGER.debug( |
| 2378 | + CAST_API_MARKER, |
| 2379 | + "An error occurred while closing {} socket: {}", |
| 2380 | + remoteName, |
| 2381 | + e.getMessage() |
| 2382 | + ); |
| 2383 | + } |
| 2384 | + cancel(); |
| 2385 | + } |
2365 | 2386 | if (LOGGER.isTraceEnabled(CAST_API_HEARTBEAT_MARKER)) { |
2366 | 2387 | LOGGER.trace(CAST_API_HEARTBEAT_MARKER, "Pinging {}", remoteName); |
2367 | 2388 | } |
2368 | 2389 | try { |
2369 | 2390 | write(message); |
| 2391 | + pingCounter.incrementAndGet(); |
2370 | 2392 | } catch (IOException e) { |
2371 | 2393 | LOGGER.warn( |
2372 | 2394 | CAST_API_MARKER, |
2373 | | - "An error occurred while sending 'PING' to {}: {}", |
| 2395 | + "Disconnecting, because an error occurred while sending 'PING' to {}: {}", |
2374 | 2396 | remoteName, |
2375 | 2397 | e.getMessage() |
2376 | 2398 | ); |
2377 | 2399 | LOGGER.trace(CAST_API_MARKER, "", e); |
| 2400 | + try { |
| 2401 | + close(); |
| 2402 | + } catch (IOException ioe) { |
| 2403 | + LOGGER.debug( |
| 2404 | + CAST_API_MARKER, |
| 2405 | + "An error occurred while closing {} socket: {}", |
| 2406 | + remoteName, |
| 2407 | + ioe.getMessage() |
| 2408 | + ); |
| 2409 | + } |
| 2410 | + cancel(); |
2378 | 2411 | } |
2379 | 2412 | } |
2380 | 2413 | } |
@@ -2564,6 +2597,21 @@ public void run() { |
2564 | 2597 | write(pongMessage); |
2565 | 2598 | } else if ("PONG".equals(responseType)) { |
2566 | 2599 | LOGGER.trace(CAST_API_HEARTBEAT_MARKER, "Received PONG from {}", remoteName); |
| 2600 | + while (true) { |
| 2601 | + int current = pingCounter.get(); |
| 2602 | + int next = current - 1; |
| 2603 | + if (next < 0) { |
| 2604 | + LOGGER.warn( |
| 2605 | + CAST_API_MARKER, |
| 2606 | + "PING PONG mismatch, received more PONGs than sent PINGs from {}", |
| 2607 | + remoteName |
| 2608 | + ); |
| 2609 | + break; |
| 2610 | + } |
| 2611 | + if (pingCounter.compareAndSet(current, next)) { |
| 2612 | + break; |
| 2613 | + } |
| 2614 | + } |
2567 | 2615 | } else { |
2568 | 2616 | LOGGER.trace( |
2569 | 2617 | CAST_API_HEARTBEAT_MARKER, |
|
0 commit comments