Interesting patterns to consume ZIO Streams

Bilal Fazlani
5 min readJun 30, 2023

In this post, we will see some of ZStream consumptions patterns by composing multiple ZSinks together in different ways

run

In order to run a stream, you need a sink. The sink may consume one or more or maybe all the values of the stream.

val stream: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

val sink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft(0)(_ + _)

val result: ZIO[Any, Nothing, Int] = stream run sink

New to ZIO-Streams? Check out ZIO-Streams 101 and Introduction to ZIO Streams

zipPar: consuming a stream in parallel

When you want to consume one stream in parallel, you can use zipPar. It will consume "ALL" elements from the stream, apply two sinks to it, and then zip the result in a tuple. For example, consuming an http request in a stream format. You can apply two sinks to it, one to process the request and another to log the request.

val requestStream: ZStream[Any, Nothing, Byte] = ???

val requestProcessorSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???

val requestLoggerSink: ZSink[Any, Exception, Byte, Nothing, Unit] = ???

val result: ZIO[Any, Exception, Unit] =
requestStream.run(requestProcessorSink zipPar requestLoggerSink)

zip: consuming a stream sequentially

Sink.zip is useful when you want one sink to consume "some elements" of a stream by one sink and rest by another sink. The first Sink decides how many elements it wants to consume. The rest of the elements are consumed by the second sink. This effect can be chained with multiple sinks.

For example, you want to consume a stream of bytes and want to parse the first few bytes as a header and the rest as a body. You can use Sink.zip to do that.

val inputStream: ZStream[Any, Nothing, String] = ???

case class Header(str: String)

case class Body(str: String)

case class Request(header: Header, body: Body)

val requestHeaderSink =
ZSink
.head[String]
.map(line => Header(line.get))

val requestBodySink: ZSink[Any, Nothing, String, Nothing, Body] =
ZSink.collectAll[String].map(chunk => Body(chunk.mkString))

val run: ZIO[Any, Nothing, Request] = inputStream
.run(requestHeaderSink zip requestBodySink)
.map(Request.apply)

peel & flatMap: consuming a stream sequentially using previous result

ZStream.peel is a lower level approach to the do the same thing as Sink.zip.

But peel offers more flexibility. With peel, you get access to Result1. It can be useful if Sink2 depends on Result1. The second sink gets access to the result of the first sink and remaining stream. For example, consider you want to parse the header and then based on the header, you want to parse the body in a streaming fashion

val inputStream: ZStream[Any, Nothing, String] = ???

enum MessageType:
case Json, Xml

enum MessageBody:
case JsonBody(str: String)
case XmlBody(str: String)

val headerParserSink =
ZSink
.head[String]
.mapZIO {
case Some("json") => ZIO.succeed(MessageType.Json)
case Some("xml") => ZIO.succeed(MessageType.Xml)
case _ => ZIO.fail(new Exception("Invalid header"))
}

def bodyParserSink(messageType: MessageType) =
ZSink
.collectAll[String]
.map(chunk =>
messageType match
case MessageType.Json => MessageBody.JsonBody(chunk.mkString)
case MessageType.Xml => MessageBody.XmlBody(chunk.mkString)
)

val runPeel: ZIO[Scope, Exception, MessageBody] =
for {
peeled <- inputStream.peel(headerParserSink)
(msgType: MessageType, bodyStream: ZStream[Any, Nothing, String]) = peeled
body: MessageBody <- bodyStream run bodyParserSink(msgType)
} yield body

Similar outcome can be achieved using flatMap as well. flatMap is a higher level combinator. It can be used to chain sinks together while giving input of previous sink to the next sink. This is more convenient than peel as you don't have to manually feed the result of the first sink to the second sink.

val runFlatMap: ZIO[Any, Exception, MessageBody] =
val combinedSink = headerParserSink flatMap bodyParserSink
inputStream run combinedSink

race: racing multiple streams in parallel

race can be used to run two sinks in parallel and return the result of the sink that finishes first

The slower stream is cancelled

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

case class Sum(value: Int)
case class Product(value: Int)

val addition =
ZSink.foldLeft[Int, Sum](Sum(0))((acc, value) => Sum(acc.value + value))

val multiplication =
ZSink.foldLeft[Int, Product](Product(1))((acc, value) =>
Product(acc.value * value)
)

val result: UIO[Sum | Product] =
inputStream run (addition race multiplication)

val run = result.flatMap {
case Sum(value) => Console.printLine(s"Sum: $value")
case Product(value) => Console.printLine(s"Product: $value")
}

orElse: chain sinks together as fallback strategy

orElse can be used to chain sinks together as a fallback. If the first sink fails, the second sink is run. If the second sink fails, the third sink is run and so on. The first sink that succeeds is returned. If all sinks fail, the error of the last sink is returned.

case class Error(message: String)

val inputStream: ZStream[Any, Nothing, Byte] = ???

val sink1: ZSink[Any, Error, Byte, Nothing, String] = ???

val sink2: ZSink[Any, Error, Byte, Nothing, Int] = ???

val sink3: ZSink[Any, Error, Byte, Nothing, Boolean] = ???

val run: IO[Error, String | Int | Boolean] =
inputStream run (sink1 orElse sink2 orElse sink3)

splitWhere & foreachWhile: consume partial data

splitWhere can be used to create a new sink from an existing sink such that the new sink consumes only a few initial elements of the stream and the rest of the stream is returned as a leftover stream.

foreachWhile also supports same use case without first creating sink. Lets see examples of both.

Using splitwhere:

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

val printSink: ZSink[Any, IOException, Int, Nothing, Unit] =
ZSink.foreach[Any, IOException, Int](Console.printLine(_))

val partialPrintSink: ZSink[Any, IOException, Int, Int, Unit] =
printSink.splitWhere(_ == 5)

val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
ZSink.foreach[Any, IOException, Int](value => Console.printLine(s"Value: $value")
)

val run = inputStream run (partialPrintSink zip decoratedPrintSink)

using foreachWhile:

val inputStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

val partialPrintSink = ZSink.foreachWhile((int: Int) =>
if int < 5 then Console.printLine(int) as true
else ZIO.succeed(false)
)

val decoratedPrintSink: ZSink[Any, IOException, Int, Nothing, Unit] =
ZSink.foreach[Any, IOException, Int](value =>
Console.printLine(s"Value: $value")
)

val run = inputStream run (partialPrintSink zip decoratedPrintSink)

Both above programs produce the same output

1
2
3
4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10

--

--

Bilal Fazlani

Scala | ZIO | Akka | Functional Programming | Metaprogramming | Distributed Systems