Dart streams. An introduction with examples.

picture by https://pixabay.com/users/cowins-822708/

What is a Stream?

From Wikipedia:

A stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.

In Dart, a Stream is a series of asynchronous events that other entities can subscribe to and “listen” to.

We’re going to be building some small stream examples that will help visualize how they work. Also, I’m going to be using the timer stream example from the Dart stream tutorial just because I’m lazy and I think it’s the fastest way to go over the concepts, so don’t judge me :)

Build a stream (the manual way)

There are many ways to build a Stream. In this article, we’re going to look at the 2 main ones: “manual” creation and using something called a StreamController. For now, let’s look at how we can create Streams “manually”.

Let’s build a Stream that emits numbers as events with a 1-second delay:

Stream<int> numberStream(int countTo) async* {
  for (int i = 1; i <= countTo; i++) {
    // Wait a second, then emit the next number as an event.
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}

The important things here are the async* and the yield keywords. The async* keyword tells the compiler that we’re building an asynchronous generator function (a function that returns a Stream and emits events on it). The yield keyword is what produces the event itself. In our case, it’s a number.

Now, let’s process the events that we’re getting.

Listening to your stream’s events

image by https://pixabay.com/users/couleur-1195798/

You can start “listening” to the events emitted by the stream by calling listen() on the stream. Let’s listen to our numberStream:

numberStream(3).listen((event) => print("received $event"));

and you should get the following output:

received 1
received 2
received 3

An important thing to remember here is that the stream will not start emitting events until someone starts listening to it. Let’s verify this by adding a couple of print statements to our code:

Stream<int> numberStream(int countTo) async* {
  for (int i = 1; i <= countTo; i++) {
    await Future.delayed(Duration(seconds: 1));
    print("yielding $i");
    yield i;
  }
}

Future<void> main() async {
  var stream = numberStream(3);
  print("stream created. Just sitting here doing nothing for a second");
  // Without await, this delay would not hold anything up.
  await Future.delayed(Duration(seconds: 1));
  stream.listen((event) => print("received $event"));
}

After running it, we see the following:

stream created. Just sitting here doing nothing for a second
yielding 1
received 1
yielding 2
received 2
yielding 3
received 3

As you can see, even though the stream was created — our generator function does not start executing until we call listen() on the stream. The stream will just sit there idling until that time.

You can process the incoming events from your Stream in 2 ways: using the listen() method and using the await for loop. You’re familiar with what listen() does, and await for does exactly the same thing, i.e. it starts the execution of the generator function. Keep in mind that await for only works inside a function marked as async, so make sure main() is declared that way. Go ahead and substitute the following line in the code above:

stream.listen((event) => print("received $event"));

with this:

await for (var event in stream) {
  print("received $event");
}

Run the code and you’ll see the same output. However, there’s a difference in terms of timing of code execution between these two approaches.

Calling listen() registers the enclosed code as a callback that the event loop invokes each time an event is received, and listen() itself returns right away. The await for loop, on the other hand, suspends the enclosing function until the stream is done (duhhh..). So, it’s pretty similar to the difference between then() and await on a Future. In simple words: listen() allows the code further down to be executed straight away, while await for doesn’t. Let’s see how this works with some print statements added to our code.

In case we use listen():

print("start");
numberStream(3).listen((event) => print("received $event"));
print("end");

we get the following output:

start
end
received 1
received 2
received 3

and in case we use await for:

print("start");
await for (var event in numberStream(3)) {
  print("received $event");
}
print("end");

we get the following output:

start
received 1
received 2
received 3
end

Where should we use listen() and await for? Both deliver events in the order in which the stream emits them. If each event has to be fully processed before the next one is handled, and the code after the loop should wait for the stream to finish (I/O operations like downloading a file, for example), use await for. If the events can be handled independently and the rest of your code shouldn’t wait for them (like click events, etc.), use listen().
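
To make the difference a bit more tangible, here is a minimal sketch with a deliberately slow handler. The slowHandler(), withAwaitFor() and withListen() names are made up for this illustration, and the delays are shortened so the demo runs quickly. With await for, the next event is not handled until the loop body has finished; with listen(), the callback fires for every event and nobody waits for the handler.

```dart
import 'dart:async';

// Same generator as in the article, with a shorter delay to keep the demo quick.
Stream<int> numberStream(int countTo) async* {
  for (var i = 1; i <= countTo; i++) {
    await Future.delayed(const Duration(milliseconds: 50));
    yield i;
  }
}

// Hypothetical slow handler (think "write this chunk to disk").
Future<void> slowHandler(int event, List<String> log) async {
  log.add('start $event');
  await Future.delayed(const Duration(milliseconds: 300));
  log.add('end $event');
}

// await for: the loop body must finish before the next event is handled.
Future<List<String>> withAwaitFor() async {
  final log = <String>[];
  await for (final event in numberStream(3)) {
    await slowHandler(event, log);
  }
  return log;
}

// listen(): the callback is invoked per event; the handler is not awaited.
Future<List<String>> withListen() async {
  final log = <String>[];
  final handlers = <Future<void>>[];
  final done = Completer<void>();
  numberStream(3).listen(
    (event) => handlers.add(slowHandler(event, log)),
    onDone: () => done.complete(),
  );
  await done.future; // all events have arrived
  await Future.wait(handlers); // all handlers have finished
  return log;
}

Future<void> main() async {
  print(await withAwaitFor());
  // [start 1, end 1, start 2, end 2, start 3, end 3]
  print(await withListen());
  // [start 1, start 2, start 3, end 1, end 2, end 3]
}
```

In the listen() version the handlers overlap, which is fine for independent events but not for work that has to happen strictly one step after another.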

StreamController — the preferred way to interact with streams

Image by https://pixabay.com/users/niekverlaan-80788/

As mentioned before — there’s another way to create streams. Not only create, but also pause, resume, and stop them completely. There’s a class called StreamController.

Before we get into the details of the StreamController we need to cover one more thing — the StreamSubscription. Think of the subscription as a remote control for your Stream. Let’s see it in action — go ahead and run this code:

final subscription = numberStream(8)
.listen((event) => print("received $event"));

await Future.delayed(Duration(seconds: 3));

print("pausing the subscription and idling for 2 seconds");
subscription.pause();

await Future.delayed(Duration(seconds: 2));

print("resuming...");
subscription.resume();
await Future.delayed(Duration(seconds: 2));

print("stopping the stream");
subscription.cancel();

and the output you should get is:

received 1
received 2
pausing the subscription and idling for 2 seconds
resuming...
received 3
received 4
stopping the stream

Now let’s create our usual numberStream() function using the StreamController:

import 'dart:async';

Stream<int> numberStreamWithController(int countTo) {
  final controller = StreamController<int>();
  Timer? timer;
  var counter = 0;

  void count(_) {
    controller.add(counter);
    if (counter == countTo) {
      // Last number sent: stop the timer and close the stream.
      timer?.cancel();
      controller.close();
    }
    counter++;
  }

  timer = Timer.periodic(Duration(milliseconds: 1000), count);
  return controller.stream;
}

Now, this might seem like something completely different, but this function does the same job as numberStream(), except that it starts counting from 0 instead of 1. The main difference is that we delegate stream handling to the StreamController now.

To clarify the code above a bit more:

  1. We emit the events via controller.add()
  2. We keep track of the time and quit the counting process via Timer
  3. We expose the stream from the controller via controller.stream
  4. We can then listen to this stream just like any other stream via .listen().

The StreamController constructor accepts 4 optional callbacks that give you better control over your stream. They are invoked when a listener subscribes or uses the “remote control” of your stream that was mentioned before, the StreamSubscription class.

onListen -> invoked when someone calls .listen() on your stream

onPause -> when subscription.pause() is called

onResume -> when subscription.resume() is called

onCancel -> when subscription.cancel() is called

To better illustrate how those methods work I’ve created a gist since the examples are getting pretty large.
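
As a rough stand-in, here is a minimal sketch of how the four callbacks might be wired up. It is not the code from the gist: numberStreamWithCallbacks() and runDemo() are hypothetical names, the tick interval is shortened to 100 ms, and events are written to a log list instead of being printed directly. The timer only runs while someone is actually listening.

```dart
import 'dart:async';

// Hypothetical variant of numberStreamWithController() that records every
// lifecycle callback and only ticks while a listener is active.
Stream<int> numberStreamWithCallbacks(int countTo, List<String> log) {
  late final StreamController<int> controller;
  Timer? timer;
  var counter = 0;

  void tick(Timer _) {
    controller.add(counter);
    if (counter == countTo) {
      timer?.cancel();
      controller.close();
    }
    counter++;
  }

  void start() {
    timer = Timer.periodic(const Duration(milliseconds: 100), tick);
  }

  void stop() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: () {
      log.add('onListen');
      start();
    },
    onPause: () {
      log.add('onPause');
      stop();
    },
    onResume: () {
      log.add('onResume');
      start();
    },
    onCancel: () {
      log.add('onCancel');
      stop();
    },
  );
  return controller.stream;
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  final subscription = numberStreamWithCallbacks(10, log)
      .listen((event) => log.add('received $event'));

  await Future.delayed(const Duration(milliseconds: 250));
  subscription.pause(); // triggers onPause
  await Future.delayed(const Duration(milliseconds: 200));
  subscription.resume(); // triggers onResume
  await Future.delayed(const Duration(milliseconds: 150));
  await subscription.cancel(); // triggers onCancel
  return log;
}

Future<void> main() async {
  (await runDemo()).forEach(print);
  // onListen
  // received 0
  // received 1
  // onPause
  // onResume
  // received 2
  // onCancel
}
```

Stopping the timer in onPause and onCancel means the controller never produces events that nobody is ready to receive.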

If you’re downloading a file or doing some long-running operation, you might want to know when your stream is done. You can do so with the help of the onDone callback of the listen() method.

A stream is considered “done” in 2 situations:

  1. It runs out of events to emit, as in the case of our numberStream() generator
  2. It emits the “done” event when we close the stream by calling controller.close()

Note that calling subscription.cancel() stops the events, but it does not trigger onDone: a canceled subscription receives no further events, and that includes the “done” event.
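
To check for yourself when onDone actually fires, here is a small sketch that compares closing a controller with canceling a subscription. The runDemo() function and the log labels are made up for this illustration.

```dart
import 'dart:async';

Future<List<String>> runDemo() async {
  final log = <String>[];

  // Case 1: the controller is closed, so the listener gets the "done" event.
  final closed = StreamController<int>();
  closed.stream.listen(
    (event) => log.add('closed: $event'),
    onDone: () => log.add('closed: done'),
  );
  closed.add(1);
  await closed.close();

  // Case 2: the subscription is canceled before the controller is closed.
  final canceled = StreamController<int>();
  final subscription = canceled.stream.listen(
    (event) => log.add('canceled: $event'),
    onDone: () => log.add('canceled: done'),
  );
  canceled.add(1);
  await Future.delayed(Duration.zero); // let the event be delivered
  await subscription.cancel();
  unawaited(canceled.close()); // nobody is listening anymore

  await Future.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await runDemo());
  // [closed: 1, closed: done, canceled: 1]
}
```

The second listener never logs "done", because it canceled its subscription before the stream was closed.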

Even though it’s possible to start emitting events before any listener has subscribed, don’t do this! The controller will buffer those events, which might lead to a memory leak if nobody ever starts listening. Also, the controller will deliver all buffered events in one go when someone starts to listen to it, and in most instances, this is not the behavior you want from your stream.
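
Here is a small sketch of that buffering behavior, using a plain single-subscription controller. The runDemo() name and the 100 ms pause are arbitrary choices for the illustration.

```dart
import 'dart:async';

Future<List<int>> runDemo() async {
  final controller = StreamController<int>();

  // Nobody is listening yet, so the controller keeps these events in memory.
  controller
    ..add(1)
    ..add(2)
    ..add(3);

  await Future.delayed(const Duration(milliseconds: 100));

  // The first (and only) listener receives the whole backlog at once.
  final received = <int>[];
  controller.stream.listen(received.add);
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await runDemo()); // [1, 2, 3]
}
```

A common way to avoid this is to start producing events inside the onListen callback instead of right after creating the controller.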

Error handling

Image by https://pixabay.com/users/aitoff-388338/

Now let’s get to our most “beloved” part as software developers — error handling.

There are 2 ways to handle errors in streams:

  1. The usual try/catch/finally sequence
  2. Using parameters in the .listen() method

The try/catch is pretty straightforward. Go ahead and add these lines to the start of our count() method. Here we’re just telling our stream to emit an error event when the counter reaches 1.

if (counter == 1) {
  controller.addError("Boom!");
}

The try/catch can only be used in combination with await for:

try {
  await for (var value in numberStreamWithController(2)) {
    print("received = $value");
  }
} catch (error) {
  print("error = $error");
}

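and the output we should get (the “starting the Stream” and “stopping the Stream” lines only appear if your controller has onListen and onCancel callbacks that print them):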

starting the Stream
received = 0
stopping the Stream
error = Boom!

Now, let’s look at what we can do with .listen(). There’s much more that we can do with it. Let’s start with a simple example:

numberStreamWithController(2).listen((int value) {
  print("received = $value");
}, onError: (error) {
  print("error = $error");
});

the output we get:

starting the Stream
received = 0
error = Boom!
received = 1
received = 2

Weird, right? We encountered an error but the stream continued to send events…

A big difference between handling errors via try/catch and .listen() is that with the latter you can keep listening for events even after you have received an error event, whereas an error ends an await for loop. All you have to do is leave the cancelOnError parameter of .listen() set to false. That is actually its default value, as the example above demonstrates.
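
For comparison, here is a minimal sketch of what changes when cancelOnError is set to true. The flakyStream() and collect() helpers are hypothetical and exist only for this demo; the stream emits 0, an error, then 1 and 2.

```dart
import 'dart:async';

// Hypothetical stream that emits 0, an error, 1 and 2 once someone listens.
Stream<int> flakyStream() {
  late final StreamController<int> controller;
  controller = StreamController<int>(onListen: () {
    controller
      ..add(0)
      ..addError('Boom!')
      ..add(1)
      ..add(2)
      ..close();
  });
  return controller.stream;
}

Future<List<String>> collect({required bool cancelOnError}) async {
  final log = <String>[];
  flakyStream().listen(
    (value) => log.add('received = $value'),
    onError: (Object error) {
      log.add('error = $error');
    },
    cancelOnError: cancelOnError,
  );
  // Give the stream a moment to deliver everything it has.
  await Future.delayed(const Duration(milliseconds: 10));
  return log;
}

Future<void> main() async {
  print(await collect(cancelOnError: false));
  // [received = 0, error = Boom!, received = 1, received = 2]
  print(await collect(cancelOnError: true));
  // [received = 0, error = Boom!]
}
```

With cancelOnError set to true, the subscription is canceled right after the first error, so the remaining events are never delivered.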

In case you want to intercept a specific error event you should call the .handleError() method on your stream and specify the test parameter. The test parameter is a function that accepts an error event and returns a boolean that determines whether we intercept that error or not. Let’s look at that:

Add these lines below our “Boom!” error:

if (counter == 3) {
  controller.addError(42);
}

Now let’s intercept different kinds of errors:

numberStreamWithController(3)
    .handleError((e) => print("int error = $e"),
        test: (e) => e is int)
    .handleError((e) => print("String error = $e"),
        test: (e) => e is String)
    .listen((int value) {
  print("received = $value");
});

The output you should get:

starting the Stream
received = 0
String error = Boom!
received = 1
received = 2
int error = 42
received = 3

As you can see, we can now intercept specific errors by providing a test parameter to the handleError method.

Broadcast streams. Allowing multiple subscribers.

Image by https://pixabay.com/users/wikiimages-1897/

The final thing that I wanted to explore is the broadcast stream. Such streams allow for multiple listeners. They come in handy whenever you want to notify multiple subscribers of various events, like user interaction with the UI.

Go ahead and run this code:

final stream = numberStream(5).asBroadcastStream();

stream.listen((event) {
  print("1st subscriber = $event");
});

await Future.delayed(Duration(seconds: 3));

stream.listen((event) {
  print("2nd subscriber = $event");
});

The output you should get:

1st subscriber = 1
1st subscriber = 2
1st subscriber = 3
2nd subscriber = 3
1st subscriber = 4
2nd subscriber = 4
1st subscriber = 5
2nd subscriber = 5

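As you can see, even though the 1st subscriber was already listening for updates, the 2nd subscriber can also start listening to a stream that has already started emitting events. It only receives the events emitted after it subscribed (3, 4 and 5); the earlier ones are not replayed.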
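
A broadcast stream can also be created directly with the StreamController.broadcast() constructor. The sketch below assumes some made-up “click” events and a hypothetical runDemo() helper. It shows the same effect: a late subscriber only sees what is emitted after it joined.

```dart
import 'dart:async';

Future<List<String>> runDemo() async {
  final controller = StreamController<String>.broadcast();
  final log = <String>[];

  controller.stream.listen((e) => log.add('1st subscriber = $e'));
  controller.add('click A');
  await Future.delayed(Duration.zero); // let the event be delivered

  // The 2nd subscriber joins late and never sees 'click A'.
  controller.stream.listen((e) => log.add('2nd subscriber = $e'));
  controller.add('click B');

  await controller.close();
  return log;
}

Future<void> main() async {
  (await runDemo()).forEach(print);
  // 1st subscriber = click A
  // 1st subscriber = click B
  // 2nd subscriber = click B
}
```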

Conclusion

Dart streams provide a powerful way of working with asynchronous sequences of data. Even though this turned out to be a lengthy post, in reality, we have barely scratched the surface of streams in Dart. The Stream class offers many more methods for transforming and combining data, and RxDart takes this concept even further.
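
As a small taste of those methods, here is a sketch that chains where(), map(), take() and toList() on the numberStream() generator from this article. The delay is shortened and the firstEvenNumbers() helper is a made-up name.

```dart
import 'dart:async';

// The generator from the article, with a shorter delay.
Stream<int> numberStream(int countTo) async* {
  for (var i = 1; i <= countTo; i++) {
    await Future.delayed(const Duration(milliseconds: 10));
    yield i;
  }
}

// Keep the even numbers, turn them into text and stop after three of them.
Future<List<String>> firstEvenNumbers() {
  return numberStream(10)
      .where((n) => n.isEven)
      .map((n) => 'even: $n')
      .take(3)
      .toList();
}

Future<void> main() async {
  print(await firstEvenNumbers()); // [even: 2, even: 4, even: 6]
}
```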

As usual — in case you’re using Quizlet to repeat the stuff you’ve learned — I’ve made some flash cards here, and here.

I hope you enjoyed this article and I would love to hear your comments below :)

Software engineer. Learning everyday and trying to share my knowledge in the process. Into mobile development, computer science and the brain.
