blob: ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.async;
// -------------------------------------------------------------------
// Core Stream types
// -------------------------------------------------------------------
* A source of asynchronous data events.
* A Stream provides a sequence of events. Each event is either a data event or
* an error event, representing the result of a single computation. When the
* Stream is exhausted, it may send a single "done" event.
* You can [listen] on a stream to receive the events it sends. When you listen,
* you receive a [StreamSubscription] object that can be used to stop listening,
* or to temporarily pause events from the stream.
* When an event is fired, the listeners at that time are informed.
* If a listener is added while an event is being fired, the change
* will only take effect after the event is completely fired. If a listener
* is canceled, it immediately stops receiving events.
* When the "done" event is fired, subscribers are unsubscribed before
* receiving the event. After the event has been sent, the stream has no
* subscribers. Adding new subscribers after this point is allowed, but
* they will just receive a new "done" event as soon as possible.
* Streams always respect "pause" requests. If necessary they need to buffer
* their input, but often, and preferably, they can simply request their input
* to pause too.
* There are two kinds of streams: The normal "single-subscription" streams and
* "broadcast" streams.
* A single-subscription stream allows only a single listener at a time.
* It holds back events until it gets a listener, and it may exhaust
* itself when the listener is unsubscribed, even if the stream wasn't done.
* Single-subscription streams are generally used for streaming parts of
* contiguous data like file I/O.
* A broadcast stream allows any number of listeners, and it fires
* its events when they are ready, whether there are listeners or not.
* Broadcast streams are used for independent events/observers.
* The default implementation of [isBroadcast] returns false.
* A broadcast stream inheriting from [Stream] must override [isBroadcast]
* to return [:true:].
abstract class Stream<T> {
* Creates a new single-subscription stream from the future.
* When the future completes, the stream will fire one event, either
* data or error, and then close with a done-event.
factory Stream.fromFuture(Future<T> future) {
StreamController<T> controller = new StreamController<T>(sync: true);
future.then((value) {
onError: (error) {
* Creates a single-subscription stream that gets its data from [data].
* If iterating [data] throws an error, the stream ends immediately with
* that error. No done event will be sent (iteration is not complete), but no
* further data events will be generated either, since iteration cannot
* continue.
factory Stream.fromIterable(Iterable<T> data) {
// The generated stream receives a closure, not a ready-made event source, so
// the pending-events wrapper around [data] is only built when that closure
// is invoked (presumably on listen -- confirm in _GeneratedStreamImpl).
return new _GeneratedStreamImpl<T>(
() => new _IterablePendingEvents<T>(data));
* Creates a stream that repeatedly emits events at [period] intervals.
* The event values are computed by invoking [computation]. The argument to
* this callback is an integer that starts with 0 and is incremented for
* every event.
* If [computation] is omitted the event values will all be `null`.
factory Stream.periodic(Duration period,
[T computation(int computationCount)]) {
if (computation == null) computation = ((i) => null);
Timer timer;
int computationCount = 0;
StreamController<T> controller;
// Counts the time that the Stream was running (and not paused).
Stopwatch watch = new Stopwatch();
void sendEvent() {
T data = computation(computationCount++);
void startPeriodicTimer() {
assert(timer == null);
timer = new Timer.periodic(period, (Timer timer) {
controller = new StreamController<T>(sync: true,
onListen: () {
onPause: () {
timer = null;
onResume: () {
assert(timer == null);
Duration elapsed = watch.elapsed;
timer = new Timer(period - elapsed, () {
timer = null;
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
* Creates a stream where all events of an existing stream are piped through
* a sink-transformation.
* The given [mapSink] closure is invoked when the returned stream is
* listened to. All events from the [source] are added into the event sink
* that is returned from the invocation. The transformation puts all
* transformed events into the sink the [mapSink] closure received during
* its invocation. Conceptually the [mapSink] creates a transformation pipe
* with the input sink being the returned [EventSink] and the output sink
* being the sink it received.
* This constructor is frequently used to build transformers.
* Example use for a duplicating transformer:
* class DuplicationSink implements EventSink<String> {
* final EventSink<String> _outputSink;
* DuplicationSink(this._outputSink);
* void add(String data) {
* _outputSink.add(data);
* _outputSink.add(data);
* }
* void addError(e, [st]) => _outputSink.addError(e, st);
* void close() => _outputSink.close();
* }
* class DuplicationTransformer implements StreamTransformer<String, String> {
* // Some generic types omitted for brevity.
* Stream bind(Stream stream) => new Stream<String>.eventTransformed(
* stream,
* (EventSink sink) => new DuplicationSink(sink));
* }
* stringStream.transform(new DuplicationTransformer());
factory Stream.eventTransformed(Stream source,
EventSink mapSink(EventSink<T> sink)) {
// Only the output sink handed to [mapSink] is typed with T; [source] and the
// sink that [mapSink] returns are untyped in this signature.
// NOTE(review): _BoundSinkStream is constructed without explicit type
// arguments -- confirm whether they should be supplied.
return new _BoundSinkStream(source, mapSink);
* Reports whether this stream is a broadcast stream.
// Base-class default: not a broadcast stream. Subclasses that are broadcast
// streams are expected to override this getter.
bool get isBroadcast => false;
* Returns a multi-subscription stream that produces the same events as this.
* If this stream is already a broadcast stream, it is returned unmodified.
* If this stream is single-subscription, return a new stream that allows
* multiple subscribers. It will subscribe to this stream when its first
* subscriber is added, and will stay subscribed until this stream ends,
* or a callback cancels the subscription.
* If [onListen] is provided, it is called with a subscription-like object
* that represents the underlying subscription to this stream. It is
* possible to pause, resume or cancel the subscription during the call
* to [onListen]. It is not possible to change the event handlers, including
* using [StreamSubscription.asFuture].
* If [onCancel] is provided, it is called in a similar way to [onListen]
* when the returned stream stops having listeners. If it later gets
* a new listener, the [onListen] function is called again.
* Use the callbacks, for example, for pausing the underlying subscription
* while having no subscribers to prevent losing events, or canceling the
* subscription when there are no listeners.
Stream<T> asBroadcastStream({
void onListen(StreamSubscription<T> subscription),
void onCancel(StreamSubscription<T> subscription) }) {
// Already broadcast: hand back this very instance. Note that [onListen] and
// [onCancel] are not used on this path.
if (isBroadcast) return this;
// Otherwise wrap this stream, forwarding both callbacks to the wrapper.
return new _AsBroadcastStream<T>(this, onListen, onCancel);
* Adds a subscription to this stream.
* On each data event from this stream, the subscriber's [onData] handler
* is called. If [onData] is null, nothing happens.
* On errors from this stream, the [onError] handler is given an
* object describing the error.
* The [onError] callback must be of type `void onError(error)` or
* `void onError(error, StackTrace stackTrace)`. If [onError] accepts
* two arguments it is called with the stack trace (which could be `null` if
* the stream itself received an error without stack trace).
* Otherwise it is called with just the error object.
* If this stream closes, the [onDone] handler is called.
* If [cancelOnError] is true, the subscription is ended when
* the first error is reported. The default is false.
// Abstract: declared without a body, so each concrete stream supplies its own
// subscription logic. [onError], [onDone] and [cancelOnError] are optional
// named parameters; [onData] is the only positional one.
StreamSubscription<T> listen(void onData(T event),
{ Function onError,
void onDone(),
bool cancelOnError});
* Creates a new stream from this stream that discards some data events.
* The new stream sends the same error and done events as this stream,
* but it only sends the data events that satisfy the [test].
Stream<T> where(bool test(T event)) {
// No subscription is made here; this only wraps the stream together with
// [test] in a _WhereStream carrying the same element type.
return new _WhereStream<T>(this, test);
* Creates a new stream that converts each element of this stream
* to a new value using the [convert] function.
Stream map(convert(T event)) {
// [convert] declares no return type, so the output element type of the
// resulting stream is dynamic.
return new _MapStream<T, dynamic>(this, convert);
* Creates a wrapper Stream that intercepts some errors from this stream.
* If this stream sends an error that matches [test], then it is intercepted
* by the [onError] function.
* The [onError] callback must be of type `void onError(error)` or
* `void onError(error, StackTrace stackTrace)`. Depending on the function
* type the stream either invokes [onError] with or without a stack
* trace. The stack trace argument might be `null` if the stream itself
* received an error without stack trace.
* An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
* true. If [test] is omitted, every error is considered matching.
* If the error is intercepted, the [onError] function can decide what to do
* with it. It can throw if it wants to raise a new (or the same) error,
* or simply return to make the stream forget the error.
* If you need to transform an error into a data event, use the more generic
* [Stream.transform] to handle the event by writing a data event to
* the output sink
Stream<T> handleError(Function onError, { bool test(error) }) {
// [onError] is typed as a plain Function, so its arity is not checked by
// this signature. [test] is passed along as given, including when omitted.
return new _HandleErrorStream<T>(this, onError, test);
* Creates a new stream from this stream that converts each element
* into zero or more events.
* Each incoming event is converted to an [Iterable] of new events,
* and each of these new events are then sent by the returned stream
* in order.
Stream expand(Iterable convert(T value)) {
// [convert] returns an untyped Iterable, so the output element type of the
// resulting stream is dynamic.
return new _ExpandStream<T, dynamic>(this, convert);
* Binds this stream as the input of the provided [StreamConsumer].
Future pipe(StreamConsumer<T> streamConsumer) {
// Two chained steps: first feed this stream to the consumer, then, once the
// addStream future completes with a value, close the consumer. The returned
// future is the one produced by that chained close() call.
return streamConsumer.addStream(this).then((_) => streamConsumer.close());
* Chains this stream as the input of the provided [StreamTransformer].
* Returns the result of [:streamTransformer.bind:] itself.
Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
// Pure delegation: whatever stream the transformer's bind returns is
// returned as-is.
return streamTransformer.bind(this);
* Reduces a sequence of values by repeatedly applying [combine].
Future<T> reduce(T combine(T previous, T element)) {
_Future<T> result = new _Future<T>();
bool seenFirst = false;
T value;
StreamSubscription subscription;
subscription = this.listen(
(T element) {
if (seenFirst) {
_runUserCode(() => combine(value, element),
(T newValue) { value = newValue; },
_cancelAndError(subscription, result));
} else {
value = element;
seenFirst = true;
onError: result._completeError,
onDone: () {
if (!seenFirst) {
result._completeError(new StateError("No elements"));
} else {
cancelOnError: true
return result;
/** Reduces a sequence of values by repeatedly applying [combine]. */
Future fold(var initialValue, combine(var previous, T element)) {
_Future result = new _Future();
var value = initialValue;
StreamSubscription subscription;
subscription = this.listen(
(T element) {
() => combine(value, element),
(newValue) { value = newValue; },
_cancelAndError(subscription, result)
onError: (e, st) {
result._completeError(e, st);
onDone: () {
cancelOnError: true);
return result;
* Collects the string representations of the data events into one string.
* If [separator] is provided, it is inserted between any two
* elements.
* Any error in the stream causes the future to complete with that
* error. Otherwise it completes with the collected string when
* the "done" event arrives.
Future<String> join([String separator = ""]) {
_Future<String> result = new _Future<String>();
StringBuffer buffer = new StringBuffer();
StreamSubscription subscription;
bool first = true;
subscription = this.listen(
(T element) {
if (!first) {
first = false;
try {
} catch (e, s) {
result._completeError(_asyncError(e, s));
onError: (e) {
onDone: () {
cancelOnError: true);
return result;
* Checks whether [needle] occurs in the elements provided by this stream.
* Completes the [Future] when the answer is known.
* If this stream reports an error, the [Future] will report that error.
Future<bool> contains(Object needle) {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
() => (element == needle),
(bool isMatch) {
if (isMatch) {
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
* Executes [action] on each data event of the stream.
* Completes the returned [Future] when all events of the stream
* have been processed. Completes the future with an error if the
* stream has an error event, or if [action] throws.
Future forEach(void action(T element)) {
_Future future = new _Future();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
() => action(element),
(_) {},
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
* Checks whether [test] accepts all elements provided by this stream.
* Completes the [Future] when the answer is known.
* If this stream reports an error, the [Future] will report that error.
Future<bool> every(bool test(T element)) {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
() => test(element),
(bool isMatch) {
if (!isMatch) {
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
* Checks whether [test] accepts any element provided by this stream.
* Completes the [Future] when the answer is known.
* If this stream reports an error, the [Future] will report that error.
Future<bool> any(bool test(T element)) {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(T element) {
() => test(element),
(bool isMatch) {
if (isMatch) {
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
/** Counts the elements in the stream. */
Future<int> get length {
_Future<int> future = new _Future<int>();
int count = 0;
(_) { count++; },
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
/** Reports whether this stream contains any elements. */
Future<bool> get isEmpty {
_Future<bool> future = new _Future<bool>();
StreamSubscription subscription;
subscription = this.listen(
(_) {
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
/** Collects the data of this stream in a [List]. */
Future<List<T>> toList() {
List<T> result = <T>[];
_Future<List<T>> future = new _Future<List<T>>();
(T data) {
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
/** Collects the data of this stream in a [Set]. */
Future<Set<T>> toSet() {
Set<T> result = new Set<T>();
_Future<Set<T>> future = new _Future<Set<T>>();
(T data) {
onError: future._completeError,
onDone: () {
cancelOnError: true);
return future;
* Discards all data on the stream, but signals when it's done or an error
* occurred.
* When subscribing using [drain], cancelOnError will be true. This means
* that the future will complete with the first error on the stream and then
* cancel the subscription.
* In case of a `done` event the future completes with the given
* [futureValue].
Future drain([var futureValue]) => listen(null, cancelOnError: true)
* Provides at most the first [count] values of this stream.
* Forwards the first [count] data events of this stream, and all error
* events, to the returned stream, and ends with a done event.
* If this stream produces fewer than [count] values before it's done,
* so will the returned stream.
Stream<T> take(int count) {
// NOTE(review): constructed without an explicit type argument, unlike
// `new _WhereStream<T>(...)` in `where` -- confirm whether `<T>` is intended.
return new _TakeStream(this, count);
* Forwards data events while [test] is successful.
* The returned stream provides the same events as this stream as long
* as [test] returns [:true:] for the event data. The stream is done
* when either this stream is done, or when this stream first provides
* a value that [test] doesn't accept.
Stream<T> takeWhile(bool test(T element)) {
// NOTE(review): constructed without an explicit type argument, unlike
// `new _WhereStream<T>(...)` in `where` -- confirm whether `<T>` is intended.
return new _TakeWhileStream(this, test);
* Skips the first [count] data events from this stream.
Stream<T> skip(int count) {
// NOTE(review): constructed without an explicit type argument, unlike
// `new _WhereStream<T>(...)` in `where` -- confirm whether `<T>` is intended.
return new _SkipStream(this, count);
* Skip data events from this stream while they are matched by [test].
* Error and done events are provided by the returned stream unmodified.
* Starting with the first data event where [test] returns false for the
* event data, the returned stream will have the same events as this stream.
Stream<T> skipWhile(bool test(T element)) {
// NOTE(review): constructed without an explicit type argument, unlike
// `new _WhereStream<T>(...)` in `where` -- confirm whether `<T>` is intended.
return new _SkipWhileStream(this, test);
* Skips data events if they are equal to the previous data event.
* The returned stream provides the same events as this stream, except
* that it never provides two consecutive data events that are equal.
* Equality is determined by the provided [equals] method. If that is
* omitted, the '==' operator on the last provided data element is used.
Stream<T> distinct([bool equals(T previous, T next)]) {
// [equals] is optional and is passed through as given (null when omitted).
// NOTE(review): constructed without an explicit type argument, unlike
// `new _WhereStream<T>(...)` in `where` -- confirm whether `<T>` is intended.
return new _DistinctStream(this, equals);
* Returns the first element of the stream.
* Stops listening to the stream after the first element has been received.
* If an error event occurs before the first data event, the resulting future
* is completed with that error.
* If this stream is empty (a done event occurs before the first data event),
* the resulting future completes with a [StateError].
* Except for the type of the error, this method is equivalent to
* [:this.elementAt(0):].
Future<T> get first {
_Future<T> future = new _Future<T>();
StreamSubscription subscription;
subscription = this.listen(
(T value) {
onError: future._completeError,
onDone: () {
future._completeError(new StateError("No elements"));
cancelOnError: true);
return future;
* Returns the last element of the stream.
* If an error event occurs before the first data event, the resulting future
* is completed with that error.
* If this stream is empty (a done event occurs before the first data event),
* the resulting future completes with a [StateError].
Future<T> get last {
_Future<T> future = new _Future<T>();
T result = null;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
(T value) {
foundResult = true;
result = value;
onError: future._completeError,
onDone: () {
if (foundResult) {
future._completeError(new StateError("No elements"));
cancelOnError: true);
return future;
* Returns the single element.
* If [this] is empty or has more than one element throws a [StateError].
Future<T> get single {
_Future<T> future = new _Future<T>();
T result = null;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
(T value) {
if (foundResult) {
// This is the second element we get.
Error error = new StateError("More than one element");
foundResult = true;
result = value;
onError: future._completeError,
onDone: () {
if (foundResult) {
future._completeError(new StateError("No elements"));
cancelOnError: true);
return future;
* Finds the first element of this stream matching [test].
* Returns a future that is filled with the first element of this stream
* that [test] returns true for.
* If no such element is found before this stream is done, and a
* [defaultValue] function is provided, the result of calling [defaultValue]
* becomes the value of the future.
* If an error occurs, or if this stream ends without finding a match and
* with no [defaultValue] function provided, the future will receive an
* error.
Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
_Future<dynamic> future = new _Future();
StreamSubscription subscription;
subscription = this.listen(
(T value) {
() => test(value),
(bool isMatch) {
if (isMatch) {
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
if (defaultValue != null) {
_runUserCode(defaultValue, future._complete, future._completeError);
future._completeError(new StateError("firstMatch ended without match"));
cancelOnError: true);
return future;
* Finds the last element in this stream matching [test].
* As [firstWhere], except that the last matching element is found.
* That means that the result cannot be provided before this stream
* is done.
Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) {
_Future<dynamic> future = new _Future();
T result = null;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
(T value) {
() => true == test(value),
(bool isMatch) {
if (isMatch) {
foundResult = true;
result = value;
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
if (foundResult) {
if (defaultValue != null) {
_runUserCode(defaultValue, future._complete, future._completeError);
future._completeError(new StateError("lastMatch ended without match"));
cancelOnError: true);
return future;
* Finds the single element in this stream matching [test].
* Like [lastWhere], except that it is an error if more than one
* matching element occurs in the stream.
Future<T> singleWhere(bool test(T element)) {
_Future<T> future = new _Future<T>();
T result = null;
bool foundResult = false;
StreamSubscription subscription;
subscription = this.listen(
(T value) {
() => true == test(value),
(bool isMatch) {
if (isMatch) {
if (foundResult) {
new StateError('Multiple matches for "single"'));
foundResult = true;
result = value;
_cancelAndError(subscription, future)
onError: future._completeError,
onDone: () {
if (foundResult) {
future._completeError(new StateError("single ended without match"));
cancelOnError: true);
return future;
* Returns the value of the [index]th data event of this stream.
* Stops listening to the stream after a value has been found.
* If an error event occurs before the value is found, the future completes
* with this error.
* If a done event occurs before the value is found, the future completes
* with a [RangeError].
Future<T> elementAt(int index) {
if (index is! int || index < 0) throw new ArgumentError(index);
_Future<T> future = new _Future<T>();
StreamSubscription subscription;
subscription = this.listen(
(T value) {
if (index == 0) {
index -= 1;
onError: future._completeError,
onDone: () {
future._completeError(new RangeError.value(index));
cancelOnError: true);
return future;
* A control object for the subscription on a [Stream].
* When you subscribe on a [Stream] using [Stream.listen],
* a [StreamSubscription] object is returned. This object
* is used to later unsubscribe again, or to temporarily pause
* the stream's events.
abstract class StreamSubscription<T> {
* Cancels this subscription. It will no longer receive events.
* If an event is currently firing, this unsubscription will only
* take effect after all subscribers have received the current event.
void cancel();
/** Set or override the data event handler of this subscription. */
void onData(void handleData(T data));
* Set or override the error event handler of this subscription.
* This method overrides the handler that has been set at the invocation of
* [Stream.listen].
void onError(Function handleError);
/** Set or override the done event handler of this subscription. */
void onDone(void handleDone());
* Request that the stream pauses events until further notice.
* If [resumeSignal] is provided, the stream will undo the pause
* when the future completes. If the future completes with an error,
* it will not be handled!
* A call to [resume] will also undo a pause.
* If the subscription is paused more than once, an equal number
* of resumes must be performed to resume the stream.
void pause([Future resumeSignal]);
* Resume after a pause.
void resume();
* Returns true if the [StreamSubscription] is paused.
bool get isPaused;
* Returns a future that handles the [onDone] and [onError] callbacks.
* This method *overwrites* the existing [onDone] and [onError] callbacks
* with new ones that complete the returned future.
* In case of an error the subscription will automatically cancel (even
* when it was listening with `cancelOnError` set to `false`).
* In case of a `done` event the future completes with the given
* [futureValue].
Future asFuture([var futureValue]);
* An interface that abstracts creation or handling of [Stream] events.
abstract class EventSink<T> {
/** Adds the data event [event] to this sink. */
void add(T event);
/**
 * Adds the error event [errorEvent] to this sink, optionally together with
 * the [stackTrace] that belongs to it.
 */
void addError(errorEvent, [StackTrace stackTrace]);
/** Requests that the stream fed by this sink be closed. */
void close();
/** [Stream] wrapper that only exposes the [Stream] interface. */
class StreamView<T> extends Stream<T> {
// The wrapped stream that every member below forwards to.
// NOTE(review): no constructor or initializer for this field is visible in
// this excerpt -- confirm how it gets set.
Stream<T> _stream;
// Mirrors the wrapped stream's answer instead of the base-class default.
bool get isBroadcast => _stream.isBroadcast;
// Forwards to the wrapped stream, passing both callbacks through unchanged.
Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription),
void onCancel(StreamSubscription subscription)})
=> _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
// Forwards to the wrapped stream; the subscription returned is the wrapped
// stream's own subscription, not a wrapper around it.
StreamSubscription<T> listen(void onData(T value),
{ Function onError,
void onDone(),
bool cancelOnError }) {
return _stream.listen(onData, onError: onError, onDone: onDone,
cancelOnError: cancelOnError);
* The target of a [Stream.pipe] call.
* The [Stream.pipe] call will pass itself to this object, and then return
* the resulting [Future]. The pipe should complete the future when it's
* done.
abstract class StreamConsumer<S> {
// Receives the stream to consume. [Stream.pipe] calls this with the piped
// stream and waits for the returned future before calling [close].
Future addStream(Stream<S> stream);
// Called by [Stream.pipe] after the [addStream] future has completed with a
// value; the future returned here becomes the result of the pipe call.
Future close();
* A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
* the synchronous methods from [EventSink].
* The [EventSink] methods can't be used while the [addStream] is called.
* As soon as the [addStream]'s [Future] completes with a value, the
* [EventSink] methods can be used again.
* If [addStream] is called after any of the [EventSink] methods, it'll
* be delayed until the underlying system has consumed the data added by the
* [EventSink] methods.
* When [EventSink] methods are used, the [done] [Future] can be used to
* catch any errors.
* When [close] is called, it will return the [done] [Future].
abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> {
* Close the [StreamSink]. It'll return the [done] Future.
Future close();
* The [done] Future completes with the same values as [close], except
* for the following case:
* * The synchronous methods of [EventSink] were called, resulting in an
* error. If there is no active future (like from an addStream call), the
* [done] future will complete with that error
Future get done;
* The target of a [Stream.transform] call.
* The [Stream.transform] call will pass itself to this object and then return
* the resulting stream.
* It is good practice to write transformers that can be used multiple times.
abstract class StreamTransformer<S, T> {
* Creates a [StreamTransformer].
* The returned instance takes responsibility of implementing ([bind]).
* When the user invokes `bind` it returns a new "bound" stream. Only when
* the user starts listening to the bound stream, the `listen` method
* invokes the given closure [transformer].
* The [transformer] closure receives the stream, that was bound, as argument
* and returns a [StreamSubscription]. In almost all cases the closure
* listens itself to the stream that is given as argument.
* The result of invoking the [transformer] closure is a [StreamSubscription].
* The bound stream-transformer (created by the `bind` method above) then sets
* the handlers it received as part of the `listen` call.
* Conceptually this can be summarized as follows:
* 1. `var transformer = new StreamTransformer(transformerClosure);`
* creates a `StreamTransformer` that supports the `bind` method.
* 2. `var boundStream = stream.transform(transformer);` binds the `stream`
* and returns a bound stream that has a pointer to `stream`.
* 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
* starts the listening and transformation. This is accomplished
* in 2 steps: first the `boundStream` invokes the `transformerClosure` with
* the `stream` it captured: `transformerClosure(stream, b)`.
* The result `subscription`, a [StreamSubscription], is then
* updated to receive its handlers: `subscription.onData(f1)`,
* `subscription.onError(f2)`, `subscription.onDone(f3)`. Finally the subscription
* is returned as result of the `listen` call.
* There are two common ways to create a StreamSubscription:
* 1. by creating a new class that implements [StreamSubscription].
* Note that the subscription should run callbacks in the [Zone] the
* stream was listened to.
* 2. by allocating a [StreamController] and returning the result of
* listening to its stream.
* Example use of a duplicating transformer:
* stringStream.transform(new StreamTransformer<String, String>(
* (Stream<String> input, bool cancelOnError) {
* StreamController<String> controller;
* StreamSubscription<String> subscription;
* controller = new StreamController<String>(
* onListen: () {
* subscription = input.listen((data) {
* // Duplicate the data.
* controller.add(data);
* controller.add(data);
* },
* onError: controller.addError,
* onDone: controller.close,
* cancelOnError: cancelOnError);
* },
* onPause: subscription.pause,
* onResume: subscription.resume,
* onCancel: subscription.cancel,
* sync: true);
* return controller.stream.listen(null);
* }));
// Redirecting const factory: instances are actually created by
// _StreamSubscriptionTransformer, which receives the [transformer] closure.
const factory StreamTransformer(
StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
= _StreamSubscriptionTransformer;
* Creates a [StreamTransformer] that delegates events to the given functions.
* Example use of a duplicating transformer:
* stringStream.transform(new StreamTransformer<String, String>.fromHandlers(
* handleData: (String value, EventSink<String> sink) {
* sink.add(value);
* sink.add(value); // Duplicate the incoming events.
* }));
// Redirecting factory: instances are actually created by
// _StreamHandlerTransformer. All three handlers are optional named
// parameters, and each one is handed the output sink as its last argument.
factory StreamTransformer.fromHandlers({
void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)})
= _StreamHandlerTransformer;
Stream<T> bind(Stream<S> stream);
* An [Iterable] like interface for the values of a [Stream].
* This wraps a [Stream] and a subscription on the stream. It listens
* on the stream, and completes the future returned by [moveNext] when the
* next value becomes available.
* NOTICE: This is a tentative design. This class may change.
abstract class StreamIterator<T> {
/**
 * Creates a [StreamIterator] on [stream].
 *
 * The instance returned is the private implementation class, constructed
 * with the same type argument T.
 */
factory StreamIterator(Stream<T> stream)
// TODO(lrn): use redirecting factory constructor when type
// arguments are supported.
=> new _StreamIteratorImpl<T>(stream);
* Wait for the next stream value to be available.
* It is not allowed to call this function again until the future has
* completed. If the returned future completes with anything except `true`,
* the iterator is done, and no new value will ever be available.
* The future may complete with an error, if the stream produces an error.
Future<bool> moveNext();
* The current value of the stream.
* Only valid when the future returned by [moveNext] completes with `true`
* as value, and only until the next call to [moveNext].
T get current;
* Cancels the stream iterator (and the underlying stream subscription) early.
* The stream iterator is automatically canceled if the [moveNext] future
* completes with either `false` or an error.
* If a [moveNext] call has been made, it will complete with `false` as value,
* as will all further calls to [moveNext].
* If you need to stop listening for values before the stream iterator is
* automatically closed, you must call [cancel] to ensure that the stream
* is properly closed.
void cancel();