Skip to content

Commit 8a5f76f

Browse files
committed
Remove beforeThreadsWakeUp callback and condition watcher mechanism
With RepeatableWorkflowThread, condition evaluation now happens inside workflow threads themselves, making the beforeThreadsWakeUp callback and condition watcher mechanism obsolete. - Remove beforeThreadsWakeUp parameter from DeterministicRunner - Remove conditionWatchers, registerConditionWatcher, evaluateConditionWatchers from SyncWorkflowContext - Remove ConditionWatcher class - Remove getBeforeThreadsWakeUpCallback method - Remove related tests
1 parent 4f1bf99 commit 8a5f76f

5 files changed

Lines changed: 5 additions & 217 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ static DeterministicRunner newRunner(
3030
SyncWorkflowContext workflowContext,
3131
Runnable root,
3232
WorkflowExecutorCache cache) {
33-
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache, null);
33+
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, cache);
3434
}
3535

3636
/**
@@ -45,29 +45,7 @@ static DeterministicRunner newRunner(
4545
WorkflowThreadExecutor workflowThreadExecutor,
4646
SyncWorkflowContext workflowContext,
4747
Runnable root) {
48-
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null, null);
49-
}
50-
51-
/**
52-
* Create new instance of DeterministicRunner with a callback invoked before threads wake up.
53-
*
54-
* @param workflowThreadExecutor executor for workflow thread Runnables
55-
* @param workflowContext workflow context to use
56-
* @param root function that root thread of the runner executes.
57-
* @param cache WorkflowExecutorCache used cache inflight workflows
58-
* @param beforeThreadsWakeUp callback invoked once per loop iteration before threads run. Returns
59-
* true if progress was made (e.g., a condition watcher fired), which causes the loop to
60-
* continue even if all threads are blocked. Returns false if no progress was made.
61-
* @return instance of the DeterministicRunner.
62-
*/
63-
static DeterministicRunner newRunner(
64-
WorkflowThreadExecutor workflowThreadExecutor,
65-
SyncWorkflowContext workflowContext,
66-
Runnable root,
67-
WorkflowExecutorCache cache,
68-
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
69-
return new DeterministicRunnerImpl(
70-
workflowThreadExecutor, workflowContext, root, cache, beforeThreadsWakeUp);
48+
return new DeterministicRunnerImpl(workflowThreadExecutor, workflowContext, root, null);
7149
}
7250

