SOAR-0010: Support for JSON Lines, JSON Sequence, and Server-sent Events

Introduce streaming encoders and decoders for JSON Lines, JSON Sequence, and Server-sent Events for as a convenience API.

SOAR-0010.md

Overview

Introduction

Add streaming encoders and decoders for these three event stream formats to the runtime library, allowing adopters to easily produce and consume event streams, both on the client and server.

Motivation

While the OpenAPI specification is optimized for HTTP APIs that send a single request value, and receive a single response value, there are many use cases in which developers want to stream values over time.

A simple example of streaming “values” is a file transfer, which can be thought of as a stream of byte chunks that represent the contents of the file. Another is multipart content, streaming individual parts over time. Both of these are already supported by Swift OpenAPI Generator, as of version 0.3.0 and 1.0.0-alpha.1, respectively.

Another popular use case for streaming is to send JSON-encoded events over time, usually (but not exclusively), from the server to the client.

  • The Kubernetes API uses JSON Lines to stream updates to resources from its control plane.

  • The OpenAI API uses Server-sent Events to stream text snippets from ChatGPT.

  • I couldn’t find a popular service using JSON Sequence, but unlike JSON Lines, it’s well-defined in RFC7464, and also used around the industry.

The flow starts with the client initiating an HTTP request to the server, and the server responding with an HTTP response head, and then the server starting to stream the response body, which contains the delimited events, processed over time by the client.

This lightweight solution has the advantage of being a plain HTTP request/response pair, without requiring a custom protocol to either replace HTTP, or sit on top of it. This makes intermediaries, such as proxies, still able to pass data through without being aware of the streaming nature of the HTTP body.

Proposed solution

Since the OpenAPI specification does not explicitly mention event streaming, it’s up to tools, such as Swift OpenAPI Generator, to provide additional conveniences.

This proposed solution consists of two parts:

  1. Add streaming encoders and decoders for the three event stream formats to the runtime library, represented as an AsyncSequence that converts elements between byte chunks and parsed events.

  2. Provide examples for how adopters can then chain those sequences on the HTTPBody values they either produce or consume, in their code. No extra code would be generated.

Generally, the three event stream formats are associated with the following content types:

  • JSON Lines: application/jsonl, application/x-ndjson

  • JSON Sequence: application/json-seq

  • Server-sent Events: text/event-stream

The generated code would continue to only vend the raw sequence of byte chunks (HTTPBody), and adopters could optionally chain the encoding/decoding sequence on it. For example, an OpenAPI document with a JSON Lines stream of Greeting values could contain the following:

paths:
  /greetings:
    get:
      operationId: getGreetingsStream
      responses:
        '200':
          content:
            application/jsonl:
              schema:
                $ref: '#/components/schemas/Greeting'
components:
  schemas:
    Greeting:
      type: object
      properties:
        ...

The important part is the application/jsonl (JSON Lines) content type (not to be confused with a plain application/json content type), and the event schema in #/components/schemas.

Consuming event streams

As a consumer of such a body in Swift (usually on the client), you’d use one of the proposed methods, here asDecodedJSONLines(of:decoder:) to get a stream that parses the individual JSON lines and decodes each JSON object as a value of Components.Schemas.Greeting.

Then, you can read the stream, for example in a for try await loop.

let response = try await client.getGreetingsStream()
let httpBody = try response.ok.body.application_jsonl
let greetingStream = httpBody.asDecodedJSONLines(of: Components.Schemas.Greeting.self)
for try await greeting in greetingStream {
    print("Got greeting: \(greeting.message)")
}

Producing event streams

As a producer of such a body, start with a root async sequence, for example an AsyncStream, and submit events to it.

let (stream, continuation) = AsyncStream<Components.Schemas.Greeting>.makeStream()
// Pass the continuation to another task that calls 
// `continuation.yield(...)` with events, and `continuation.finish()` 
// at the end.

let httpBody = HTTPBody(
    stream.asEncodedJSONLines(),
    length: .unknown,
    iterationBehavior: .single
)
// Provide `httpBody` to the response, for example.
return .ok(.init(body: .application_jsonl(httpBody)))

Detailed design

The rest of this section contains the Swift interface of the new API for the runtime library.

/// A sequence that parses arbitrary byte chunks into lines using the JSON Lines format.
public struct JSONLinesDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
    public init(upstream: Upstream)
}

