NIOSingleStepByteToMessageProcessor
`NIOSingleStepByteToMessageProcessor` uses a `NIOSingleStepByteToMessageDecoder` to produce messages from a stream of incoming bytes. It works like `ByteToMessageHandler` but may be used outside of the channel pipeline. This allows processing of wrapped protocols in a general way.
final class NIOSingleStepByteToMessageProcessor<Decoder> where Decoder : NIOSingleStepByteToMessageDecoder
A `NIOSingleStepByteToMessageProcessor` is first initialized with a `NIOSingleStepByteToMessageDecoder`. Then call `process` as each `ByteBuffer` is received from the stream. The closure is called repeatedly with each message produced by the decoder.
When your stream ends, call `finishProcessing` to ensure all buffered data is passed to your decoder. This will call `decodeLast` one or more times with any remaining data.
Example
Below is an example of a protocol decoded by `TwoByteStringCodec` that is sent over HTTP. `RawBodyMessageHandler` forwards the headers and trailers directly and uses `NIOSingleStepByteToMessageProcessor` to send whole decoded messages.
/// Example decoder that chops the incoming byte stream into two-byte strings.
class TwoByteStringCodec: NIOSingleStepByteToMessageDecoder {
    typealias InboundOut = String

    /// Attempts to read the next two readable bytes of `buffer` as a string.
    ///
    /// - Parameter buffer: The accumulated bytes; consumed bytes are removed from it.
    /// - Returns: The decoded string, or `nil` when `readString(length: 2)` yields nothing
    ///   (presumably because fewer than two bytes are readable).
    public func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
        guard let chunk = buffer.readString(length: 2) else {
            return nil
        }
        return chunk
    }

    /// Handles the end of the stream exactly like a regular decode step.
    ///
    /// - Parameters:
    ///   - buffer: The remaining buffered bytes.
    ///   - seenEOF: Ignored by this codec.
    /// - Returns: Whatever `decode(buffer:)` returns for the remaining bytes.
    public func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
        let finalChunk = try self.decode(buffer: &buffer)
        return finalChunk
    }
}
/// Inbound handler that forwards HTTP heads and trailers untouched and re-frames the
/// request body into `String` messages using a `NIOSingleStepByteToMessageProcessor`.
class RawBodyMessageHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart // alias for HTTPPart<HTTPRequestHead, ByteBuffer>
    // This converts the body from ByteBuffer to String, our message type
    typealias InboundOut = HTTPPart<HTTPRequestHead, String>

    /// Processor for the request currently being received. It is created lazily on the
    /// first body part and dropped again at `.end`, so each request gets its own processor.
    private var messageProcessor: NIOSingleStepByteToMessageProcessor<TwoByteStringCodec>? = nil

    /// Dispatches one HTTP request part.
    ///
    /// - `.head` is forwarded as-is.
    /// - `.body` bytes are fed to the processor; every decoded message is forwarded.
    /// - `.end` flushes the processor, forwards any final messages, then forwards the trailers.
    ///
    /// Errors thrown by the processor are reported via `fireErrorCaught`.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let req = Self.unwrapInboundIn(data)
        do {
            switch req {
            case .head(let head):
                // simply forward on the head
                context.fireChannelRead(Self.wrapInboundOut(.head(head)))
            case .body(let body):
                // Bind the processor to a local instead of force-unwrapping the property.
                let processor: NIOSingleStepByteToMessageProcessor<TwoByteStringCodec>
                if let existing = self.messageProcessor {
                    processor = existing
                } else {
                    processor = NIOSingleStepByteToMessageProcessor(TwoByteStringCodec())
                    self.messageProcessor = processor
                }
                try processor.process(buffer: body) { message in
                    self.channelReadMessage(context: context, message: message)
                }
            case .end(let trailers):
                // Detach the processor before flushing it. A later request on the same
                // connection must start with a fresh processor, otherwise undecoded
                // leftover bytes of this body would leak into the next one. Detaching
                // first guarantees the reset even when `finishProcessing` throws.
                let processor = self.messageProcessor
                self.messageProcessor = nil
                // Forward on any remaining messages and the trailers
                try processor?.finishProcessing(seenEOF: false) { message in
                    self.channelReadMessage(context: context, message: message)
                }
                context.fireChannelRead(Self.wrapInboundOut(.end(trailers)))
            }
        } catch {
            context.fireErrorCaught(error)
        }
    }

    // Forward on the body messages as whole messages
    func channelReadMessage(context: ChannelHandlerContext, message: String) {
        context.fireChannelRead(Self.wrapInboundOut(.body(message)))
    }
}
/// Terminal handler of the example: collects the decoded `String` body messages of a
/// request and, once the request ends, replies with one message per line.
private class DecodedBodyHTTPHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPPart<HTTPRequestHead, String>
    typealias OutboundOut = HTTPServerResponsePart

    /// Body messages received for the request currently in flight.
    var msgs: [String] = []

    /// Handles one decoded request part.
    ///
    /// - `.head` and the trailers of `.end` are printed.
    /// - `.body` messages are buffered in `msgs`.
    /// - `.end` writes a `200 OK` response whose body is every buffered message
    ///   followed by a newline, then resets `msgs`.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let message = Self.unwrapInboundIn(data)
        switch message {
        case .head(let head):
            print("head: \(head)")
        case .body(let msg):
            self.msgs.append(msg)
        case .end(let trailers):
            print("trailers: \(trailers)")
            var responseBuffer = context.channel.allocator.buffer(capacity: 32)
            for msg in self.msgs {
                responseBuffer.writeString(msg)
                responseBuffer.writeStaticString("\n")
            }
            // The response body is complete; forget this request's messages so a later
            // request on the same connection does not echo them again.
            self.msgs.removeAll()

            var headers = HTTPHeaders()
            headers.add(name: "content-length", value: String(responseBuffer.readableBytes))

            context.write(Self.wrapOutboundOut(HTTPServerResponsePart.head(
                HTTPResponseHead(version: .http1_1,
                                 status: .ok, headers: headers))), promise: nil)
            context.write(Self.wrapOutboundOut(HTTPServerResponsePart.body(
                .byteBuffer(responseBuffer))), promise: nil)
            context.writeAndFlush(Self.wrapOutboundOut(HTTPServerResponsePart.end(nil)), promise: nil)
        }
    }
}
// One event loop thread is used for this example server.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

// Every accepted child channel gets the HTTP server handlers first, followed by the
// two example handlers: raw body re-framing, then the responder.
let bootstrap = ServerBootstrap(group: group)
    .childChannelInitializer { channel in
        let httpPipelineReady = channel.pipeline.configureHTTPServerPipeline(
            withPipeliningAssistance: true,
            withErrorHandling: true
        )
        return httpPipelineReady.flatMap { _ in
            channel.pipeline.addHandlers([RawBodyMessageHandler(), DecodedBodyHTTPHandler()])
        }
    }

// Start binding on the loopback address, port 0; the result is delivered through the future.
let channelFuture = bootstrap.bind(host: "127.0.0.1", port: 0)