Skip to content

Commit 5392b16

Browse files
committed
Use monotonic time for reactor and H2 timeout accounting
1 parent 598ab65 commit 5392b16

21 files changed

Lines changed: 124 additions & 111 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.List;
3838
import java.util.Queue;
3939
import java.util.concurrent.ConcurrentLinkedDeque;
40+
import java.util.concurrent.TimeUnit;
4041
import java.util.concurrent.ConcurrentLinkedQueue;
4142
import java.util.concurrent.atomic.AtomicInteger;
4243
import java.util.function.Consumer;
@@ -146,12 +147,12 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
146147
private volatile boolean peerNoRfc7540Priorities;
147148

148149

149-
private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000;
150-
private long lastStreamTimeoutCheckMillis;
150+
private static final long STREAM_TIMEOUT_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1);
151+
private long lastStreamTimeoutCheckNanos;
151152

152-
private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS = 1000;
153+
private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1);
153154
private final Timeout validateAfterInactivity;
154-
private volatile long lastActivityTime;
155+
private volatile long lastActivityNanos;
155156

156157
AbstractH2StreamMultiplexer(
157158
final ProtocolIOSession ioSession,
@@ -199,7 +200,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
199200

200201
this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
201202
this.streamListener = streamListener;
202-
this.lastActivityTime = System.currentTimeMillis();
203+
this.lastActivityNanos = System.nanoTime();
203204
this.validateAfterInactivity = validateAfterInactivity;
204205
}
205206