extension JSONLinesDeserializationSequence : AsyncSequence {
    public typealias Element = ArraySlice<UInt8>
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
        public mutating func next() async throws -> ArraySlice<UInt8>?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

    /// Returns another sequence that decodes each JSON Lines event as the provided type using the provided decoder.
    /// - Parameters:
    ///   - eventType: The type to decode the JSON event into.
    ///   - decoder: The JSON decoder to use.
    /// - Returns: A sequence that provides the decoded JSON events.
    public func asDecodedJSONLines<Event>(of eventType: Event.Type = Event.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<JSONLinesDeserializationSequence<Self>, Event> where Self : Sendable, Event : Decodable
}

/// A sequence that serializes lines by concatenating them using the JSON Lines format.
public struct JSONLinesSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of lines.
    public init(upstream: Upstream)
}

extension JSONLinesSerializationSequence : AsyncSequence {
    public typealias Element = ArraySlice<UInt8>
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
        public mutating func next() async throws -> ArraySlice<UInt8>?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element : Encodable {

    /// Returns another sequence that encodes the events using the provided encoder into JSON Lines.
    /// - Parameter encoder: The JSON encoder to use.
    /// - Returns: A sequence that provides the serialized JSON Lines.
    public func asEncodedJSONLines(encoder: JSONEncoder = {
            let encoder = JSONEncoder()
            encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
            return encoder
        }()) -> JSONLinesSerializationSequence<AsyncThrowingMapSequence<Self, ArraySlice<UInt8>>>
}

/// A sequence that parses arbitrary byte chunks into lines using the JSON Sequence format.
public struct JSONSequenceDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
    public init(upstream: Upstream)
}

extension JSONSequenceDeserializationSequence : AsyncSequence {
    public typealias Element = ArraySlice<UInt8>
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
        public mutating func next() async throws -> ArraySlice<UInt8>?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

    /// Returns another sequence that decodes each JSON Sequence event as the provided type using the provided decoder.
    /// - Parameters:
    ///   - eventType: The type to decode the JSON event into.
    ///   - decoder: The JSON decoder to use.
    /// - Returns: A sequence that provides the decoded JSON events.
    public func asDecodedJSONSequence<Event>(of eventType: Event.Type = Event.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<JSONSequenceDeserializationSequence<Self>, Event> where Self : Sendable, Event : Decodable
}

/// A sequence that serializes lines by concatenating them using the JSON Sequence format.
public struct JSONSequenceSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of lines.
    public init(upstream: Upstream)
}

extension JSONSequenceSerializationSequence : AsyncSequence {
    public typealias Element = ArraySlice<UInt8>
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
        public mutating func next() async throws -> ArraySlice<UInt8>?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element : Encodable {

    /// Returns another sequence that encodes the events using the provided encoder into a JSON Sequence.
    /// - Parameter encoder: The JSON encoder to use.
    /// - Returns: A sequence that provides the serialized JSON Sequence.
    public func asEncodedJSONSequence(encoder: JSONEncoder = {
            let encoder = JSONEncoder()
            encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
            return encoder
        }()) -> JSONSequenceSerializationSequence<AsyncThrowingMapSequence<Self, ArraySlice<UInt8>>>
}

/// An event sent by the server that has a JSON payload in the data field.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
public struct ServerSentEventWithJSONData<JSONDataType> : Sendable, Hashable where JSONDataType : Hashable, JSONDataType : Sendable {

    /// A type of the event, helps inform how to interpret the data.
    public var event: String?

    /// The payload of the event.
    public var data: JSONDataType?

    /// A unique identifier of the event, can be used to resume an interrupted stream by
    /// making a new request with the `Last-Event-ID` header field set to this value.
    ///
    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header
    public var id: String?

    /// The amount of time, in milliseconds, the client should wait before reconnecting in case
    /// of an interruption.
    ///
    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
    public var retry: Int64?

    /// Creates a new event.
    /// - Parameters:
    ///   - event: A type of the event, helps inform how to interpret the data.
    ///   - data: The payload of the event.
    ///   - id: A unique identifier of the event.
    ///   - retry: The amount of time, in milliseconds, to wait before retrying.
    public init(event: String? = nil, data: JSONDataType? = nil, id: String? = nil, retry: Int64? = nil)
}

/// An event sent by the server.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
public struct ServerSentEvent : Sendable, Hashable {