7351
/**

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ class DeterministicRunnerImpl implements DeterministicRunner {
6969
// always accessed under the runner lock
7070
private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
7171

72-
// Callback invoked before threads wake up in each event loop iteration
73-
@Nullable private final Supplier<Boolean> beforeThreadsWakeUp;
74-
7572
// Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be
7673
// synchronized.
7774
// Inside DeterministicRunner the access to these variables is under the runner lock.
@@ -147,30 +144,20 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
147144
WorkflowThreadExecutor workflowThreadExecutor,
148145
@Nonnull SyncWorkflowContext workflowContext,
149146
Runnable root) {
150-
this(workflowThreadExecutor, workflowContext, root, null, null);
147+
this(workflowThreadExecutor, workflowContext, root, null);
151148
}
152149

153150
DeterministicRunnerImpl(
154151
WorkflowThreadExecutor workflowThreadExecutor,
155152
@Nonnull SyncWorkflowContext workflowContext,
156153
Runnable root,
157154
WorkflowExecutorCache cache) {
158-
this(workflowThreadExecutor, workflowContext, root, cache, null);
159-
}
160-
161-
DeterministicRunnerImpl(
162-
WorkflowThreadExecutor workflowThreadExecutor,
163-
@Nonnull SyncWorkflowContext workflowContext,
164-
Runnable root,
165-
WorkflowExecutorCache cache,
166-
@Nullable Supplier<Boolean> beforeThreadsWakeUp) {
167155
this.workflowThreadExecutor = workflowThreadExecutor;
168156
this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext");
169157
// TODO this should be refactored, publishing of this in an constructor into external objects is
170158
// a bad practice
171159
this.workflowContext.setRunner(this);
172160
this.cache = cache;
173-
this.beforeThreadsWakeUp = beforeThreadsWakeUp;
174161
boolean deterministicCancellationScopeOrder =
175162
workflowContext
176163
.getReplayContext()
@@ -222,14 +209,7 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
222209
}
223210
toExecuteInWorkflowThread.clear();
224211

225-
// Invoke beforeThreadsWakeUp callback BEFORE running threads.
226-
// This allows the callback to evaluate conditions and complete promises,
227-
// ensuring threads see updated state when they wake up.
228-
if (beforeThreadsWakeUp != null) {
229-
progress = beforeThreadsWakeUp.get();
230-
} else {
231-
progress = false;
232-
}
212+
progress = false;
233213

234214
Iterator<WorkflowThread> ci = threads.iterator();
235215
while (ci.hasNext()) {

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
124124
context.getWorkflowExecution()))
125125
.start();
126126
},
127-
cache,
128-
workflowContext.getBeforeThreadsWakeUpCallback());
127+
cache);
129128
}
130129

131130
@Override

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.time.Duration;
5555
import java.time.Instant;
5656
import java.util.*;
57-
import java.util.Iterator;
5857
import java.util.concurrent.atomic.AtomicBoolean;
5958
import java.util.concurrent.atomic.AtomicReference;
6059
import java.util.function.BiPredicate;
@@ -105,9 +104,6 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
105104
private final WorkflowThreadLocal<UpdateInfo> currentUpdateInfo = new WorkflowThreadLocal<>();
106105
@Nullable private String currentDetails;
107106

108-
// Condition watchers for async await functionality
109-
private final List<ConditionWatcher> conditionWatchers = new ArrayList<>();
110-
111107
public SyncWorkflowContext(
112108
@Nonnull String namespace,
113109
@Nonnull WorkflowExecution workflowExecution,
@@ -1675,93 +1671,4 @@ public Failure getFailure() {
16751671
return failure;
16761672
}
16771673
}
1678-
1679-
/**
1680-
* Returns a callback to be used by DeterministicRunner before threads wake up. This callback
1681-
* evaluates condition watchers and completes promises as needed.
1682-
*/
1683-
public Supplier<Boolean> getBeforeThreadsWakeUpCallback() {
1684-
return this::evaluateConditionWatchers;
1685-
}
1686-
1687-
/**
1688-
* Registers a condition watcher for async await functionality. The condition is evaluated
1689-
* immediately (inline) before registering. If already satisfied, returns immediately without
1690-
* registering a watcher. Otherwise, the condition is re-evaluated at the end of each event loop
1691-
* iteration until it returns true.
1692-
*
1693-
* <p>IMPORTANT: The condition must never throw exceptions. If it does, the workflow task will
1694-
* fail. Callers should handle exceptions within the condition supplier and complete their promise
1695-
* appropriately before returning true.
1696-
*
1697-
* @param condition Supplier that returns true when the wait is complete (caller handles promise
1698-
* completion in the supplier body). Evaluated in read-only mode. Must not throw exceptions.
1699-
* @return A Runnable that cancels the watcher when invoked (no-op if condition was already
1700-
* satisfied).
1701-
*/
1702-
Runnable registerConditionWatcher(Supplier<Boolean> condition) {
1703-
// Evaluate condition inline - if already satisfied, no need to register
1704-
setReadOnly(true);
1705-
try {
1706-
if (condition.get()) {
1707-
return () -> {};
1708-
}
1709-
} finally {
1710-
setReadOnly(false);
1711-
}
1712-
1713-
ConditionWatcher watcher = new ConditionWatcher(condition);
1714-
conditionWatchers.add(watcher);
1715-
return () -> watcher.canceled = true;
1716-
}
1717-
1718-
/**
1719-
* Evaluates all condition watchers and removes those that return true. Watchers that are
1720-
* satisfied are removed from the list.
1721-
*
1722-
* <p>Note: If a condition throws an exception, it will propagate and fail the workflow task.
1723-
* Callers should handle exceptions within their condition supplier.
1724-
*
1725-
* @return true if any condition was satisfied (indicating progress was made)
1726-
*/
1727-
private boolean evaluateConditionWatchers() {
1728-
boolean anyMatched = false;
1729-
Iterator<ConditionWatcher> it = conditionWatchers.iterator();
1730-
while (it.hasNext()) {
1731-
ConditionWatcher watcher = it.next();
1732-
if (watcher.canceled) {
1733-
it.remove();
1734-
continue;
1735-
}
1736-
1737-
// We must set read-only mode here because the condition is evaluated from the runner
1738-
// thread, not a workflow thread.
1739-
setReadOnly(true);
1740-
boolean satisfied;
1741-
try {
1742-
satisfied = watcher.condition.get();
1743-
} finally {
1744-
setReadOnly(false);
1745-
}
1746-
1747-
if (satisfied) {
1748-
it.remove();
1749-
anyMatched = true;
1750-
}
1751-
}
1752-
return anyMatched;
1753-
}
1754-
1755-
/**
1756-
* Holds a condition for async await functionality. The condition is evaluated at the end of each
1757-
* event loop iteration and must handle promise completion in its body before returning true.
1758-
*/
1759-
private static class ConditionWatcher {
1760-
final Supplier<Boolean> condition;
1761-
boolean canceled;
1762-
1763-
ConditionWatcher(Supplier<Boolean> condition) {
1764-
this.condition = condition;
1765-
}
1766-
}
17671674
}

temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -963,80 +963,4 @@ public void testSupplierCalledMultipleWithoutCaching() {
963963
});
964964
d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS);
965965
}
966-
967-
/**
968-
* Test that beforeThreadsWakeUp callback is invoked BEFORE threads run. The callback sets a value
969-
* that the thread reads, proving the callback ran first.
970-
*/
971-
@Test
972-
public void testBeforeThreadsWakeUpCallbackInvokedBeforeThreads() {
973-
AtomicBoolean valueSetByCallback = new AtomicBoolean(false);
974-
AtomicBoolean threadSawValue = new AtomicBoolean(false);
975-
976-
DeterministicRunnerImpl d =
977-
new DeterministicRunnerImpl(
978-
threadPool::submit,
979-
DummySyncWorkflowContext.newDummySyncWorkflowContext(),
980-
() -> {
981-
// Thread checks if callback set the value
982-
threadSawValue.set(valueSetByCallback.get());
983-
status = "done";
984-
},
985-
null,
986-
() -> {
987-
// Callback sets value before threads run
988-
valueSetByCallback.set(true);
989-
return false;
990-
});
991-
992-
d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS);
993-
assertTrue(d.isDone());
994-
assertTrue("Callback should set value before thread runs", valueSetByCallback.get());
995-
assertTrue("Thread should see value set by callback", threadSawValue.get());
996-
}
997-
998-
/**
999-
* Test that when beforeThreadsWakeUp returns true (progress made), the loop continues and threads
1000-
* run again. The callback can return true multiple times when notifying multiple conditions.
1001-
*/
1002-
@Test
1003-
public void testBeforeThreadsWakeUpProgressContinuesLoop() {
1004-
AtomicBoolean shouldUnblock1 = new AtomicBoolean(false);
1005-
AtomicBoolean shouldUnblock2 = new AtomicBoolean(false);
1006-
AtomicInteger trueCount = new AtomicInteger(0);
1007-
1008-
DeterministicRunnerImpl d =
1009-
new DeterministicRunnerImpl(
1010-
threadPool::submit,
1011-
DummySyncWorkflowContext.newDummySyncWorkflowContext(),
1012-
() -> {
1013-
status = "waiting1";
1014-
WorkflowThread.await("wait1", shouldUnblock1::get);
1015-
status = "waiting2";
1016-
WorkflowThread.await("wait2", shouldUnblock2::get);
1017-
status = "done";
1018-
},
1019-
null,
1020-
() -> {
1021-
// Callback can return true multiple times - once for each condition it unblocks
1022-
if (status.equals("waiting1") && !shouldUnblock1.get()) {
1023-
shouldUnblock1.set(true);
1024-
trueCount.incrementAndGet();
1025-
return true;
1026-
}
1027-
if (status.equals("waiting2") && !shouldUnblock2.get()) {
1028-
shouldUnblock2.set(true);
1029-
trueCount.incrementAndGet();
1030-
return true;
1031-
}
1032-
return false;
1033-
});
1034-
1035-
// Single runUntilAllBlocked: callback returns true twice (once per condition),
1036-
// thread advances through both waits to completion
1037-
d.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS);
1038-
assertEquals("done", status);
1039-
assertTrue(d.isDone());
1040-
assertEquals("Callback should return true twice (once per condition)", 2, trueCount.get());
1041-
}
1042966
}

0 commit comments

Comments
 (0)