ZIO-Streams 101

Bilal Fazlani
Jun 30, 2023

ZIO-Streams is a library for purely functional, asynchronous, concurrent stream processing in Scala. It is built on top of ZIO. It also interoperates with the Reactive Streams specification through the separate zio-interop-reactivestreams module, so ZIO streams can be connected to other libraries that implement the same specification.

This is an introductory post that covers the basics of ZIO-Streams.

Introduction

A stream represents the emission of data over time. It can be finite or infinite.

Let’s take a look at a simple example of a stream that emits integers from 1 to 10.

import zio.stream._
val coldStream: ZStream[Any, Nothing, Int] = ZStream.range(1, 11) // the upper bound is exclusive

Its type is similar to the ZIO data type: ZStream[-R, +E, +A]. R is the environment the stream needs in order to run, E is the type of error it can fail with, and A is the type of elements it emits.
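The following minimal sketch shows all three type parameters at work. It assumes ZIO 2.x with zio-streams on the classpath. AppConfig is a hypothetical service made up only for this example. The rest are standard ZStream constructors.

```scala
import zio._
import zio.stream._

// Hypothetical service, used only to show the R (environment) parameter
final case class AppConfig(repeat: Int)

object TypeParams {
  // R = Any (needs nothing), E = Nothing (cannot fail), A = Int
  val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // E = String: emits 1 and 2, then fails with "boom"
  val failing: ZStream[Any, String, Int] =
    ZStream(1, 2) ++ ZStream.fail("boom")

  // R = AppConfig: cannot run until an AppConfig is provided
  val configured: ZStream[AppConfig, Nothing, String] =
    ZStream.fromZIO(ZIO.service[AppConfig]).flatMap { cfg =>
      ZStream.fromIterable(List.fill(cfg.repeat)("hi"))
    }
}
```

Running `configured` requires providing its environment first, for example `TypeParams.configured.runCollect.provideLayer(ZLayer.succeed(AppConfig(2)))`.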

The stream above is what we call a cold stream, because it is finite and all of its data is already available. A hot stream, on the other hand, emits an unknown and potentially infinite amount of data, so all of the data can never be available at once. Here's an example:

import java.io.IOException
import zio.stream._
val hotStream: ZStream[Any, IOException, Byte] =
  ZStream.fromInputStream(java.lang.System.in)

This is a byte stream that reads from System.in. The user can keep on typing and the stream will keep on emitting bytes.
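A source like System.in may never end, so a consumer usually needs its own stopping condition. The following minimal sketch assumes ZIO 2.x. It decodes the bytes into lines and stops at the first line equal to "quit". The "quit" convention is made up for this example. Taking the InputStream as a parameter lets the app pass System.in while a test passes an in-memory stream.

```scala
import java.io.{IOException, InputStream}
import zio._
import zio.stream._

object HotInput {
  // Lines read from `in`, stopping before the first line equal to "quit".
  // takeWhile turns a potentially endless source into a finite stream.
  def linesUntilQuit(in: InputStream): ZStream[Any, IOException, String] =
    ZStream
      .fromInputStream(in)
      .via(ZPipeline.utfDecode)
      .via(ZPipeline.splitLines)
      .takeWhile(_ != "quit")
}

object HotInputApp extends ZIOAppDefault {
  // Echo each line typed on the console until the user types "quit"
  def run =
    HotInput
      .linesUntilQuit(java.lang.System.in)
      .foreach(line => Console.printLine(s"you typed: $line"))
}
```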

It is generally easier to work with cold streams, because you can process or consume the data at your own pace. Reading a file from disk is an example of a cold stream. Hot streams are what you get when you are plugged into a live data source, such as incoming HTTP calls. You can't control the speed at which data is emitted. If the consumer is slower than the producer, you can either buffer some of the data or drop some of it.
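ZStream provides operators for both choices. The following minimal sketch assumes ZIO 2.x. fastProducer is a stand-in for a live source: here it is just a finite range, so the example terminates. The capacity of 16 is arbitrary.

```scala
import zio._
import zio.stream._

object Overflow {
  // Stand-in for a live source; a real one might be a socket or a message queue
  val fastProducer: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 1000)

  // Buffer: hold up to 16 elements; when full, the producer waits (back-pressure)
  val buffered: ZStream[Any, Nothing, Int] = fastProducer.buffer(16)

  // Drop newest: when the buffer is full, newly arriving elements are discarded
  val dropNewest: ZStream[Any, Nothing, Int] = fastProducer.bufferDropping(16)

  // Drop oldest: when the buffer is full, the oldest buffered elements are discarded
  val dropOldest: ZStream[Any, Nothing, Int] = fastProducer.bufferSliding(16)
}
```

Plain `buffer` loses nothing, but it only helps if the producer can actually be slowed down. For a source you cannot slow down, `bufferDropping` or `bufferSliding` keeps memory bounded at the cost of losing some elements.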

Components of a ZIO stream

There are three fundamental components of a stream: ZStream, ZPipeline and ZSink.

Visually, data flows from left to right: ZStream → ZPipeline(s) → ZSink.

ZStream

Think of ZStream as the source of data. It is where data is read from before being passed on to the next stage in the flow. The source can be a file, a database, an HTTP call, etc. It can also be another stream.
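Here are a few of the built-in ways to create a source, as a minimal sketch assuming ZIO 2.x. Files and HTTP calls need real resources, so this sketch sticks to in-memory values and effects.

```scala
import zio._
import zio.stream._

object Sources {
  // From values that are already in memory
  val fromValues: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // From any Iterable
  val fromList: ZStream[Any, Nothing, String] =
    ZStream.fromIterable(List("a", "b", "c"))

  // From a single effect; the stream emits the effect's result once
  val fromEffect: ZStream[Any, Nothing, Int] =
    ZStream.fromZIO(ZIO.succeed(42))

  // An infinite stream (1, 2, 3, ...), cut down to its first five elements
  val naturals: ZStream[Any, Nothing, Int] =
    ZStream.iterate(1)(_ + 1).take(5)

  // A source built from other streams: all of fromValues, then fromEffect
  val combined: ZStream[Any, Nothing, Int] = fromValues ++ fromEffect
}
```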

ZPipeline

ZPipeline is the middle layer where you process your data. Processing may include filtering, transforming, etc. It can be pure or effectful (like making an HTTP call).
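The following minimal sketch shows both kinds of pipeline and assumes ZIO 2.x. lookupLength is a hypothetical effect standing in for something like an HTTP call; it just returns the string's length.

```scala
import zio._
import zio.stream._

object Pipelines {
  // Pure pipelines: keep non-empty strings, then upper-case them
  val nonEmpty: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.filter[String](_.nonEmpty)
  val upper: ZPipeline[Any, Nothing, String, String] =
    ZPipeline.map[String, String](_.toUpperCase)

  // Hypothetical effect standing in for a remote call
  def lookupLength(s: String): ZIO[Any, Nothing, Int] =
    ZIO.succeed(s.length)

  // Effectful pipeline: runs the effect once per element, preserving order
  val lengths: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.mapZIO((s: String) => lookupLength(s))

  // Pipelines compose into a single pipeline
  val all: ZPipeline[Any, Nothing, String, Int] = nonEmpty >>> upper >>> lengths
}
```

Applied with `ZStream("ab", "", "cde").via(Pipelines.all)`, the empty string is filtered out and the remaining elements become their lengths.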

ZSink

ZSink is the final stage where you consume the data. Consuming can mean writing to a file, writing to a database, or simply aggregating the data in memory.
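The following minimal sketch covers in-memory sinks and assumes ZIO 2.x. Writing to a file or a database follows the same shape but needs real resources, so it is left out. The sink type is ZSink[R, E, In, L, Z]: In is the input element type, L is the type of leftover elements and Z is the result.

```scala
import zio._
import zio.stream._

object Sinks {
  val source: ZStream[Any, Nothing, Int] = ZStream(3, 1, 2)

  // Collect every element into a Chunk
  val collect: ZSink[Any, Nothing, Int, Nothing, Chunk[Int]] = ZSink.collectAll[Int]

  // Count elements; accepts any element type
  val count: ZSink[Any, Nothing, Any, Nothing, Long] = ZSink.count

  // Fold all elements into a single value, here the maximum
  val max: ZSink[Any, Nothing, Int, Nothing, Int] =
    ZSink.foldLeft[Int, Int](Int.MinValue)(_ max _)
}
```

Each sink is run against a stream with `run`, for example `Sinks.source.run(Sinks.max)`.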

Examples

Let's create a stream that reads from a file, filters out all the lines that start with #, and then counts the words in all the remaining lines.

import zio.stream._

//source: read data
val byteStream: ZStream[Any, Throwable, Byte] =
  ZStream.fromFileName("lines.txt")

//transformed stream
val stringStream: ZStream[Any, Throwable, String] =
  byteStream >>> ZPipeline.utfDecode >>> ZPipeline.splitLines

Here, we created a byte stream from a file source and then attached two pipelines to it: utfDecode turns the bytes into text, and splitLines splits that text into lines. Notice that attaching the pipelines changed the stream's type from ZStream[Any, Throwable, Byte] to ZStream[Any, Throwable, String]. It is still a ZStream; only the type of the elements it emits has changed.

ZStream + [ZPipelines] = ZStream
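Attaching a pipeline with `>>>` or with `.via` gives the same result. The following minimal sketch assumes ZIO 2.x and uses a few in-memory bytes in place of lines.txt, so it runs without a file.

```scala
import java.nio.charset.StandardCharsets
import zio._
import zio.stream._

object StreamPlusPipeline {
  // In-memory bytes stand in for lines.txt
  val bytes: ZStream[Any, Nothing, Byte] =
    ZStream.fromChunk(Chunk.fromArray("a b\n# note\nc".getBytes(StandardCharsets.UTF_8)))

  // Both forms attach the same two pipelines; each result is still a ZStream
  val viaOperator: ZStream[Any, Throwable, String] =
    bytes >>> ZPipeline.utfDecode >>> ZPipeline.splitLines
  val viaMethod: ZStream[Any, Throwable, String] =
    bytes.via(ZPipeline.utfDecode).via(ZPipeline.splitLines)
}
```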

Now let's write the remaining pipelines to further filter and transform the data:

// pipeline: filter
val filterLines: ZPipeline[Any, Nothing, String, String] =
  ZPipeline.filter(!_.startsWith("#"))

// pipeline: transform
val countLineWords: ZPipeline[Any, Nothing, String, Int] =
  ZPipeline.map(_.split(" ").length)

filterLines is a pipeline that takes a stream of Strings and returns a stream of Strings, possibly with fewer elements. countLineWords takes a stream of Strings and returns a stream of Ints, where each Int is the number of words in a line.
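Splitting on a single space has one caveat: blank lines and repeated spaces produce empty tokens, so "".split(" ") has length 1. If your input may contain those, one possible variant splits on runs of whitespace and ignores empty tokens. This is a sketch assuming ZIO 2.x, not a change to the example above.

```scala
import zio._
import zio.stream._

object RobustCount {
  // " a  b ".split(" ") gives Array("", "a", "", "b"), and "".split(" ") gives Array(""),
  // so a split(" ")-based count over-counts blank lines and repeated spaces.
  // Splitting on runs of whitespace and counting only non-empty tokens avoids both.
  val countLineWords: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](_.split("\\s+").count(_.nonEmpty))
}
```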

We can combine these two pipelines into one using the >>> operator.

val filterAndCount: ZPipeline[Any, Nothing, String, Int] =
  filterLines >>> countLineWords

Let’s create the final sink that will consume the data.

//sink: aggregate
val sink: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft(0)(_ + _)

ZIO-Streams components compose well. We can combine the filterAndCount pipeline with the sink to get a new, combined sink.

val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] =
  filterAndCount >>> sink

[ZPipelines] + ZSink = ZSink

Notice that, unlike the previous sink, this combined sink consumes Strings rather than Ints.

Now we can run the stream with the sink and get the result. To run a stream, we need two things: a ZStream and a ZSink. ZPipelines are optional.

//run: describe running the stream into the sink (a ZIO runtime executes it later)
val result: ZIO[Any, Throwable, Int] =
  stringStream >>> wordCountSink

ZStream + ZSink = ZIO
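The resulting ZIO is only a description of the work. Nothing is read until a ZIO runtime executes it. The following minimal sketch shows how to run it and assumes ZIO 2.x. An in-memory string stands in for lines.txt, and the components from this post are redeclared so the block compiles on its own.

```scala
import java.nio.charset.StandardCharsets
import zio._
import zio.stream._

object WordCountApp extends ZIOAppDefault {
  // In-memory text stands in for lines.txt so the sketch runs anywhere
  val text = "hello world\n# a comment\nzio streams are fun"

  val stringStream: ZStream[Any, Throwable, String] =
    ZStream.fromChunk(Chunk.fromArray(text.getBytes(StandardCharsets.UTF_8))) >>>
      ZPipeline.utfDecode >>> ZPipeline.splitLines

  // filterLines >>> countLineWords >>> sink, as in the post
  val wordCountSink: ZSink[Any, Nothing, String, Nothing, Int] =
    ZPipeline.filter[String](!_.startsWith("#")) >>>
      ZPipeline.map[String, Int](_.split(" ").length) >>>
      ZSink.foldLeft[Int, Int](0)(_ + _)

  // Building `result` reads nothing; it only describes the work
  val result: ZIO[Any, Throwable, Int] = stringStream >>> wordCountSink

  // The runtime executes the description when the app starts
  def run = result.flatMap(total => Console.printLine(s"total words: $total"))
}
```

Here the comment line is dropped, and the two remaining lines contribute 2 + 4 words.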

You can also choose to combine all the individual components at the end and run the stream.

val result: ZIO[Any, Throwable, Int] =
  stringStream >>> filterLines >>> countLineWords >>> sink

This works as long as the output type of each component matches the input type of the next.

I wrote the code above verbosely to demonstrate the individual components, but you can also write it much more concisely:

val result: ZIO[Any, Throwable, Int] =
  ZStream.fromFileName("lines.txt")
    .via(ZPipeline.utfDecode)
    .via(ZPipeline.splitLines)
    .filter(!_.startsWith("#"))
    .map(_.split(" ").length)
    .runSum

Concluding thoughts

ZIO-Streams is a very powerful library and it has much more to offer than what I have covered here. Please refer to the official documentation for more details.

If you have any questions or feedback, please feel free to reach out to me on Twitter.
