Simple stream examples:
import 'dart:async';
void main() {
var streamController = StreamController();
streamController.stream
.listen(
(data) => print('Incoming data: $data'),
onError: (err) => print('An error: $err'),
onDone: () => print('Mission complete!'),
cancelOnError: false,
);
streamController.sink.add('Some nice data');
streamController.sink.add('...more data');
streamController.sink.addError('Houston, we have a problem!');
streamController.sink.add('...much more data');
streamController.sink.close();
}
import 'dart:async';
import 'dart:math';
main() {
StreamController<double> controller = StreamController<double>();
StreamSubscription<double> streamSubscription;
// The controllers stream can be accessed through the stream property
Stream stream = controller.stream;
// Add a value to the streams sink
Random rand = Random();
Timer.periodic(Duration(seconds: 1), (_) {
controller.sink.add(rand.nextDouble() * 10);
});
// Add a subsciption
streamSubscription = stream
.listen((value) {
print('Value from controller: $value');
});
// Cancel a subscription
Timer(Duration(seconds: 10), () {
streamSubscription.cancel();
print('subscription.cancel()');
});
}
A stream which continuously sent random values:
//Defining the stream which roll a dice every half second
Stream<int> getRandomValues() async* {
var random = Random();
while (true) {
await Future.delayed(Duration(milliseconds: 500));
yield random.nextInt(6) + 1;
}
}
//Listen to this function
getRandomValues().listen((data) => print(data));
- The function return a Stream. That means we can to subscribe to the stream.
- The
async*
means run the function asynchronously. Execution will continue even after “returning” a value. -
yield
is a return function which doesn’t exit the function. Instead it continues executing the rest of the code after yield.
Stream examples with a StreamTransformer:
A Stream Transformer allows us to perform data transformations on a Stream. These transformations are then pushed back into the Stream to be received by all the listeners defined for that particular stream.
Example 1:
import 'dart:async';
void main() async {
final StreamController<String> controller = StreamController<String>.broadcast();
final StreamTransformer transformer =
StreamTransformer<String, String>.fromHandlers(
handleData: (data, EventSink sink) {
sink.add(data);
});
final Stream<String> stream = controller.stream;
stream
.map((value) => value)
.transform(transformer)
.listen((data) {
print('listen: $data');
}, onError: (err) => print(err));
controller.add('foo');
controller.add('baa');
controller.close();
}
This transformer
is made as the type StreamTransformer<String, String>
which means it takes a String
and convert it into a String
. But because map
cannot determine the return type of value
it most assume that value
are dynamic
. So map
returns a Stream<dynamic>
which are not compatible with the transformer.
We can tell map
what type it is going to return on runtime by writing .map<String>((value) => value)
. But in general we should not throw away the generic part of types.
Example 2:
import 'dart:async';
void main() {
var streamController = StreamController<int>();
final streamTransformer = StreamTransformer<int, int>.fromHandlers(
handleData: (int data, EventSink sink) {
sink.add(data * 2);
},
handleError: (Object error, StackTrace stacktrace, EventSink sink) {
sink.addError('Something went wrong: $error');
},
handleDone: (EventSink sink) {
print('Controller mission complete!');
sink.close();
}
);
streamController.stream
.transform(streamTransformer)
.listen(
(data) => print('Data $data'),
onError: (err) => print('Got an error! $err'),
onDone: () => print('Stream mission complete!'),
cancelOnError: false,
);
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(4);
streamController.sink.addError('Houston, we have a problem!');
streamController.sink.add(4);
streamController.sink.close();
}
import 'dart:async';
import 'dart:math';
main() {
final controller = StreamController<int>();
var subscription;
Random rand = Random();
Timer.periodic(Duration(seconds: 1), (_) {
controller.add(rand.nextInt(100));
});
Stream<int> output = controller.stream;
final trans = StreamTransformer<int, dynamic>.fromHandlers(
handleData: (number, sink) {
if (number < 75) {
sink.add(number);
} else {
sink.addError('Oh no. The $number is big,');
}
}
);
subscription = output
.transform(trans)
.listen(
(data) => print(data),
onError: (err) => print(err)
);
Timer(Duration(seconds: 10), () {
subscription.cancel();
print('subscription.cancel()');
});
}
https://medium.com/flutter-community/flutter-stream-basics-for-beginners-eda23e44e32f