Skip to content

Commit d8ee18a

Browse files
committed
feat(TaskQueue): allow adding task to queue wihout waiting for completion
1 parent c66bb81 commit d8ee18a

5 files changed

Lines changed: 173 additions & 6 deletions

File tree

Sources/AsyncObjects/TaskQueue.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,58 @@ public actor TaskQueue: AsyncObject {
437437
}
438438
}
439439

440+
/// Adds the given throwing operation to queue to be executed asynchronously
441+
/// based on the priority and flags.
442+
///
443+
/// Immediately runs the provided operation if queue isn't blocked by any task,
444+
/// otherwise adds operation to queue to be executed later.
445+
///
446+
/// - Parameters:
447+
/// - priority: The priority with which operation executed. Pass `nil` to use the priority
448+
/// from execution context(`Task.currentPriority`).
449+
/// - flags: Additional attributes to apply when executing the operation.
450+
/// For a list of possible values, see ``Flags``.
451+
/// - operation: The throwing operation to perform.
452+
public nonisolated func addTask<T: Sendable>(
453+
priority: TaskPriority? = nil,
454+
flags: Flags = [],
455+
operation: @Sendable @escaping () async throws -> T
456+
) {
457+
Task {
458+
try await exec(
459+
priority: priority,
460+
flags: flags,
461+
operation: operation
462+
)
463+
}
464+
}
465+
466+
/// Adds the given non-throwing operation to queue to be executed asynchronously
467+
/// based on the priority and flags.
468+
///
469+
/// Immediately runs the provided operation if queue isn't blocked by any task,
470+
/// otherwise adds operation to queue to be executed later.
471+
///
472+
/// - Parameters:
473+
/// - priority: The priority with which operation executed. Pass `nil` to use the priority
474+
/// from execution context(`Task.currentPriority`).
475+
/// - flags: Additional attributes to apply when executing the operation.
476+
/// For a list of possible values, see ``Flags``.
477+
/// - operation: The non-throwing operation to perform.
478+
public nonisolated func addTask<T: Sendable>(
479+
priority: TaskPriority? = nil,
480+
flags: Flags = [],
481+
operation: @Sendable @escaping () async -> T
482+
) {
483+
Task {
484+
await exec(
485+
priority: priority,
486+
flags: flags,
487+
operation: operation
488+
)
489+
}
490+
}
491+
440492
/// Signalling on queue does nothing.
441493
/// Only added to satisfy ``AsyncObject`` requirements.
442494
public func signal() {

Tests/AsyncObjectsTests/AsyncCountdownEventTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class AsyncCountdownEventTests: XCTestCase {
5959
let event = AsyncCountdownEvent(until: 3)
6060
await event.increment(by: 10)
6161
Self.signalCountdownEvent(event, times: 10)
62-
await checkExecInterval(durationInRange: 3.5..<4) {
62+
await Self.checkExecInterval(durationInRange: 3.5..<4) {
6363
await event.wait()
6464
}
6565
}
@@ -78,7 +78,7 @@ class AsyncCountdownEventTests: XCTestCase {
7878
let event = AsyncCountdownEvent(until: 3, initial: 2)
7979
await event.increment(by: 10)
8080
Self.signalCountdownEvent(event, times: 10)
81-
await checkExecInterval(durationInRange: 4.5..<5) {
81+
await Self.checkExecInterval(durationInRange: 4.5..<5) {
8282
await event.wait()
8383
}
8484
}
@@ -154,7 +154,7 @@ class AsyncCountdownEventTests: XCTestCase {
154154
await event.reset(to: 2)
155155
}
156156
Self.signalCountdownEvent(event, times: 10)
157-
await checkExecInterval(durationInRange: 2.5...3.1) {
157+
await Self.checkExecInterval(durationInRange: 2.5...3.1) {
158158
await event.wait()
159159
}
160160
}

Tests/AsyncObjectsTests/TaskOperationTests.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ class TaskOperationTests: XCTestCase {
127127
(try? await Self.sleep(seconds: 3)) != nil
128128
}
129129
operation.signal()
130-
await checkExecInterval(durationInRange: ...3, for: operation.wait)
130+
await Self.checkExecInterval(
131+
durationInRange: ...3,
132+
for: operation.wait
133+
)
131134
}
132135

133136
func testTaskOperationAsyncWaitTimeout() async throws {

Tests/AsyncObjectsTests/TaskQueueTests.swift

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,4 +602,116 @@ class TaskQueueTests: XCTestCase {
602602
}
603603
}
604604
}
605+
606+
func testCancellableAndNonCancellableTasksOnSingleQueue() async throws {
607+
let queue = TaskQueue()
608+
await Self.checkExecInterval(durationInSeconds: 0) {
609+
await withThrowingTaskGroup(of: Void.self) { group in
610+
group.addTask {
611+
try await queue.exec {
612+
try await Self.sleep(seconds: 2)
613+
}
614+
}
615+
group.addTask {
616+
try await queue.exec {
617+
try await Self.sleep(seconds: 3)
618+
}
619+
}
620+
group.addTask {
621+
await queue.exec {
622+
do {
623+
try await Self.sleep(seconds: 4)
624+
XCTFail("Unexpected task progression")
625+
} catch {
626+
XCTAssertTrue(
627+
type(of: error) == CancellationError.self
628+
)
629+
}
630+
}
631+
}
632+
group.cancelAll()
633+
}
634+
}
635+
}
636+
637+
func testCancellableAndNonCancellableTasksOnSingleQueueWithBarrier()
638+
async throws
639+
{
640+
let queue = TaskQueue()
641+
try await Self.checkExecInterval(durationInSeconds: 3) {
642+
try await withThrowingTaskGroup(of: Void.self) { group in
643+
group.addTask {
644+
try await queue.exec {
645+
try await Self.sleep(seconds: 1)
646+
}
647+
}
648+
group.addTask {
649+
try await queue.exec {
650+
try await Self.sleep(seconds: 2)
651+
}
652+
}
653+
group.addTask {
654+
try await queue.exec {
655+
try await Self.sleep(seconds: 3)
656+
}
657+
}
658+
// Make sure previous tasks started
659+
try await Self.sleep(forSeconds: 0.01)
660+
await group.addTaskAndStart {
661+
try await queue.exec(flags: .barrier) {
662+
try await Self.sleep(seconds: 2)
663+
}
664+
}
665+
// Make sure previous tasks started
666+
try await Self.sleep(forSeconds: 0.01)
667+
group.addTask {
668+
try await queue.exec {
669+
try await Self.sleep(seconds: 2)
670+
}
671+
}
672+
group.addTask {
673+
await queue.exec {
674+
do {
675+
try await Self.sleep(seconds: 3)
676+
XCTFail("Unexpected task progression")
677+
} catch {
678+
XCTAssertTrue(
679+
type(of: error) == CancellationError.self
680+
)
681+
}
682+
}
683+
}
684+
group.addTask {
685+
await queue.exec {
686+
do {
687+
try await Self.sleep(seconds: 4)
688+
XCTFail("Unexpected task progression")
689+
} catch {
690+
XCTAssertTrue(
691+
type(of: error) == CancellationError.self
692+
)
693+
}
694+
}
695+
}
696+
697+
for _ in 0..<3 { try await group.next() }
698+
group.cancelAll()
699+
}
700+
}
701+
}
702+
703+
func testTaskExecutionWithJustAddingTasks() async throws {
704+
let queue = TaskQueue()
705+
queue.addTask(flags: .barrier) {
706+
try await Self.sleep(seconds: 2)
707+
}
708+
// Make sure previous tasks started
709+
try await Self.sleep(forSeconds: 0.01)
710+
await Self.checkExecInterval(durationInSeconds: 2) {
711+
queue.addTask {
712+
try! await Self.sleep(seconds: 2)
713+
}
714+
await queue.wait()
715+
}
716+
}
605717
}

Tests/AsyncObjectsTests/XCTestCase.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ extension XCTestCase {
3535
)
3636
}
3737

38-
func checkExecInterval<R: RangeExpression>(
38+
static func checkExecInterval<R: RangeExpression>(
3939
durationInRange range: R,
4040
for task: () async throws -> Void
4141
) async rethrows where R.Bound == Int {
@@ -51,7 +51,7 @@ extension XCTestCase {
5151
)
5252
}
5353

54-
func checkExecInterval<R: RangeExpression>(
54+
static func checkExecInterval<R: RangeExpression>(
5555
durationInRange range: R,
5656
for task: () async throws -> Void
5757
) async rethrows where R.Bound == Double {

0 commit comments

Comments
 (0)