Skip to content

Commit 0432022

Browse files
committed
fix(promise-queue): reject newest submissions when queue is full
`maxQueueLength` overflow dropped the HEAD of the queue — the task that had been waiting longest — and rejected that caller's promise. FIFO queues conventionally reject at the tail (drop-newest) so patient submitters aren't punished by a burst of late arrivals. `p-queue` and bottleneck both default to drop-newest; the old drop-oldest behavior was surprising and silently starved early callers under pressure. Also documents that overflow rejections arrive via the returned promise and must be handled or they become unhandled rejections.
1 parent 78631ea commit 0432022

2 files changed

Lines changed: 34 additions & 24 deletions

File tree

src/promise-queue.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/**
22
* @fileoverview Bounded concurrency promise queue.
33
* Exports the `PromiseQueue` class, which limits how many async tasks run
4-
* simultaneously, supports an optional max queue length (dropping the oldest
5-
* pending task when exceeded), and exposes an idle-wait helper.
4+
* simultaneously, supports an optional max queue length (new tasks beyond
5+
* the cap are rejected with "Task dropped: queue length exceeded"), and
6+
* exposes an idle-wait helper.
67
*/
78

89
type QueuedTask<T> = {
@@ -22,7 +23,10 @@ export class PromiseQueue {
2223
/**
2324
* Creates a new PromiseQueue
2425
* @param maxConcurrency - Maximum number of promises that can run concurrently
25-
* @param maxQueueLength - Maximum queue size (older tasks are dropped if exceeded)
26+
* @param maxQueueLength - Maximum queue size; submissions past the cap
27+
* reject with "Task dropped: queue length exceeded" instead of evicting
28+
* a caller that has been waiting patiently. Callers must handle this
29+
* rejection or they'll see an unhandled rejection.
2630
*/
2731
constructor(maxConcurrency: number, maxQueueLength?: number | undefined) {
2832
this.maxConcurrency = maxConcurrency
@@ -35,23 +39,25 @@ export class PromiseQueue {
3539
/**
3640
* Add a task to the queue
3741
* @param fn - Async function to execute
38-
* @returns Promise that resolves with the function's result
42+
* @returns Promise that resolves with the function's result, or rejects
43+
* with "Task dropped: queue length exceeded" if the queue is full.
3944
*/
4045
async add<T>(fn: () => Promise<T>): Promise<T> {
4146
return await new Promise<T>((resolve, reject) => {
42-
const task: QueuedTask<T> = { fn, resolve, reject }
43-
47+
// Reject the newcomer rather than evicting an earlier-submitted task.
48+
// FIFO fairness: the caller who waited longest gets served, not the
49+
// caller who arrived last. Previously this dropped the queue head,
50+
// which punished patient callers and violated typical
51+
// bounded-queue semantics.
4452
if (
4553
this.maxQueueLength !== undefined &&
4654
this.queue.length >= this.maxQueueLength
4755
) {
48-
// Drop oldest task to prevent memory buildup
49-
const droppedTask = this.queue.shift()
50-
if (droppedTask) {
51-
droppedTask.reject(new Error('Task dropped: queue length exceeded'))
52-
}
56+
reject(new Error('Task dropped: queue length exceeded'))
57+
return
5358
}
5459

60+
const task: QueuedTask<T> = { fn, resolve, reject }
5561
this.queue.push(task as QueuedTask<unknown>)
5662
this.runNext()
5763
})

test/unit/promise-queue.test.mts

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -141,40 +141,44 @@ describe('PromiseQueue', () => {
141141
})
142142

143143
describe('maxQueueLength', () => {
144-
it('should drop oldest task when queue exceeds max length', async () => {
144+
it('should reject the newest submission when queue is full (drop-newest, FIFO-fair)', async () => {
145145
const queue = new PromiseQueue(1, 2)
146146
const results: string[] = []
147147
const errors: Error[] = []
148148

149-
// Add 4 tasks - first one runs immediately, next 2 queue, 4th drops oldest queued
149+
// Concurrency 1, queue cap 2. task1 runs immediately, task2+task3
150+
// fill the queue, task4 arrives over the cap and should be rejected —
151+
// earlier submitters (task2, task3) keep their slots.
150152
const tasks = [
151153
queue.add(async () => {
152154
await delay(50)
153155
results.push('task1')
154156
return 'task1'
155157
}),
156-
queue
157-
.add(async () => {
158-
results.push('task2')
159-
return 'task2'
160-
})
161-
.catch((e: Error) => errors.push(e)),
162158
queue.add(async () => {
163-
results.push('task3')
164-
return 'task3'
159+
results.push('task2')
160+
return 'task2'
165161
}),
166162
queue.add(async () => {
167-
results.push('task4')
168-
return 'task4'
163+
results.push('task3')
164+
return 'task3'
169165
}),
166+
queue
167+
.add(async () => {
168+
results.push('task4')
169+
return 'task4'
170+
})
171+
.catch((e: Error) => errors.push(e)),
170172
]
171173

172174
await Promise.all(tasks.map(t => t.catch(() => {})))
173175

174176
expect(errors.length).toBe(1)
175177
expect(errors[0]?.message).toBe('Task dropped: queue length exceeded')
176178
expect(results).toContain('task1')
177-
expect(results).not.toContain('task2') // Dropped
179+
expect(results).toContain('task2')
180+
expect(results).toContain('task3')
181+
expect(results).not.toContain('task4')
178182
})
179183

180184
it('should work without dropping tasks when under limit', async () => {

0 commit comments

Comments
 (0)