Skip to content

Commit b33ebce

Browse files
authored
add VerifyDelayedCallbackTimerRaceOnManualQueue with a test hook to force an interleaving where the dispatch thread and threadpool concurrently call Cancel and QueueItem - permanently stalling delayed callbacks (#947)
1 parent dae3440 commit b33ebce

3 files changed

Lines changed: 204 additions & 2 deletions

File tree

Source/Task/TaskQueue.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77
#include "TaskQueueImpl.h"
88
#include "XTaskQueuePriv.h"
99

10+
#ifdef HC_UNITTEST_API
11+
TestBarrier* g_testBarrier = nullptr;
12+
#endif
13+
1014
//
11-
// ApiRefs tracks global refcounts for all APIs. It is used to idenfity memory leaks
15+
// ApiRefs tracks global refcounts for all APIs. It is used to identify memory leaks
1216
// in tests and can be called to wait for all refs to be released.
1317
//
1418
namespace ApiRefs
@@ -1107,7 +1111,32 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
11071111
uint64_t noDueTime = UINT64_MAX;
11081112
if (m_timerDue.compare_exchange_strong(dueTime, noDueTime))
11091113
{
1110-
m_timer.Cancel();
1114+
#ifdef HC_UNITTEST_API
1115+
// Test hook: two-phase barrier reproduces the timer race
1116+
// that results in lost delayed task wakes.
1117+
if (g_testBarrier != nullptr)
1118+
{
1119+
{
1120+
std::lock_guard<std::mutex> lk(g_testBarrier->mtx);
1121+
g_testBarrier->phase1_ready = true;
1122+
}
1123+
g_testBarrier->cv.notify_all();
1124+
1125+
std::unique_lock<std::mutex> lk(g_testBarrier->mtx);
1126+
g_testBarrier->cv.wait_for(lk, std::chrono::seconds(5),
1127+
[&] { return g_testBarrier->phase2_ready; });
1128+
}
1129+
#endif
1130+
// Bug fix: ScheduleNextPendingCallback timer race results
1131+
// in lost delayed task wakes.
1132+
//
1133+
// The CAS above is sufficient: the timer has already fired
1134+
// (call site 1: SubmitPendingCallback) or was already
1135+
// canceled (call site 2: CancelPendingEntries). A Cancel()
1136+
// here raced with concurrent QueueItem/Start calls on other
1137+
// threads, permanently stranding entries in m_pendingList.
1138+
// See VerifyDelayedCallbackTimerRaceOnManualQueue for full
1139+
// analysis.
11111140
}
11121141
}
11131142

Source/Task/XTaskQueuePriv.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55

66
#include "XTaskQueue.h"
77