@@ -539,8 +540,8 @@ public final void onOutput() throws HttpException, IOException {
539540

540541
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
541542
final long t = TimeValue.isPositive(validateAfterInactivity) ?
542-
Math.max(validateAfterInactivity.toMilliseconds(), VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS) : 0;
543-
final boolean hasBeenIdleTooLong = t > 0 && System.currentTimeMillis() - lastActivityTime > t;
543+
Math.max(validateAfterInactivity.toNanoseconds(), VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS) : 0;
544+
final boolean hasBeenIdleTooLong = t > 0 && System.nanoTime() - lastActivityNanos > t;
544545
if (hasBeenIdleTooLong && ioSession.hasCommands() && pingHandlers.isEmpty()) {
545546
final Timeout socketTimeout = ioSession.getSocketTimeout();
546547
ioSession.setSocketTimeout(Timeout.ofSeconds(5));
@@ -1510,7 +1511,7 @@ class H2StreamChannelImpl implements H2StreamChannel {
15101511
private final AtomicInteger outputWindow;
15111512

15121513
private volatile boolean localClosed;
1513-
private volatile long localResetTime;
1514+
private volatile long localResetNanos = Long.MIN_VALUE;
15141515

15151516
H2StreamChannelImpl(final int id, final int initialInputWindowSize, final int initialOutputWindowSize) {
15161517
this.id = id;
@@ -1676,7 +1677,7 @@ public boolean localReset(final int code) throws IOException {
16761677
return false;
16771678
}
16781679
localClosed = true;
1679-
localResetTime = System.currentTimeMillis();
1680+
localResetNanos = System.nanoTime();
16801681

16811682
final RawFrame resetStream = frameFactory.createResetStream(id, code);
16821683
commitFrameInternal(resetStream);
@@ -1687,8 +1688,8 @@ public boolean localReset(final int code) throws IOException {
16871688
}
16881689

16891690
@Override
1690-
public long getLocalResetTime() {
1691-
return localResetTime;
1691+
public long getLocalResetNanos() {
1692+
return localResetNanos;
16921693
}
16931694

16941695
@Override
@@ -1743,14 +1744,14 @@ private void checkStreamTimeouts(final long nowNanos) throws IOException {
17431744
}
17441745

17451746
private void validateStreamTimeouts() throws IOException {
1746-
final long nowMillis = System.currentTimeMillis();
1747-
if ((nowMillis - lastStreamTimeoutCheckMillis) >= STREAM_TIMEOUT_GRANULARITY_MILLIS) {
1748-
lastStreamTimeoutCheckMillis = nowMillis;
1749-
checkStreamTimeouts(System.nanoTime());
1747+
final long nowNanos = System.nanoTime();
1748+
if ((nowNanos - lastStreamTimeoutCheckNanos) >= STREAM_TIMEOUT_GRANULARITY_NANOS) {
1749+
lastStreamTimeoutCheckNanos = nowNanos;
1750+
checkStreamTimeouts(nowNanos);
17501751
}
17511752
}
17521753

17531754
private void updateLastActivity() {
1754-
this.lastActivityTime = System.currentTimeMillis();
1755+
this.lastActivityNanos = System.nanoTime();
17551756
}
17561757
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.channels.CancelledKeyException;
3333
import java.nio.charset.CharacterCodingException;
3434
import java.util.List;
35+
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicBoolean;
3637
import java.util.concurrent.atomic.AtomicInteger;
3738
import java.util.concurrent.atomic.AtomicReference;
@@ -50,7 +51,7 @@
5051

5152
class H2Stream implements StreamControl {
5253

53-
private static final long LINGER_TIME = 1000; // 1 second
54+
private static final long LINGER_TIME_NANOS = TimeUnit.SECONDS.toNanos(1);
5455

5556
private final H2StreamChannel channel;
5657
private final H2StreamHandler handler;
@@ -124,8 +125,8 @@ AtomicInteger getInputWindow() {
124125
}
125126

126127
private boolean isPastLingerDeadline() {
127-
final long localResetTime = channel.getLocalResetTime();
128-
return localResetTime > 0 && localResetTime + LINGER_TIME < System.currentTimeMillis();
128+
final long localResetNanos = channel.getLocalResetNanos();
129+
return localResetNanos != Long.MIN_VALUE && System.nanoTime() - localResetNanos > LINGER_TIME_NANOS;
129130
}
130131

131132
boolean isClosedPastLingerDeadline() {

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2StreamChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ default void terminate() {
6868
}
6969
}
7070

71-
long getLocalResetTime();
71+
long getLocalResetNanos();
7272

7373
default boolean isLocalReset() {
74-
return getLocalResetTime() > 0;
74+
return getLocalResetNanos() != Long.MIN_VALUE;
7575
}
7676

7777
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ protected void validateSession(
143143
final TimeValue timeValue = validateAfterInactivity;
144144
if (TimeValue.isNonNegative(timeValue)) {
145145
final long lastAccessTime = Math.min(ioSession.getLastReadTime(), ioSession.getLastWriteTime());
146-
final long deadline = lastAccessTime + timeValue.toMilliseconds();
147-
if (deadline <= System.currentTimeMillis()) {
146+
final long deadline = lastAccessTime + timeValue.toNanoseconds();
147+
if (deadline <= System.nanoTime()) {
148148
ioSession.enqueue(new StaleCheckCommand(callback::execute), Command.Priority.NORMAL);
149149
return;
150150
}

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1706,23 +1706,23 @@ void testKeepAliveDisabledNeverEmitsPing() throws Exception {
17061706
Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING");
17071707
}
17081708

1709-
private static long getValidateAfterInactivityGranularityMillis() throws Exception {
1710-
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS");
1709+
private static long getValidateAfterInactivityGranularityNanos() throws Exception {
1710+
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS");
17111711
field.setAccessible(true);
17121712
return field.getLong(null);
17131713
}
17141714

1715-
private static void setLastActivityTime(final AbstractH2StreamMultiplexer mux, final long millis) throws Exception {
1716-
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("lastActivityTime");
1715+
private static void setLastActivityNanos(final AbstractH2StreamMultiplexer mux, final long nanos) throws Exception {
1716+
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("lastActivityNanos");
17171717
field.setAccessible(true);
1718-
field.setLong(mux, millis);
1718+
field.setLong(mux, nanos);
17191719
}
17201720

17211721
private static void makeMuxIdle(final AbstractH2StreamMultiplexer mux, final Timeout validateAfterInactivity) throws Exception {
1722-
final long granularityMillis = getValidateAfterInactivityGranularityMillis();
1723-
final long configuredMillis = validateAfterInactivity != null ? validateAfterInactivity.toMilliseconds() : 0;
1724-
final long effectiveMillis = configuredMillis > 0 ? Math.max(configuredMillis, granularityMillis) : 0;
1725-
setLastActivityTime(mux, System.currentTimeMillis() - effectiveMillis - 10);
1722+
final long granularityNanos = getValidateAfterInactivityGranularityNanos();
1723+
final long configuredNanos = validateAfterInactivity != null ? validateAfterInactivity.toNanoseconds() : 0;
1724+
final long effectiveNanos = configuredNanos > 0 ? Math.max(configuredNanos, granularityNanos) : 0;
1725+
setLastActivityNanos(mux, System.nanoTime() - effectiveNanos - TimeUnit.MILLISECONDS.toNanos(10));
17261726
}
17271727

17281728
@Test

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/pool/TestH2ConnPool.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.net.InetSocketAddress;
3030
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicReference;
3233

3334
import org.apache.hc.core5.concurrent.FutureCallback;
@@ -97,8 +98,9 @@ void testValidateSessionEnqueuesStaleCheck() {
9798

9899
final IOSession session = Mockito.mock(IOSession.class);
99100
Mockito.when(session.isOpen()).thenReturn(true);
100-
Mockito.when(session.getLastReadTime()).thenReturn(0L);
101-
Mockito.when(session.getLastWriteTime()).thenReturn(0L);
101+
final long farPastNanos = System.nanoTime() - TimeUnit.DAYS.toNanos(1);
102+
Mockito.when(session.getLastReadTime()).thenReturn(farPastNanos);
103+
Mockito.when(session.getLastWriteTime()).thenReturn(farPastNanos);
102104

103105
@SuppressWarnings("unchecked")
104106
final Callback<Boolean> callback = (Callback<Boolean>) Mockito.mock(Callback.class);

httpcore5-testing/src/main/java/org/apache/hc/core5/benchmark/HttpBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)
473473

474474
final long deadline = config.getTimeLimit() != null ? config.getTimeLimit().toMilliseconds() : Long.MAX_VALUE;
475475

476-
final long startTime = System.currentTimeMillis();
476+
final long startNanos = System.nanoTime();
477477

478478
for (int i = 0; i < workers.length; i++) {
479479
workers[i].execute();
@@ -485,7 +485,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)
485485
System.out.println("...done");
486486
}
487487

488-
final long endTime = System.currentTimeMillis();
488+
final long endNanos = System.nanoTime();
489489

490490
for (int i = 0; i < workers.length; i++) {
491491
workers[i].releaseResources();
@@ -499,7 +499,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)
499499
requestUri.toASCIIString(),
500500
stats.getContentLength(),
501501
config.getConcurrencyLevel(),
502-
endTime - startTime,
502+
TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos),
503503
stats.getSuccessCount(),
504504
stats.getFailureCount(),
505505
stats.getKeepAliveCount(),

httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.net.SocketAddress;
3838
import java.util.ArrayList;
3939
import java.util.List;
40+
import java.util.concurrent.TimeUnit;
4041
import java.util.concurrent.locks.ReentrantLock;
4142

4243
import org.apache.hc.core5.net.InetAddressUtils;
@@ -254,7 +255,7 @@ public void start() throws IOException {
254255
}
255256

256257
public void shutdown(final TimeValue timeout) throws InterruptedException {
257-
final long waitUntil = System.currentTimeMillis() + timeout.toMilliseconds();
258+
final long deadlineNanos = System.nanoTime() + timeout.toNanoseconds();
258259
Thread t = null;
259260
lock.lock();
260261
try {
@@ -272,18 +273,18 @@ public void shutdown(final TimeValue timeout) throws InterruptedException {
272273
handler.shutdown();
273274
}
274275
while (!this.handlers.isEmpty()) {
275-
final long waitTime = waitUntil - System.currentTimeMillis();
276-
if (waitTime > 0) {
277-
wait(waitTime);
276+
final long remainingNanos = deadlineNanos - System.nanoTime();
277+
if (remainingNanos > 0) {
278+
wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
278279
}
279280
}
280281
} finally {
281282
lock.unlock();
282283
}
283284
if (t != null) {
284-
final long waitTime = waitUntil - System.currentTimeMillis();
285-
if (waitTime > 0) {
286-
t.join(waitTime);
285+
final long remainingNanos = deadlineNanos - System.nanoTime();
286+
if (remainingNanos > 0) {
287+
t.join(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
287288
}
288289
}
289290
}

httpcore5/src/main/java/org/apache/hc/core5/concurrent/BasicFuture.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,23 +110,26 @@ public T get(final long timeout, final TimeUnit unit)
110110
throws InterruptedException, ExecutionException, TimeoutException {
111111
Args.notNull(unit, "Time unit");
112112
final long msecs = unit.toMillis(timeout);
113-
final long startTime = (msecs <= 0) ? 0 : System.currentTimeMillis();
114-
long waitTime = msecs;
113+
final long waitNanos = unit.toNanos(timeout);
114+
final long startNanos = (waitNanos <= 0) ? 0 : System.nanoTime();
115+
long remainingNanos = waitNanos;
115116
try {
116117
lock.lock();
117118
if (this.completed) {
118119
return getResult();
119-
} else if (waitTime <= 0) {
120-
throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime));
120+
} else if (remainingNanos <= 0) {
121+
throw TimeoutValueException.fromMilliseconds(msecs, msecs);
121122
} else {
122123
for (; ; ) {
123-
condition.await(waitTime, TimeUnit.MILLISECONDS);
124+
condition.await(remainingNanos, TimeUnit.NANOSECONDS);
124125
if (this.completed) {
125126
return getResult();
126127
}
127-
waitTime = msecs - (System.currentTimeMillis() - startTime);
128-
if (waitTime <= 0) {
129-
throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime));
128+
remainingNanos = waitNanos - (System.nanoTime() - startNanos);
129+
if (remainingNanos <= 0) {
130+
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(
131+
System.nanoTime() - startNanos);
132+
throw TimeoutValueException.fromMilliseconds(msecs, elapsedMillis);
130133
}
131134
}
132135
}

httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public final void enumAvailable(final Callback<IOSession> callback) {
267267
}
268268

269269
public final void closeIdle(final TimeValue idleTime) {
270-
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270+
final long deadline = System.nanoTime() - (TimeValue.isPositive(idleTime) ? idleTime.toNanoseconds() : 0);
271271
for (final PoolEntry poolEntry: sessionPool.values()) {
272272
if (poolEntry.session != null) {
273273
lock.lock();

0 commit comments

Comments
 (0)