Skip to content

Commit 61da2da

Browse files
committed
feat: add AbortSignal support to all blocking operations
1 parent d35ebe9 commit 61da2da

15 files changed

Lines changed: 310 additions & 92 deletions

CLAUDE.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ Small utilities for Rust/Tokio-like primitives in TypeScript. Zero overhead, no
1111
- Use single-line conventional commit messages. No co-authors.
1212
- Example: `git commit -m "feat(sync): add broadcast channel"`
1313

14+
## Cancellation: AbortSignal vs Drop
15+
16+
In Rust, cancellation is implicit: dropping a future cancels it. In JavaScript, promises cannot be "dropped"; they run to completion unless the underlying operation checks an `AbortSignal`. Any function in antiox that wraps or races user-provided async work must accept an optional `AbortSignal` so callers can cancel it, mirroring how Rust callers would simply drop the future.
17+
1418
## API Reference
1519

1620
Before implementing or modifying any module, always look up the corresponding Tokio or Rust std docs first to verify correct type names, method signatures, and semantics:
@@ -58,7 +62,7 @@ pnpm test # Run tests
5862

5963
## Documentation
6064

61-
- Keep `README.md` up to date when adding or changing modules.
65+
- Keep `README.md` up to date when adding or changing modules. Code examples in README must reflect the current API signatures.
6266
- Keep `COMPATIBILITY.md` up to date when adding, removing, or modifying any API. Every exported function, class, and method must be tracked against its Rust equivalent. Mark new items as implemented, and document why anything is intentionally skipped.
6367

6468
## Code Style

README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,9 @@ handle.abort();
458458

459459
// JoinSet for managing multiple tasks
460460
const set = new JoinSet<number>();
461-
set.spawn(async () => 1);
462-
set.spawn(async () => 2);
463-
set.spawn(async () => 3);
461+
set.spawn(async (signal) => 1);
462+
set.spawn(async (signal) => 2);
463+
set.spawn(async (signal) => 3);
464464

465465
for await (const result of set) {
466466
console.log(result); // 1, 2, 3 (in completion order)
@@ -488,7 +488,9 @@ try {
488488
if (e instanceof TimeoutError) console.log("timed out");
489489
}
490490

491-
for await (const tick of interval(1000)) {
491+
// All functions accept an optional AbortSignal for cancellation
492+
const controller = new AbortController();
493+
for await (const tick of interval(1000, controller.signal)) {
492494
console.log(`Tick ${tick}`);
493495
if (tick >= 4) break;
494496
}

src/sync/barrier.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export class BarrierWaitResult {
1414

1515
interface Waiter {
1616
resolve: (result: BarrierWaitResult) => void;
17+
aborted?: boolean;
1718
}
1819

1920
// Reusable barrier: n tasks synchronize, then the barrier resets for the next generation.
@@ -29,14 +30,16 @@ export class Barrier {
2930
this.#n = n;
3031
}
3132

32-
wait(): Promise<BarrierWaitResult> {
33+
wait(signal?: AbortSignal): Promise<BarrierWaitResult> {
34+
signal?.throwIfAborted();
3335
this.#count++;
3436

3537
if (this.#count === this.#n) {
3638
const result = new BarrierWaitResult(true);
3739

3840
while (!this.#waiters.isEmpty()) {
39-
this.#waiters.shift()!.resolve(new BarrierWaitResult(false));
41+
const w = this.#waiters.shift()!;
42+
if (!w.aborted) w.resolve(new BarrierWaitResult(false));
4043
}
4144

4245
this.#count = 0;
@@ -45,8 +48,20 @@ export class Barrier {
4548
return Promise.resolve(result);
4649
}
4750

48-
return new Promise<BarrierWaitResult>((resolve) => {
49-
this.#waiters.push({ resolve });
51+
return new Promise<BarrierWaitResult>((resolve, reject) => {
52+
const waiter: Waiter = { resolve };
53+
this.#waiters.push(waiter);
54+
if (signal) {
55+
const gen = this.#generation;
56+
const onAbort = () => {
57+
waiter.aborted = true;
58+
// Only decrement if still in the same generation
59+
if (this.#generation === gen) this.#count--;
60+
reject(signal.reason);
61+
};
62+
signal.addEventListener("abort", onAbort, { once: true });
63+
waiter.resolve = (v) => { signal.removeEventListener("abort", onAbort); resolve(v); };
64+
}
5065
});
5166
}
5267
}

src/sync/broadcast.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ export class BroadcastReceiver<T> {
126126
this.#cursor = cursor;
127127
}
128128

129-
recv(): Promise<T> {
129+
recv(signal?: AbortSignal): Promise<T> {
130+
signal?.throwIfAborted();
130131
if (this.#cursor < this.#state.writePos - this.#state.capacity) {
131132
const missed = this.#state.writePos - this.#state.capacity - this.#cursor;
132133
this.#cursor = this.#state.writePos - this.#state.capacity;
@@ -145,6 +146,7 @@ export class BroadcastReceiver<T> {
145146
}
146147

147148
return new Promise<T>((resolve, reject) => {
149+
const cursor = this.#cursor;
148150
const waiter: Waiter<T> = {
149151
resolve: (value: T) => {
150152
this.#cursor++;
@@ -153,12 +155,27 @@ export class BroadcastReceiver<T> {
153155
reject,
154156
};
155157

156-
let set = this.#state.waiters.get(this.#cursor);
158+
let set = this.#state.waiters.get(cursor);
157159
if (!set) {
158160
set = new Set();
159-
this.#state.waiters.set(this.#cursor, set);
161+
this.#state.waiters.set(cursor, set);
160162
}
161163
set.add(waiter);
164+
165+
if (signal) {
166+
const onAbort = () => {
167+
const s = this.#state.waiters.get(cursor);
168+
if (s) {
169+
s.delete(waiter);
170+
if (s.size === 0) this.#state.waiters.delete(cursor);
171+
}
172+
reject(signal.reason);
173+
};
174+
signal.addEventListener("abort", onAbort, { once: true });
175+
const origResolve = waiter.resolve;
176+
waiter.resolve = (v) => { signal.removeEventListener("abort", onAbort); origResolve(v); };
177+
waiter.reject = (e) => { signal.removeEventListener("abort", onAbort); reject(e); };
178+
}
162179
});
163180
}
164181

src/sync/cancellation_token.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,26 @@ export class CancellationToken {
2020
return this.#controller.signal.aborted;
2121
}
2222

23-
cancelled(): Promise<void> {
23+
cancelled(signal?: AbortSignal): Promise<void> {
24+
signal?.throwIfAborted();
2425
if (this.#controller.signal.aborted) {
2526
return Promise.resolve();
2627
}
2728

28-
return new Promise<void>((resolve) => {
29-
this.#controller.signal.addEventListener("abort", () => resolve(), { once: true });
29+
return new Promise<void>((resolve, reject) => {
30+
const onCancel = () => {
31+
if (signal) signal.removeEventListener("abort", onAbort!);
32+
resolve();
33+
};
34+
let onAbort: (() => void) | undefined;
35+
if (signal) {
36+
onAbort = () => {
37+
this.#controller.signal.removeEventListener("abort", onCancel);
38+
reject(signal.reason);
39+
};
40+
signal.addEventListener("abort", onAbort, { once: true });
41+
}
42+
this.#controller.signal.addEventListener("abort", onCancel, { once: true });
3043
});
3144
}
3245

0 commit comments

Comments
 (0)