AsyncThrowingStream

    An asynchronous sequence generated from an error-throwing closure that calls a continuation to produce new elements.

    iOS
    13.0+
    macOS
    10.15+
    tvOS
    13.0+
    watchOS
    6.0+
    struct AsyncThrowingStream<Element, Failure> where Failure : Error

    AsyncThrowingStream conforms to AsyncSequence, providing a convenient way to create an asynchronous sequence without manually implementing an asynchronous iterator. In particular, an asynchronous stream is well-suited to adapt callback- or delegation-based APIs to participate with async-await.

    In contrast to AsyncStream, this type can throw an error from the awaited next(), which terminates the stream with the thrown error.

    You initialize an AsyncThrowingStream with a closure that receives an AsyncThrowingStream.Continuation. Produce elements in this closure, then provide them to the stream by calling the continuation’s yield(_:) method. When there are no further elements to produce, call the continuation’s finish() method. This causes the sequence iterator to produce a nil, which terminates the sequence. If an error occurs, call the continuation’s finish(throwing:) method, which causes the iterator’s next() method to throw the error to the awaiting call point. The continuation is Sendable, which permits calling it from concurrent contexts external to the iteration of the AsyncThrowingStream.

    An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncThrowingStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means it’s unbounded.

    Adapting Existing Code to Use Streams

    To adapt existing callback code to use async-await, use the callbacks to provide values to the stream, by using the continuation’s yield(_:) method.

    Consider a hypothetical QuakeMonitor type that provides callers with Quake instances every time it detects an earthquake. To receive callbacks, callers set a custom closure as the value of the monitor’s quakeHandler property, which the monitor calls back as necessary. Callers can also set an errorHandler to receive asynchronous error notifications, such as the monitor service suddenly becoming unavailable.

    class QuakeMonitor {
        var quakeHandler: ((Quake) -> Void)?
        var errorHandler: ((Error) -> Void)?
    
        func startMonitoring() {}
        func stopMonitoring() {}
    }

    To adapt this to use async-await, extend the QuakeMonitor to add a quakes property, of type AsyncThrowingStream<Quake>. In the getter for this property, return an AsyncThrowingStream, whose build closure – called at runtime to create the stream – uses the continuation to perform the following steps:

    1. Creates a QuakeMonitor instance.

    2. Sets the monitor’s quakeHandler property to a closure that receives each Quake instance and forwards it to the stream by calling the continuation’s yield(_:) method.

    3. Sets the monitor’s errorHandler property to a closure that receives any error from the monitor and forwards it to the stream by calling the continuation’s finish(throwing:) method. This causes the stream’s iterator to throw the error and terminate the stream.

    4. Sets the continuation’s onTermination property to a closure that calls stopMonitoring() on the monitor.

    5. Calls startMonitoring on the QuakeMonitor.

    extension QuakeMonitor {
    
        static var throwingQuakes: AsyncThrowingStream<Quake, Error> {
            AsyncThrowingStream { continuation in
                let monitor = QuakeMonitor()
                monitor.quakeHandler = { quake in
                     continuation.yield(quake)
                }
                monitor.errorHandler = { error in
                    continuation.finish(throwing: error)
                }
                continuation.onTermination = { @Sendable _ in
                    monitor.stopMonitoring()
                }
                monitor.startMonitoring()
            }
        }
    }

    Because the stream is an AsyncSequence, the call point uses the for-await-in syntax to process each Quake instance as produced by the stream:

    do {
        for try await quake in quakeStream {
            print("Quake: \(quake.date)")
        }
        print("Stream done.")
    } catch {
        print("Error: \(error)")
    }

    Citizens in _Concurrency

    Members

    Citizens in _Concurrency

    where Failure:Error

    Conformances

    Members

    Features

    Citizens in _Concurrency

    where Element:Sendable, Failure:Error

    Conformances

    • protocol Sendable

      A type whose values can safely be passed across concurrency domains by copying.