Skip to content

Instantly share code, notes, and snippets.

@josephlord
Last active July 17, 2021 19:30
Show Gist options
  • Save josephlord/1c2b9d19daed3b61daf2545d6523ee6e to your computer and use it in GitHub Desktop.
Save josephlord/1c2b9d19daed3b61daf2545d6523ee6e to your computer and use it in GitHub Desktop.
The inner actor for AsyncSequencePublisher subscription
extension PublisherAsyncSequence {
public class Iterator {
private let iActor = InnerActor()
/// Due to bug [SR-14875](https://bugs.swift.org/browse/SR-14875) we need to avoid calling continuations
/// from the actor execution context currently which is why we wrap the state in this InnerActor instead of just making the Interator
/// and actor
private actor InnerActor {
/// These typealiases are just for cleaner call sites
typealias ElementContinuation = CheckedContinuation<Element?, P.Failure>
typealias SubsciptionContinuation = CheckedContinuation<Void, Never>
private var subscription: Subscription?
private var subscriptionContinuation: SubsciptionContinuation?
private var continuation: ElementContinuation?
func next() async throws -> Element? {
if subscription == nil {
await withCheckedContinuation { continuation in
subscriptionContinuation = continuation
}
}
return try await withCheckedThrowingContinuation({ continuation in
self.continuation = continuation
subscription?.request(.max(1))
})
}
func setSubscription(subscription: Subscription) -> SubsciptionContinuation? {
defer { subscriptionContinuation = nil }
assert(self.subscription == nil)
self.subscription = subscription
return subscriptionContinuation
}
/// You should resume the completion immediately after calling this
func getAndClearMainCompletion() -> ElementContinuation? {
defer { continuation = nil }
return continuation
}
/// You should resume the completion immediately after calling this
func getAndClearSubscriptionCompletion() -> SubsciptionContinuation? {
defer { subscriptionContinuation = nil }
return subscriptionContinuation
}
}
private func receive(compl: Subscribers.Completion<Error>) async {
let continuation = await iActor.getAndClearMainCompletion()
assert(continuation != nil)
switch compl {
case .finished:
continuation?.resume(returning: nil)
case .failure(let err):
continuation?.resume(throwing: err)
}
}
private func receive(input: Element) async {
let continuation = await iActor.getAndClearMainCompletion()
assert(continuation != nil)
continuation?.resume(returning: input)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment