Dart – Streams

Simple stream examples:

import 'dart:async';

/// Demonstrates the three kinds of stream events: data, error and done.
///
/// A listener is attached first, then a few values and one error are pushed
/// through the controller before it is closed.
void main() {
  // Typed controller: every data event pushed below is a String, so there is
  // no reason to fall back to `StreamController<dynamic>`.
  final streamController = StreamController<String>();

  streamController.stream.listen(
    (data) => print('Incoming data: $data'),
    onError: (err) => print('An error: $err'),
    onDone: () => print('Mission complete!'),
    // Keep the subscription alive after the error so the data event added
    // afterwards is still delivered.
    cancelOnError: false,
  );

  // Push the events in order; closing the sink triggers `onDone`.
  streamController.sink
    ..add('Some nice data')
    ..add('...more data')
    ..addError('Houston, we have a problem!')
    ..add('...much more data')
    ..close();
}
import 'dart:async';
import 'dart:math';

/// Emits a random double in the range [0, 10) once per second, prints each
/// value, and tears everything down after ten seconds.
void main() {
  final controller = StreamController<double>();

  // The controller's stream can be accessed through the stream property.
  // Keep the element type: a raw `Stream` would make `listen` return a
  // `StreamSubscription<dynamic>` instead of `StreamSubscription<double>`.
  final stream = controller.stream;

  // Add a value to the stream's sink every second. The timer handle is kept
  // so the producer can be stopped later.
  final rand = Random();
  final producer = Timer.periodic(Duration(seconds: 1), (_) {
    controller.sink.add(rand.nextDouble() * 10);
  });

  // Add a subscription.
  final streamSubscription = stream.listen((value) {
    print('Value from controller: $value');
  });

  // Cancel the subscription after ten seconds.
  Timer(Duration(seconds: 10), () {
    streamSubscription.cancel();
    print('subscription.cancel()');

    // Also stop the producer and close the controller; otherwise the
    // periodic timer keeps the program alive forever.
    producer.cancel();
    controller.close();
  });
}

A stream which continuously sends random values:

/// Rolls a six-sided die forever, producing one result every half second.
///
/// Each event is an integer from 1 to 6. The stream never completes on its
/// own, because the generator loops unconditionally.
Stream<int> getRandomValues() async* {
  const pause = Duration(milliseconds: 500);
  final dice = Random();
  for (;;) {
    // Wait half a second before each roll.
    await Future.delayed(pause);
    final roll = dice.nextInt(6) + 1;
    yield roll;
  }
}
 
 // Subscribe to the stream returned by the function and print every roll as
 // it arrives. `print` is passed as a tear-off instead of being wrapped in a
 // lambda.
 getRandomValues().listen(print);
  • The function returns a Stream. That means we can subscribe to the stream.
  • The async* marker means the function runs asynchronously as a generator. Execution continues even after “returning” (yielding) a value.
  • yield works like a return that doesn’t exit the function. Instead, the function continues executing the rest of the code after the yield.

Stream examples with a StreamTransformer:

A StreamTransformer allows us to perform data transformations on a Stream. The transformed events are emitted on the Stream returned by transform(), where they are received by all the listeners attached to that stream.

Example 1:

import 'dart:async';

/// Pipes two strings through an identity `map` and a pass-through
/// transformer on a broadcast stream, printing whatever reaches the listener.
///
/// This example illustrates a typing pitfall: the transformer variable below
/// is declared without type arguments.
void main() async {
  // Broadcast controller carrying String events.
  final StreamController<String> controller = StreamController<String>.broadcast();

  // The variable is declared with the raw type `StreamTransformer` (no type
  // arguments), even though the object is constructed as
  // `StreamTransformer<String, String>`. The handler's `EventSink` is raw too.
  // NOTE(review): dropping the type arguments here is what the accompanying
  // explanation attributes the type mismatch to - confirm the exact error
  // against the Dart version in use.
  final StreamTransformer transformer =
      StreamTransformer<String, String>.fromHandlers(
          handleData: (data, EventSink sink) {
    // Forward every event unchanged.
    sink.add(data);
  });

  final Stream<String> stream = controller.stream;

  // The listener is registered before any event is added below.
  stream
      // Identity mapping, written without an explicit type argument.
      .map((value) => value)
      .transform(transformer)
      .listen((data) {
    print('listen: $data');
  }, onError: (err) => print(err));

  controller.add('foo');
  controller.add('baa');

  controller.close();
}

This transformer is created with the type StreamTransformer<String, String>, which means it takes a String and converts it into a String. But because map cannot determine the return type of value, it must assume that value is dynamic. So map returns a Stream<dynamic>, which is not compatible with the transformer.

We can tell map what type it is going to return at runtime by writing .map<String>((value) => value). But in general we should not throw away the generic part of types.

Example 2:

import 'dart:async';

/// Doubles every integer that flows through the stream and decorates errors,
/// using a [StreamTransformer] built from data, error and done handlers.
void main() {
  final streamController = StreamController<int>();

  // All sinks are typed as `EventSink<int>` so the handlers cannot add
  // anything other than ints to the transformed stream.
  final streamTransformer = StreamTransformer<int, int>.fromHandlers(
    handleData: (int data, EventSink<int> sink) {
      sink.add(data * 2);
    },
    handleError: (Object error, StackTrace stackTrace, EventSink<int> sink) {
      // Re-emit the error with a friendlier message, but keep the original
      // stack trace so the failure can still be traced to its origin.
      sink.addError('Something went wrong: $error', stackTrace);
    },
    handleDone: (EventSink<int> sink) {
      print('Controller mission complete!');
      // The transformed stream only finishes once its sink is closed.
      sink.close();
    },
  );

  streamController.stream.transform(streamTransformer).listen(
    (data) => print('Data $data'),
    onError: (err) => print('Got an error! $err'),
    onDone: () => print('Stream mission complete!'),
    // Keep listening after the error so the final value is still printed.
    cancelOnError: false,
  );

  streamController.sink
    ..add(1)
    ..add(2)
    ..add(4)
    ..addError('Houston, we have a problem!')
    ..add(4)
    ..close();
}
import 'dart:async';
import 'dart:math';

/// Feeds a random integer below 100 into a stream every second and routes it
/// through a transformer that turns values of 75 or more into error events.
///
/// Everything is torn down after ten seconds.
void main() {
  final controller = StreamController<int>();
  final rand = Random();

  // Keep the timer handle so the producer can be stopped later.
  final producer = Timer.periodic(Duration(seconds: 1), (_) {
    controller.add(rand.nextInt(100));
  });

  final output = controller.stream;

  // Only ints are ever added to the sink, so the output type is `int`
  // rather than `dynamic`.
  final trans = StreamTransformer<int, int>.fromHandlers(
    handleData: (number, sink) {
      if (number < 75) {
        sink.add(number);
      } else {
        sink.addError('Oh no. The $number is big,');
      }
    },
  );

  // Inferred as `StreamSubscription<int>` instead of an untyped `var`.
  final subscription = output.transform(trans).listen(
    print,
    onError: print,
  );

  Timer(Duration(seconds: 10), () {
    subscription.cancel();
    print('subscription.cancel()');

    // Also stop the producer and close the controller; otherwise the
    // periodic timer keeps the program alive forever.
    producer.cancel();
    controller.close();
  });
}

https://medium.com/flutter-community/flutter-stream-basics-for-beginners-eda23e44e32f