    /// A unique identifier of the event, can be used to resume an interrupted stream by
    /// making a new request with the `Last-Event-ID` header field set to this value.
    ///
    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header
    public var id: String?

    /// A type of the event, helps inform how to interpret the data.
    public var event: String?

    /// The payload of the event.
    public var data: String?

    /// The amount of time, in milliseconds, the client should wait before reconnecting in case
    /// of an interruption.
    ///
    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
    public var retry: Int64?

    /// Creates a new event.
    /// - Parameters:
    ///   - id: A unique identifier of the event.
    ///   - event: A type of the event, helps inform how to interpret the data.
    ///   - data: The payload of the event.
    ///   - retry: The amount of time, in milliseconds, to wait before retrying.
    public init(id: String? = nil, event: String? = nil, data: String? = nil, retry: Int64? = nil)
}

/// A sequence that parses arbitrary byte chunks into events using the Server-sent Events format.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
public struct ServerSentEventsDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
    public init(upstream: Upstream)
}

extension ServerSentEventsDeserializationSequence : AsyncSequence {
    public typealias Element = ServerSentEvent
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
        public mutating func next() async throws -> ServerSentEvent?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

    /// Returns another sequence that decodes each event's data as the provided type using the provided decoder.
    ///
    /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`.
    /// - Returns: A sequence that provides the events.
    public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence<ServerSentEventsLineDeserializationSequence<Self>>

    /// Returns another sequence that decodes each event's data as the provided type using the provided decoder.
    ///
    /// Use this method if the event's `data` field is JSON.
    /// - Parameters:
    ///   - dataType: The type to decode the JSON data into.
    ///   - decoder: The JSON decoder to use.
    /// - Returns: A sequence that provides the events with the decoded JSON data.
    public func asDecodedServerSentEventsWithJSONData<JSONDataType>(of dataType: JSONDataType.Type = JSONDataType.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<ServerSentEventsDeserializationSequence<ServerSentEventsLineDeserializationSequence<Self>>, ServerSentEventWithJSONData<JSONDataType>> where JSONDataType : Decodable
}

/// A sequence that serializes Server-sent Events.
public struct ServerSentEventsSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ServerSentEvent {

    /// Creates a new sequence.
    /// - Parameter upstream: The upstream sequence of events.
    public init(upstream: Upstream)
}

extension ServerSentEventsSerializationSequence : AsyncSequence where Upstream.Element == ServerSentEvent {
    public typealias Element = ArraySlice<UInt8>
    public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, Upstream.Element == ServerSentEvent, UpstreamIterator.Element == ServerSentEvent {
        public mutating func next() async throws -> ArraySlice<UInt8>?
    }
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence {

    /// Returns another sequence that encodes Server-sent Events with generic data in the data field.
    /// - Returns: A sequence that provides the serialized Server-sent Events.
    public func asEncodedServerSentEvents() -> ServerSentEventsSerializationSequence<Self> where Self : Sendable, Self.Element == ServerSentEvent

    /// Returns another sequence that encodes Server-sent Events that have a JSON value in the data field.
    /// - Parameter encoder: The JSON encoder to use.
    /// - Returns: A sequence that provides the serialized Server-sent Events.
    public func asEncodedServerSentEventsWithJSONData<JSONDataType>(encoder: JSONEncoder = {
            let encoder = JSONEncoder()
            encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
            return encoder
        }()) -> ServerSentEventsSerializationSequence<AsyncThrowingMapSequence<Self, ServerSentEvent>> where JSONDataType : Encodable, Self.Element == ServerSentEventWithJSONData<JSONDataType>
}

API stability

Additive changes to the runtime library, no API changes to the generator or other components.

Future directions

We could add additional event stream formats, if they become popular and well-defined in the industry.

Alternatives considered

  • Not doing anything - this would require adopters to write these encoders and decoders by hand, which is time-consuming, error prone, and duplicates effort across the ecosystem.

  • Generating special types for these streams - this was rejected because it would force the adopter to parse the event stream, even if they instead wanted to forward it as raw data elsewhere. Since these event streams formats are not part of OpenAPI, it felt like a too strong of a limitation, which is why these conveniences are opt-in.