8+
#ifdef HC_UNITTEST_API
9+
#include <mutex>
10+
#include <condition_variable>
11+
#endif
12+
813
//----------------------------------------------------------------//
914
//
1015
// These APIs should be reserved for driving unit test harnesses.
@@ -120,3 +125,20 @@ STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueueWithOptions(
120125
STDAPI_(bool) XTaskQueueUninitialize(
121126
_In_ uint32_t timeoutMilliseconds = 0
122127
) noexcept;
128+
129+
#ifdef HC_UNITTEST_API
130+
131+
// Two-phase barrier for deterministic reproduction of the
132+
// ScheduleNextPendingCallback timer race that results in lost delayed
133+
// task wakes. See VerifyDelayedCallbackTimerRaceOnManualQueue.
134+
struct TestBarrier
135+
{
136+
std::mutex mtx;
137+
std::condition_variable cv;
138+
bool phase1_ready = false; // threadpool -> test: "CAS done"
139+
bool phase2_ready = false; // test -> threadpool: "Start done"
140+
};
141+
142+
extern TestBarrier* g_testBarrier;
143+
144+
#endif

Tests/UnitTests/Tests/TaskQueueTests.cpp

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,4 +1851,155 @@ DEFINE_TEST_CLASS(TaskQueueTests)
18511851
LOG_COMMENT(L"Test completed: workCallbacks=%d createErrors=%d terminateErrors=%d",
18521852
workCallbackCount.load(), createErrors.load(), terminateErrors.load());
18531853
}
1854+
#ifdef HC_UNITTEST_API
1855+
DEFINE_TEST_CASE(VerifyDelayedCallbackTimerRaceOnManualQueue) {
1856+
// Regression: ScheduleNextPendingCallback timer race results in
1857+
// lost delayed task wakes.
1858+
//
1859+
// We use a barrier to isolate the race. PTP_TIMER callbacks are
1860+
// not serialized, so a concurrent QueueItem that calls Start()
1861+
// will have its timer fire on another threadpool thread DURING
1862+
// the Sleep, self-healing the state before Cancel() runs. In
1863+
// production the gap is nanoseconds -- the timer can't fire in
1864+
// time.
1865+
//
1866+
// Instead we use a two-phase cv barrier (TestBarrier, compiled under
1867+
// HC_UNITTEST_API) that forces the exact interleaving that causes
1868+
// the permanent stall:
1869+
//
1870+
// Threadpool thread Test dispatch thread
1871+
// ----------------- --------------------
1872+
// CAS m_timerDue -> UINT64_MAX
1873+
// signal(phase1) -----------> (unblocked)
1874+
// wait(phase2) QueueItem -> push + CAS + Start(T)
1875+
// <----------- signal(phase2)
1876+
// [Cancel() would kill T's timer here -- removed by the fix]
1877+
// ... timer never fires ...
1878+
// ... m_timerDue stuck at T ...
1879+
// ... PERMANENT STALL ...
1880+
//
1881+
// The dispatch thread's QueueItem calls Start(T) with a delay of
1882+
// N ms. Cancel() runs immediately after phase2 -- well within
1883+
// the N ms window -- so the timer hasn't fired yet. This exactly
1884+
// reproduces the production race.
1885+
1886+
TestBarrier barrier;
1887+
barrier.phase1_ready = false;
1888+
barrier.phase2_ready = false;
1889+
g_testBarrier = &barrier;
1890+
1891+
AutoQueueHandle queue;
1892+
VERIFY_SUCCEEDED(XTaskQueueCreate(XTaskQueueDispatchMode::Manual,
1893+
XTaskQueueDispatchMode::Immediate,
1894+
&queue));
1895+
1896+
// Self-cycling delayed callback. As the sole pending entry,
1897+
// every timer fire hits the "no next item" branch.
1898+
std::atomic<uint32_t> cycleCount{0};
1899+
std::atomic<bool> running{true};
1900+
1901+
struct CycleContext {
1902+
XTaskQueueHandle queue;
1903+
std::atomic<uint32_t> *counter;
1904+
std::atomic<bool> *running;
1905+
1906+
static void CALLBACK Invoke(void *ctx, bool cancel) {
1907+
if (cancel)
1908+
return;
1909+
auto *self = static_cast<CycleContext *>(ctx);
1910+
self->counter->fetch_add(1);
1911+
if (self->running->load()) {
1912+
// 500 ms: long enough that Cancel() reliably beats it.
1913+
XTaskQueueSubmitDelayedCallback(self->queue, XTaskQueuePort::Work,
1914+
500, ctx,
1915+
&CycleContext::Invoke);
1916+
}
1917+
}
1918+
};
1919+
1920+
CycleContext ctx{queue.Handle(), &cycleCount, &running};
1921+
VERIFY_SUCCEEDED(XTaskQueueSubmitDelayedCallback(
1922+
queue, XTaskQueuePort::Work, 1, &ctx,
1923+
&CycleContext::Invoke));
1924+
1925+
// Wait for the threadpool to CAS m_timerDue -> UINT64_MAX
1926+
// and hit the barrier (phase1).
1927+
{
1928+
std::unique_lock<std::mutex> lk(barrier.mtx);
1929+
bool ok = barrier.cv.wait_for(lk, std::chrono::seconds(5),
1930+
[&] { return barrier.phase1_ready; });
1931+
VERIFY_IS_TRUE(ok);
1932+
}
1933+
1934+
// The threadpool is blocked inside ScheduleNextPendingCallback,
1935+
// so the cycling callback's entry hasn't reached the ready queue
1936+
// yet (AppendEntry hasn't run). We can't dispatch it directly.
1937+
//
1938+
// Instead, submit a delay=0 "trigger" that bypasses the pending
1939+
// list, lands directly in the ready queue, and when dispatched
1940+
// calls QueueItem (racing with Cancel) then signals phase2.
1941+
struct TriggerContext {
1942+
XTaskQueueHandle queue;
1943+
TestBarrier* barrier;
1944+
};
1945+
TriggerContext trigCtx{queue.Handle(), &barrier};
1946+
1947+
auto triggerCallback = [](void *ctx, bool cancel) {
1948+
if (cancel)
1949+
return;
1950+
auto *tc = static_cast<TriggerContext *>(ctx);
1951+
// delay=1 ms: T_new expires quickly, so m_timerDue becomes a
1952+
// stale past value -- the condition for a permanent stall.
1953+
static auto dummyCb = [](void*, bool) {};
1954+
XTaskQueueSubmitDelayedCallback(
1955+
tc->queue, XTaskQueuePort::Work, 1, nullptr, dummyCb);
1956+
{
1957+
std::lock_guard<std::mutex> lk(tc->barrier->mtx);
1958+
tc->barrier->phase2_ready = true;
1959+
}
1960+
tc->barrier->cv.notify_all();
1961+
};
1962+
1963+
VERIFY_SUCCEEDED(XTaskQueueSubmitCallback(
1964+
queue, XTaskQueuePort::Work, &trigCtx, triggerCallback));
1965+
1966+
// Dispatch trigger on this thread: QueueItem -> Start(1ms),
1967+
// signal(phase2) -> threadpool wakes -> [Cancel() removed].
1968+
VERIFY_IS_TRUE(XTaskQueueDispatch(queue, XTaskQueuePort::Work, 1000));
1969+
1970+
// Race has fired. Disable barrier.
1971+
g_testBarrier = nullptr;
1972+
1973+
// Stop cycling callback and drain ready queue.
1974+
running.store(false);
1975+
while (XTaskQueueDispatch(queue, XTaskQueuePort::Work, 200)) {
1976+
}
1977+
1978+
// A delayed callback that should fire promptly (canary).
1979+
// With the bug, it's stranded in pendingList forever.
1980+
std::atomic<bool> canaryFired{false};
1981+
auto canaryCallback = [](void *ctx, bool cancel) {
1982+
if (!cancel)
1983+
static_cast<std::atomic<bool> *>(ctx)->store(true);
1984+
};
1985+
VERIFY_SUCCEEDED(XTaskQueueSubmitDelayedCallback(
1986+
queue, XTaskQueuePort::Work, 10, &canaryFired, canaryCallback));
1987+
1988+
const uint64_t canaryStart = GetTickCount64();
1989+
while (!canaryFired.load() &&
1990+
GetTickCount64() - canaryStart < 2000) {
1991+
XTaskQueueDispatch(queue, XTaskQueuePort::Work, 100);
1992+
}
1993+
1994+
LOG_COMMENT(L"Cycle count: %u", cycleCount.load());
1995+
LOG_COMMENT(L"Canary fired: %s",
1996+
canaryFired.load() ? L"yes" : L"NO -- timer stalled");
1997+
1998+
XTaskQueueTerminate(queue, false, nullptr, nullptr);
1999+
while (XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0)) {
2000+
}
2001+
2002+
VERIFY_IS_TRUE(canaryFired.load());
2003+
}
2004+
#endif // HC_UNITTEST_API
18542005
};

0 commit comments

Comments
 (0)