Skip to content

Commit f28ee66

Browse files
committed
perf(TaskOperation)!: use platform lock primitive instead of DispatchQueue for synchronization
1 parent d8ee18a commit f28ee66

16 files changed

Lines changed: 758 additions & 457 deletions

AsyncObjects.xcodeproj/project.pbxproj

Lines changed: 395 additions & 387 deletions
Large diffs are not rendered by default.

Sources/AsyncObjects/CancellationSource.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public extension Task {
183183
priority: TaskPriority? = nil,
184184
cancellationSource: CancellationSource,
185185
operation: @escaping @Sendable () async throws -> Success
186-
) rethrows where Failure == Error {
186+
) where Failure == Error {
187187
self.init(priority: priority) {
188188
let task = Self.init(priority: priority, operation: operation)
189189
await cancellationSource.register(task: task)
@@ -235,7 +235,7 @@ public extension Task {
235235
priority: TaskPriority? = nil,
236236
cancellationSource: CancellationSource,
237237
operation: @escaping @Sendable () async throws -> Success
238-
) rethrows -> Self where Failure == Error {
238+
) -> Self where Failure == Error {
239239
return Task.detached(priority: priority) {
240240
let task = Self.init(priority: priority, operation: operation)
241241
await cancellationSource.register(task: task)
@@ -280,7 +280,7 @@ public extension Task {
280280
priority: TaskPriority? = nil,
281281
cancellationSource: CancellationSource,
282282
operation: @escaping @Sendable () async throws -> Success
283-
) async rethrows where Failure == Error {
283+
) async where Failure == Error {
284284
self.init(priority: priority, operation: operation)
285285
await cancellationSource.register(task: self)
286286
}
@@ -323,7 +323,7 @@ public extension Task {
323323
priority: TaskPriority? = nil,
324324
cancellationSource: CancellationSource,
325325
operation: @escaping @Sendable () async throws -> Success
326-
) async rethrows -> Self where Failure == Error {
326+
) async -> Self where Failure == Error {
327327
let task = Task.detached(priority: priority, operation: operation)
328328
await cancellationSource.register(task: task)
329329
return task

Sources/AsyncObjects/Locker.swift

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#if canImport(Darwin)
2+
@_implementationOnly import Darwin
3+
#elseif canImport(Glibc)
4+
@_implementationOnly import Glibc
5+
#elseif canImport(WinSDK)
6+
@_implementationOnly import WinSDK
7+
#endif
8+
9+
/// A synchronization object that can be used to provide exclusive access to threads.
10+
///
11+
/// The values stored in the lock should be considered opaque and implementation defined,
12+
/// they contain thread ownership information that the system may use to attempt to resolve priority inversions.
13+
/// This lock must be unlocked from the same thread that locked it,
14+
/// attempts to unlock from a different thread will cause an assertion aborting the process.
15+
/// This lock must not be accessed from multiple processes or threads via shared or multiply-mapped memory,
16+
/// the lock implementation relies on the address of the lock value and owning process.
17+
public final class Locker: Equatable, Hashable, Sendable {
18+
#if canImport(Darwin)
19+
/// A type representing data for an unfair lock.
20+
typealias Primitive = os_unfair_lock
21+
#elseif canImport(Glibc)
22+
/// A type representing a MUTual EXclusion object.
23+
typealias Primitive = pthread_mutex_t
24+
#elseif canImport(WinSDK)
25+
/// A type representing a slim reader/writer (SRW) lock.
26+
typealias Primitive = SRWLOCK
27+
#endif
28+
29+
/// Pointer type pointing to platform dependent lock primitive.
30+
typealias PlatformLock = UnsafeMutablePointer<Primitive>
31+
/// Pointer to platform dependent lock primitive.
32+
let platformLock: PlatformLock
33+
34+
/// Creates lock object with the provided pointer to platform dependent lock primitive.
35+
///
36+
/// - Parameter platformLock: Pointer to platform dependent lock primitive.
37+
/// - Returns: The newly created lock object.
38+
init(withLock platformLock: PlatformLock) {
39+
self.platformLock = platformLock
40+
}
41+
42+
/// Allocates and initializes platform dependent lock primitive.
43+
///
44+
/// - Returns: The newly created lock object.
45+
public init() {
46+
let platformLock = PlatformLock.allocate(capacity: 1)
47+
#if canImport(Darwin)
48+
platformLock.initialize(to: os_unfair_lock())
49+
#elseif canImport(Glibc)
50+
pthread_mutex_init(platformLock, nil)
51+
#elseif canImport(WinSDK)
52+
InitializeSRWLock(platformLock)
53+
#endif
54+
self.platformLock = platformLock
55+
}
56+
57+
deinit {
58+
#if canImport(Glibc)
59+
pthread_mutex_destroy(platformLock)
60+
#endif
61+
platformLock.deinitialize(count: 1)
62+
}
63+
64+
/// Acquires exclusive lock.
65+
///
66+
/// If a thread has already acquired lock and hasn't released lock yet,
67+
/// other threads will wait for lock to be released and then acquire lock
68+
/// in order of their request.
69+
public func lock() {
70+
#if canImport(Darwin)
71+
os_unfair_lock_lock(platformLock)
72+
#elseif canImport(Glibc)
73+
pthread_mutex_lock(platformLock)
74+
#elseif canImport(WinSDK)
75+
AcquireSRWLockExclusive(platformLock)
76+
#endif
77+
}
78+
79+
/// Releases exclusive lock.
80+
///
81+
/// A lock must be unlocked only from the same thread in which it was locked.
82+
/// Attempting to unlock from a different thread causes a runtime error.
83+
public func unlock() {
84+
#if canImport(Darwin)
85+
os_unfair_lock_unlock(platformLock)
86+
#elseif canImport(Glibc)
87+
pthread_mutex_unlock(platformLock)
88+
#elseif canImport(WinSDK)
89+
ReleaseSRWLockExclusive(platformLock)
90+
#endif
91+
}
92+
93+
/// Performs a critical piece of work synchronously after acquiring the lock
94+
/// and releases lock when task completes.
95+
///
96+
/// Use this to perform critical tasks or provide access to critical resource
97+
/// that require exclusivity among other concurrent tasks.
98+
///
99+
/// - Parameter critical: The critical task to perform.
100+
/// - Returns: The result from the critical task.
101+
@discardableResult
102+
public func perform<R>(_ critical: () throws -> R) rethrows -> R {
103+
lock()
104+
defer { unlock() }
105+
return try critical()
106+
}
107+
108+
/// Returns a Boolean value indicating whether two locks are equal.
109+
///
110+
/// Checks if two lock objects point to the same platform dependent lock primitive.
111+
///
112+
/// - Parameters:
113+
/// - lhs: A lock to compare.
114+
/// - rhs: Another lock to compare.
115+
///
116+
/// - Returns: If the lock objects compared are equal.
117+
public static func == (lhs: Locker, rhs: Locker) -> Bool {
118+
return lhs.platformLock == rhs.platformLock
119+
}
120+
121+
/// Hashes the pointer to platform dependent lock primitive
122+
/// by feeding into the given hasher.
123+
///
124+
/// - Parameter hasher: The hasher to use when combining
125+
/// the components of this instance.
126+
public func hash(into hasher: inout Hasher) {
127+
hasher.combine(platformLock)
128+
}
129+
}

Sources/AsyncObjects/TaskOperation.swift

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ import Dispatch
1414
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
1515
@unchecked Sendable
1616
{
17-
/// The dispatch queue used to synchronize data access and modifications.
17+
/// The platform dependent lock used to
18+
/// synchronize data access and modifications.
1819
@usableFromInline
19-
let propQueue: DispatchQueue
20+
let locker: Locker
2021
/// The asynchronous action to perform as part of the operation..
2122
private let underlyingAction: @Sendable () async throws -> R
2223
/// The top-level task that executes asynchronous action provided
@@ -35,33 +36,35 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
3536
public override var isCancelled: Bool { execTask?.isCancelled ?? false }
3637

3738
/// Private store for boolean value indicating whether the operation is currently executing.
38-
private var _isExecuting: Bool = false
39+
@usableFromInline
40+
var _isExecuting: Bool = false
3941
/// A Boolean value indicating whether the operation is currently executing.
4042
///
4143
/// The value of this property is true if the operation is currently executing
4244
/// provided asynchronous operation or false if it is not.
4345
public override internal(set) var isExecuting: Bool {
44-
get { propQueue.sync { _isExecuting } }
46+
get { locker.perform { _isExecuting } }
4547
@usableFromInline
4648
set {
4749
willChangeValue(forKey: "isExecuting")
48-
propQueue.sync(flags: [.barrier]) { _isExecuting = newValue }
50+
locker.perform { _isExecuting = newValue }
4951
didChangeValue(forKey: "isExecuting")
5052
}
5153
}
5254

5355
/// Private store for boolean value indicating whether the operation has finished executing its task.
54-
private var _isFinished: Bool = false
56+
@usableFromInline
57+
var _isFinished: Bool = false
5558
/// A Boolean value indicating whether the operation has finished executing its task.
5659
///
5760
/// The value of this property is true if the operation is finished executing or cancelled
5861
/// provided asynchronous operation or false if it is not.
5962
public override internal(set) var isFinished: Bool {
60-
get { propQueue.sync { _isFinished } }
63+
get { locker.perform { _isFinished } }
6164
@usableFromInline
6265
set {
6366
willChangeValue(forKey: "isFinished")
64-
propQueue.sync(flags: [.barrier]) {
67+
locker.perform {
6568
_isFinished = newValue
6669
guard newValue, !continuations.isEmpty else { return }
6770
continuations.forEach { $0.value.resume() }
@@ -79,44 +82,30 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
7982
get async { (await execTask?.result) ?? .failure(CancellationError()) }
8083
}
8184

82-
/// Creates a new operation that executes the provided throwing asynchronous task.
85+
/// Creates a new operation that executes the provided asynchronous task.
8386
///
84-
/// The provided dispatch queue is used to synchronize operation property access and modifications
85-
/// and prevent data races.
87+
/// The operation execution only starts after ``start()`` is invoked.
88+
/// Operation completes when underlying asynchronous task finishes.
89+
/// The provided lock is used to synchronize operation property access and modifications
90+
/// to prevent data races.
8691
///
8792
/// - Parameters:
88-
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
89-
/// - operation: The throwing asynchronous operation to execute.
93+
/// - locker: The locker to use to synchronize property read and mutations.
94+
/// New lock object is created in case none provided.
95+
/// - operation: The asynchronous operation to execute.
9096
///
9197
/// - Returns: The newly created asynchronous operation.
9298
public init(
93-
queue: DispatchQueue,
99+
synchronizedWith locker: Locker = .init(),
94100
operation: @escaping @Sendable () async throws -> R
95101
) {
96-
self.propQueue = queue
102+
self.locker = locker
97103
self.underlyingAction = operation
98104
super.init()
99105
}
100106

101-
deinit { self.continuations.forEach { $0.value.cancel() } }
102-
103-
/// Creates a new operation that executes the provided non-throwing asynchronous task.
104-
///
105-
/// The provided dispatch queue is used to synchronize operation property access and modifications
106-
/// and prevent data races.
107-
///
108-
/// - Parameters:
109-
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
110-
/// - operation: The non-throwing asynchronous operation to execute.
111-
///
112-
/// - Returns: The newly created asynchronous operation.
113-
public init(
114-
queue: DispatchQueue,
115-
operation: @escaping @Sendable () async -> R
116-
) {
117-
self.propQueue = queue
118-
self.underlyingAction = operation
119-
super.init()
107+
deinit {
108+
locker.perform { self.continuations.forEach { $0.value.cancel() } }
120109
}
121110

122111
/// Begins the execution of the operation.
@@ -138,9 +127,11 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
138127
public override func main() {
139128
guard isExecuting, execTask == nil else { return }
140129
execTask = Task { [weak self] in
141-
guard let self = self else { throw CancellationError() }
142-
defer { self._finish() }
143-
let result = try await underlyingAction()
130+
guard
131+
let action = self?.underlyingAction
132+
else { throw CancellationError() }
133+
defer { self?._finish() }
134+
let result = try await action()
144135
return result
145136
}
146137
}
@@ -185,8 +176,8 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
185176
_ continuation: Continuation,
186177
withKey key: UUID
187178
) {
188-
propQueue.sync(flags: [.barrier]) {
189-
if isFinished { continuation.resume(); return }
179+
locker.perform {
180+
if _isFinished { continuation.resume(); return }
190181
continuations[key] = continuation
191182
}
192183
}
@@ -197,7 +188,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
197188
/// - Parameter key: The key in the map.
198189
@inlinable
199190
func _removeContinuation(withKey key: UUID) {
200-
propQueue.sync(flags: [.barrier]) {
191+
locker.perform {
201192
let continuation = continuations.removeValue(forKey: key)
202193
continuation?.cancel()
203194
}

Sources/AsyncObjects/TaskQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import OrderedCollections
33

44
/// An object that acts as a concurrent queue executing submitted tasks concurrently.
55
///
6-
/// You can use the ``exec(priority:flags:operation:)-92nww``
6+
/// You can use the ``exec(priority:flags:operation:)-2ll3k``
77
/// or its non-throwing/non-cancellable version to run tasks concurrently.
88
/// Additionally, you can provide priority of task and ``Flags``
99
/// to customize execution of submitted operation.

Tests/AsyncObjectsTests/AsyncCountdownEventTests.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,16 @@ class AsyncCountdownEventTests: XCTestCase {
158158
await event.wait()
159159
}
160160
}
161+
162+
func testDeinit() async throws {
163+
let event = AsyncCountdownEvent(until: 0, initial: 1)
164+
Task.detached {
165+
try await Self.sleep(seconds: 1)
166+
await event.signal()
167+
}
168+
await event.wait()
169+
self.addTeardownBlock { [weak event] in
170+
XCTAssertNil(event)
171+
}
172+
}
161173
}

Tests/AsyncObjectsTests/AsyncEventTests.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,16 @@ class AsyncEventTests: XCTestCase {
7373
}
7474
XCTAssertEqual(result, .success)
7575
}
76+
77+
func testDeinit() async throws {
78+
let event = AsyncEvent(signaledInitially: false)
79+
Task.detached {
80+
try await Self.sleep(seconds: 1)
81+
await event.signal()
82+
}
83+
await event.wait()
84+
self.addTeardownBlock { [weak event] in
85+
XCTAssertNil(event)
86+
}
87+
}
7688
}

Tests/AsyncObjectsTests/AsyncObjectTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class AsyncObjectTests: XCTestCase {
3333
func testMultipleObjectWaitMultiple() async throws {
3434
let event = AsyncEvent(signaledInitially: false)
3535
let mutex = AsyncSemaphore()
36-
let op = TaskOperation(queue: .global(qos: .background)) {
36+
let op = TaskOperation {
3737
try await Self.sleep(seconds: 3)
3838
}
3939
Task.detached {
@@ -80,7 +80,7 @@ class AsyncObjectTests: XCTestCase {
8080
var result: TaskTimeoutResult = .success
8181
let event = AsyncEvent(signaledInitially: false)
8282
let mutex = AsyncSemaphore()
83-
let op = TaskOperation(queue: .global(qos: .background)) {
83+
let op = TaskOperation {
8484
try await Self.sleep(seconds: 4)
8585
}
8686
Task.detached {

Tests/AsyncObjectsTests/AsyncSemaphoreTests.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,18 @@ class AsyncSemaphoreTests: XCTestCase {
168168
}
169169
XCTAssertEqual(data.items.count, 10)
170170
}
171+
172+
func testDeinit() async throws {
173+
let semaphore = AsyncSemaphore()
174+
Task.detached {
175+
try await Self.sleep(seconds: 1)
176+
await semaphore.signal()
177+
}
178+
await semaphore.wait()
179+
self.addTeardownBlock { [weak semaphore] in
180+
XCTAssertNil(semaphore)
181+
}
182+
}
171183
}
172184

173185
actor TaskTimeoutStore {

0 commit comments

Comments
 (0)