Simple stream examples:
import 'dart:async';

void main() {
  var streamController = StreamController();

  streamController.stream.listen(
    (data) => print('Incoming data: $data'),
    onError: (err) => print('An error: $err'),
    onDone: () => print('Mission complete!'),
    cancelOnError: false,
  );

  streamController.sink.add('Some nice data');
  streamController.sink.add('...more data');
  streamController.sink.addError('Houston, we have a problem!');
  streamController.sink.add('...much more data');
  streamController.sink.close();
}

import 'dart:async';
import 'dart:math';

void main() {
  final controller = StreamController<double>();

  // The controller's stream is accessed through the stream property.
  final Stream<double> stream = controller.stream;

  // Add a random value to the stream's sink every second.
  final rand = Random();
  final timer = Timer.periodic(Duration(seconds: 1), (_) {
    controller.sink.add(rand.nextDouble() * 10);
  });

  // Add a subscription.
  final StreamSubscription<double> streamSubscription = stream.listen((value) {
    print('Value from controller: $value');
  });

  // After 10 seconds, cancel the subscription and stop producing values
  // so that the program can exit.
  Timer(Duration(seconds: 10), () {
    streamSubscription.cancel();
    timer.cancel();
    controller.close();
    print('subscription.cancel()');
  });
}
A stream that continuously emits random values:
import 'dart:math';

// Define a stream that rolls a die every half second.
Stream<int> getRandomValues() async* {
  var random = Random();
  while (true) {
    await Future.delayed(Duration(milliseconds: 500));
    yield random.nextInt(6) + 1;
  }
}

void main() {
  // Listen to the stream returned by this function.
  getRandomValues().listen((data) => print(data));
}
- The function returns a Stream, which means we can subscribe to it.
- async* marks the function as an asynchronous generator. Its body runs asynchronously, and execution continues even after “returning” a value.
- yield emits a value on the stream without exiting the function. Execution then continues with the code after yield.
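These points can be sketched with a finite generator, so that the program terminates on its own. The helper name countTo and the 10 ms delay are assumptions made for illustration; they are not part of the example above.

```dart
// Hypothetical helper: emits 1..max, pausing briefly before each value.
Stream<int> countTo(int max) async* {
  for (var i = 1; i <= max; i++) {
    await Future.delayed(const Duration(milliseconds: 10));
    yield i; // Emits i to the listener; the loop then continues.
  }
  // Reaching the end of the body closes the stream.
}

Future<void> main() async {
  // await for subscribes to the stream and runs the body once per event.
  await for (final value in countTo(3)) {
    print('Got $value');
  }
  print('Done');
}
```

Because the generator body ends after the loop, the stream closes and 'Done' is printed after the three values. The dice example never reaches that point, since its loop runs forever.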
Stream examples with a StreamTransformer:
A StreamTransformer lets us transform the events of a Stream. Calling transform returns a new stream that delivers the transformed events to its listeners; the original stream itself is not modified.
Example 1:
import 'dart:async';

void main() async {
  final StreamController<String> controller =
      StreamController<String>.broadcast();

  // Keep the type arguments on the declared type as well.
  final StreamTransformer<String, String> transformer =
      StreamTransformer<String, String>.fromHandlers(
    handleData: (data, EventSink<String> sink) {
      sink.add(data);
    },
  );

  final Stream<String> stream = controller.stream;

  stream
      .map((value) => value)
      .transform(transformer)
      .listen((data) {
    print('listen: $data');
  }, onError: (err) => print(err));

  controller.add('foo');
  controller.add('baa');
  controller.close();
}
This transformer is created with the type StreamTransformer<String, String>, which means it takes a String and converts it into a String. If map cannot determine the type of value (for example, when the stream is declared as a plain Stream without a type argument), it must assume that value is dynamic. map then returns a Stream<dynamic>, which is not compatible with the transformer.
We can tell map which type it returns by writing .map<String>((value) => value); the value is then cast to String at runtime. In general, though, we should not throw away the generic part of types.
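To illustrate, here is a minimal sketch that assumes the element type was lost by declaring the stream as a plain Stream. The helper name shout and the upper-casing transformer are hypothetical, chosen only to make the transformation visible.

```dart
import 'dart:async';

// Hypothetical helper: takes a stream whose static type is Stream<dynamic>.
Stream<String> shout(Stream untyped) {
  final transformer = StreamTransformer<String, String>.fromHandlers(
    handleData: (data, sink) => sink.add(data.toUpperCase()),
  );
  return untyped
      // The explicit type argument makes map produce a Stream<String>;
      // each dynamic value is cast to String when the event arrives.
      .map<String>((value) => value)
      .transform(transformer);
}

Future<void> main() async {
  final Stream untyped = Stream.fromIterable(['foo', 'baa']);
  print(await shout(untyped).toList()); // prints [FOO, BAA]
}
```

Declaring the variable as Stream<String> from the start would make the explicit type argument on map unnecessary, which is usually the simpler fix.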
Example 2:
import 'dart:async';

void main() {
  var streamController = StreamController<int>();

  final streamTransformer = StreamTransformer<int, int>.fromHandlers(
    handleData: (int data, EventSink sink) {
      sink.add(data * 2);
    },
    handleError: (Object error, StackTrace stacktrace, EventSink sink) {
      sink.addError('Something went wrong: $error');
    },
    handleDone: (EventSink sink) {
      print('Controller mission complete!');
      sink.close();
    },
  );

  streamController.stream.transform(streamTransformer).listen(
    (data) => print('Data $data'),
    onError: (err) => print('Got an error! $err'),
    onDone: () => print('Stream mission complete!'),
    cancelOnError: false,
  );

  streamController.sink.add(1);
  streamController.sink.add(2);
  streamController.sink.add(4);
  streamController.sink.addError('Houston, we have a problem!');
  streamController.sink.add(4);
  streamController.sink.close();
}

import 'dart:async';
import 'dart:math';

void main() {
  final controller = StreamController<int>();

  // Add a random number below 100 every second.
  final rand = Random();
  final timer = Timer.periodic(Duration(seconds: 1), (_) {
    controller.add(rand.nextInt(100));
  });

  final Stream<int> output = controller.stream;

  // Pass small numbers through; turn numbers of 75 or more into errors.
  final trans = StreamTransformer<int, int>.fromHandlers(
    handleData: (number, sink) {
      if (number < 75) {
        sink.add(number);
      } else {
        sink.addError('Oh no. The $number is big.');
      }
    },
  );

  final StreamSubscription<int> subscription = output
      .transform(trans)
      .listen((data) => print(data), onError: (err) => print(err));

  // After 10 seconds, cancel the subscription and stop the timer
  // so that the program can exit.
  Timer(Duration(seconds: 10), () {
    subscription.cancel();
    timer.cancel();
    print('subscription.cancel()');
  });
}
https://medium.com/flutter-community/flutter-stream-basics-for-beginners-eda23e44e32f