-
Notifications
You must be signed in to change notification settings - Fork 207
Introduce Notification Messages API #1452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
cthielen
merged 3 commits into
swiftlang:main
from
cthielen:pr/notification-center-message
Aug 5, 2025
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
Sources/FoundationEssentials/NotificationCenter/ActorQueueManager.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift.org open source project | ||
| // | ||
| // Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| #if FOUNDATION_FRAMEWORK | ||
| internal class _NotificationCenterActorQueueManagerNSObjectWrapper: NSObject, @unchecked Sendable {} | ||
| #else | ||
| internal class _NotificationCenterActorQueueManagerNSObjectWrapper: @unchecked Sendable {} | ||
| #endif | ||
|
|
||
| #if FOUNDATION_FRAMEWORK | ||
| @objc(_NotificationCenterActorQueueManager) | ||
| #endif | ||
| internal final class _NotificationCenterActorQueueManager: _NotificationCenterActorQueueManagerNSObjectWrapper, @unchecked Sendable { | ||
| #if !NO_FILESYSTEM | ||
| struct State { | ||
| var buffer = [@Sendable () async -> Void]() | ||
| var continuation: UnsafeContinuation<(@Sendable () async -> Void)?, Never>? | ||
| var isCancelled: Bool = false | ||
|
|
||
| static func waitForWork(_ state: LockedState<State>) async -> (@Sendable () async -> Void)? { | ||
| return await withTaskCancellationHandler { | ||
| return await withUnsafeContinuation { continuation in | ||
| let (work, resumeContinuation) = state.withLock { state -> ((@Sendable () async -> Void)?, Bool) in | ||
| if state.isCancelled { | ||
| return (nil, true) | ||
| } else { | ||
| if state.buffer.isEmpty { | ||
| assert(state.continuation == nil) | ||
| state.continuation = continuation | ||
| return (nil, false) | ||
| } else { | ||
| return (state.buffer.removeFirst(), true) | ||
| } | ||
| } | ||
| } | ||
| if resumeContinuation { | ||
| continuation.resume(returning: work) | ||
| } | ||
| } | ||
| } onCancel: { | ||
| state.withLock { state in | ||
| state.isCancelled = true | ||
| defer { | ||
| state.continuation = nil | ||
| } | ||
| return state.continuation | ||
| }?.resume(returning: nil) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let state: LockedState<State> | ||
| let workerTask: Task<(), Never> | ||
|
|
||
| override init() { | ||
| state = LockedState(initialState: State()) | ||
| workerTask = Task.detached { [state] in | ||
| await withDiscardingTaskGroup { group in | ||
| while let work = await State.waitForWork(state) { | ||
| group.addTask(operation: work) | ||
| } | ||
| } | ||
| } | ||
| super.init() | ||
| } | ||
|
|
||
| deinit { | ||
| workerTask.cancel() | ||
| } | ||
|
|
||
| func enqueue(_ work: @escaping @Sendable () async -> Void) { | ||
| state.withLock { state in | ||
| state.buffer.append(work) | ||
| if let continuation = state.continuation { | ||
| state.continuation = nil | ||
| let item = state.buffer.removeFirst() | ||
| continuation.resume(returning: item) | ||
| } | ||
| } | ||
| } | ||
| #endif | ||
| } | ||
200 changes: 200 additions & 0 deletions
200
Sources/FoundationEssentials/NotificationCenter/AsyncMessage+AsyncSequence.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,200 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift.org open source project | ||
| // | ||
| // Copyright (c) 2025 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| #if FOUNDATION_FRAMEWORK | ||
| internal import _ForSwiftFoundation | ||
| internal import CollectionsInternal | ||
| #elseif canImport(DequeModule) | ||
| internal import DequeModule | ||
| #elseif canImport(_FoundationCollections) | ||
| internal import _FoundationCollections | ||
| #endif | ||
| #if canImport(os) | ||
| internal import os.log | ||
| #endif | ||
|
|
||
| @available(FoundationPreview 6.2, *) | ||
| extension NotificationCenter { | ||
| /// Returns an asynchronous sequence of messages produced by this center for a given subject and identifier. | ||
| /// - Parameters: | ||
| /// - subject: The subject to observe. Specify a metatype to observe all values for a given type. | ||
| /// - identifier: An identifier representing a specific message type. | ||
| /// - limit: The maximum number of messages allowed to buffer. | ||
| /// - Returns: An asynchronous sequence of messages produced by this center. | ||
| public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>( | ||
| of subject: Message.Subject, | ||
| for identifier: Identifier, | ||
| bufferSize limit: Int = 10 | ||
| ) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message, Message.Subject: AnyObject { | ||
| return AsyncMessageSequence<Message>(self, subject, limit) | ||
| } | ||
|
|
||
| /// Returns an asynchronous sequence of messages produced by this center for a given subject type and identifier. | ||
| /// - Parameters: | ||
| /// - subject: The metatype to observe all values for a given type. | ||
| /// - identifier: An identifier representing a specific message type. | ||
| /// - limit: The maximum number of messages allowed to buffer. | ||
| /// - Returns: An asynchronous sequence of messages produced by this center. | ||
| public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>( | ||
| of subject: Message.Subject.Type, | ||
| for identifier: Identifier, | ||
| bufferSize limit: Int = 10 | ||
| ) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message { | ||
| return AsyncMessageSequence<Message>(self, nil, limit) | ||
| } | ||
|
|
||
| /// Returns an asynchronous sequence of messages produced by this center for a given subject and message type. | ||
| /// - Parameters: | ||
| /// - subject: The subject to observe. Specify a metatype to observe all values for a given type. | ||
| /// - messageType: The message type to be observed. | ||
| /// - limit: The maximum number of messages allowed to buffer. | ||
| /// - Returns: An asynchronous sequence of messages produced by this center. | ||
| public func messages<Message: AsyncMessage>( | ||
| of subject: Message.Subject? = nil, | ||
| for messageType: Message.Type, | ||
| bufferSize limit: Int = 10 | ||
| ) -> some AsyncSequence<Message, Never> where Message.Subject: AnyObject { | ||
| return AsyncMessageSequence<Message>(self, subject, limit) | ||
| } | ||
| } | ||
|
|
||
| extension NotificationCenter { | ||
| fileprivate struct AsyncMessageSequence<Message: NotificationCenter.AsyncMessage>: AsyncSequence, Sendable { | ||
| let center: NotificationCenter | ||
| nonisolated(unsafe) weak var object: AnyObject? | ||
| let bufferSize: Int | ||
|
|
||
| init(_ center: NotificationCenter, _ object: AnyObject?, _ bufferSize: Int) { | ||
| self.center = center | ||
| self.object = object | ||
| self.bufferSize = bufferSize | ||
| } | ||
|
|
||
| func makeAsyncIterator() -> AsyncMessageSequenceIterator<Message> { | ||
| return AsyncMessageSequenceIterator(center: center, object: object, bufferSize: bufferSize) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension NotificationCenter { | ||
| fileprivate final class AsyncMessageSequenceIterator<Message: NotificationCenter.AsyncMessage>: AsyncIteratorProtocol, Sendable { | ||
| typealias Element = Message | ||
| typealias Failure = Never | ||
|
|
||
| struct State { | ||
| var observer: NotificationCenter.ObservationToken? | ||
| var continuations: [UnsafeContinuation<Message?, Never>] = [] | ||
| var buffer = Deque<Message>(minimumCapacity: 1) | ||
| let bufferSize: Int | ||
| } | ||
|
|
||
| struct Resumption { | ||
| let message: Message? | ||
| let continuations: [UnsafeContinuation<Message?, Never>] | ||
|
|
||
| init(message: Message?, continuation: UnsafeContinuation<Message?, Never>) { | ||
| self.message = message | ||
| self.continuations = [continuation] | ||
| } | ||
|
|
||
| init(cancelling: [UnsafeContinuation<Message?, Never>]) { | ||
| self.message = nil | ||
| self.continuations = cancelling | ||
| } | ||
|
|
||
| func resume() { | ||
| for continuation in continuations { | ||
| continuation.resume(returning: message) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let state: LockedState<State> | ||
|
|
||
| init(center: NotificationCenter, object: AnyObject?, bufferSize: Int) { | ||
| self.state = LockedState(initialState: State(bufferSize: bufferSize)) | ||
|
|
||
| #if FOUNDATION_FRAMEWORK | ||
| let observerBlock: @Sendable (Notification) -> Void = { [weak self] notification in | ||
| guard let message: Message = NotificationCenter._messageFromNotification(notification) else { return } | ||
|
|
||
| self?.observationCallback(message) | ||
| } | ||
| #else | ||
| let observerBlock: @Sendable (Message) -> Void = { [weak self] message in | ||
| self?.observationCallback(message) | ||
| } | ||
| #endif | ||
|
|
||
| let token = center._addObserver(Message.name, object: object, using: observerBlock) | ||
|
|
||
| self.state.withLock { _state in | ||
| _state.observer = ObservationToken(center: center, token: token) | ||
| } | ||
| } | ||
|
|
||
| deinit { | ||
| teardown() | ||
| } | ||
|
|
||
| func teardown() { | ||
| let (observer, resumption) = state.withLock { _state -> (NotificationCenter.ObservationToken?, Resumption) in | ||
| let observer = _state.observer | ||
| _state.observer = nil | ||
| _state.buffer.removeAll(keepingCapacity: false) | ||
| defer { _state.continuations.removeAll(keepingCapacity: false) } | ||
| return (observer, Resumption(cancelling: _state.continuations)) | ||
| } | ||
|
|
||
| resumption.resume() | ||
|
|
||
| if let observer { | ||
| observer.remove() | ||
| } | ||
| } | ||
|
|
||
| func observationCallback(_ message: Message) { | ||
| state.withLock { _state -> Resumption? in | ||
| if _state.buffer.count + 1 > _state.bufferSize { | ||
| _state.buffer.removeFirst() | ||
| #if canImport(os) | ||
| NotificationCenter.logger.fault("Notification center message dropped due to buffer limit. Check sequence iterator frequently or increase buffer size. Message: \(String(describing: Message.self))") | ||
| #endif | ||
| } | ||
| _state.buffer.append(message) | ||
|
|
||
| if _state.continuations.isEmpty { | ||
| return nil | ||
| } else { | ||
| return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst()) | ||
| } | ||
| }?.resume() | ||
| } | ||
|
|
||
| func next() async -> Message? { | ||
| await withTaskCancellationHandler { | ||
| return await withUnsafeContinuation { (continuation: UnsafeContinuation<Message?, Never>) in | ||
| state.withLock { _state -> Resumption? in | ||
| _state.continuations.append(continuation) | ||
| if _state.buffer.isEmpty { | ||
| return nil | ||
| } else { | ||
| return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst()) | ||
| } | ||
| }?.resume() | ||
| } | ||
| } onCancel: { | ||
| teardown() | ||
| } | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.