| // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of dart.async; |
| |
| // ------------------------------------------------------------------- |
| // Core Stream types |
| // ------------------------------------------------------------------- |
| |
| typedef void _TimerCallback(); |
| |
| /// A source of asynchronous data events. |
| /// |
| /// A Stream provides a way to receive a sequence of events. |
| /// Each event is either a data event, also called an *element* of the stream, |
| /// or an error event, which is a notification that something has failed. |
| /// When a stream has emitted all its event, |
| /// a single "done" event will notify the listener that the end has been reached. |
| /// |
| /// You produce a stream by calling an `async*` function, which then returns |
| /// a stream. Consuming that stream will lead the function to emit events |
| /// until it ends, and the stream closes. |
| /// You consume a stream either using an `await for` loop, which is available |
| /// inside an `async` or `async*` function, or forwards its events directly |
| /// using `yield*` inside an `async*` function. |
| /// Example: |
| /// ```dart |
| /// Stream<T> optionalMap<T>( |
| /// Stream<T> source , [T Function(T)? convert]) async* { |
| /// if (convert == null) { |
| /// yield* source; |
| /// } else { |
| /// await for (var event in source) { |
| /// yield convert(event); |
| /// } |
| /// } |
| /// } |
| /// ``` |
| /// When this function is called, it immediately returns a `Stream<T>` object. |
| /// Then nothing further happens until someone tries to consume that stream. |
| /// At that point, the body of the `async*` function starts running. |
| /// If the `convert` function was omitted, the `yield*` will listen to the |
| /// `source` stream and forward all events, date and errors, to the returned |
| /// stream. When the `source` stream closes, the `yield*` is done, |
| /// and the `optionalMap` function body ends too. This closes the returned |
| /// stream. |
| /// If a `convert` *is* supplied, the function instead listens on the source |
| /// stream and enters an `await for` loop which |
| /// repeatedly waits for the next data event. |
| /// On a data event, it calls `convert` with the value and emits the result |
| /// on the returned stream. |
| /// If no error events are emitted by the `source` stream, |
| /// the loop ends when the `source` stream does, |
| /// then the `optionalMap` function body completes, |
| /// which closes the returned stream. |
| /// On an error event from the `source` stream, |
| /// the `await for` that error is (re-)thrown which breaks the loop. |
| /// The error then reaches the end of the `optionalMap` function body, |
| /// since it's not caught. |
| /// That makes the error be emitted on the returned stream, which then closes. |
| /// |
| /// The `Stream` class also provides functionality which allows you to |
| /// manually listen for events from a stream, or to convert a stream |
| /// into another stream or into a future. |
| /// |
| /// The [forEach] function corresponds to the `await for` loop, |
| /// just as [Iterable.forEach] corresponds to a normal `for`/`in` loop. |
| /// Like the loop, it will call a function for each data event and break on an |
| /// error. |
| /// |
| /// The more low-level [listen] method is what every other method is based on. |
| /// You call `listen` on a stream to tell it that you want to receive |
| /// events, and to registers the callbacks which will receive those events. |
| /// When you call `listen`, you receive a [StreamSubscription] object |
| /// which is the active object providing the events, |
| /// and which can be used to stop listening again, |
| /// or to temporarily pause events from the subscription. |
| /// |
| /// There are two kinds of streams: "Single-subscription" streams and |
| /// "broadcast" streams. |
| /// |
| /// *A single-subscription stream* allows only a single listener during the whole |
| /// lifetime of the stream. |
| /// It doesn't start generating events until it has a listener, |
| /// and it stops sending events when the listener is unsubscribed, |
| /// even if the source of events could still provide more. |
| /// The stream created by an `async*` function is a single-subscription stream, |
| /// but each call to the function creates a new such stream. |
| /// |
| /// Listening twice on a single-subscription stream is not allowed, even after |
| /// the first subscription has been canceled. |
| /// |
| /// Single-subscription streams are generally used for streaming chunks of |
| /// larger contiguous data, like file I/O. |
| /// |
| /// *A broadcast stream* allows any number of listeners, and it fires |
| /// its events when they are ready, whether there are listeners or not. |
| /// |
| /// Broadcast streams are used for independent events/observers. |
| /// |
| /// If several listeners want to listen to a single-subscription stream, |
| /// use [asBroadcastStream] to create a broadcast stream on top of the |
| /// non-broadcast stream. |
| /// |
| /// On either kind of stream, stream transformations, such as [where] and |
| /// [skip], return the same type of stream as the one the method was called on, |
| /// unless otherwise noted. |
| /// |
| /// When an event is fired, the listener(s) at that time will receive the event. |
| /// If a listener is added to a broadcast stream while an event is being fired, |
| /// that listener will not receive the event currently being fired. |
| /// If a listener is canceled, it immediately stops receiving events. |
| /// Listening on a broadcast stream can be treated as listening on a new stream |
| /// containing only the events that have not yet been emitted when the [listen] |
| /// call occurs. |
| /// For example, the [first] getter listens to the stream, then returns the first |
| /// event that listener receives. |
| /// This is not necessarily the first even emitted by the stream, but the first |
| /// of the *remaining* events of the broadcast stream. |
| /// |
| /// When the "done" event is fired, subscribers are unsubscribed before |
| /// receiving the event. After the event has been sent, the stream has no |
| /// subscribers. Adding new subscribers to a broadcast stream after this point |
| /// is allowed, but they will just receive a new "done" event as soon |
| /// as possible. |
| /// |
| /// Stream subscriptions always respect "pause" requests. If necessary they need |
| /// to buffer their input, but often, and preferably, they can simply request |
| /// their input to pause too. |
| /// |
| /// The default implementation of [isBroadcast] returns false. |
| /// A broadcast stream inheriting from [Stream] must override [isBroadcast] |
| /// to return `true` if it wants to signal that it behaves like a broadcast |
| /// stream. |
| abstract class Stream<T> { |
| Stream(); |
| |
| /// Internal use only. We do not want to promise that Stream stays const. |
| /// |
| /// If mixins become compatible with const constructors, we may use a |
| /// stream mixin instead of extending Stream from a const class. |
| /// (They now are compatible. We still consider, but it's not urgent.) |
| const Stream._internal(); |
| |
| /// Creates an empty broadcast stream. |
| /// |
| /// This is a stream which does nothing except sending a done event |
| /// when it's listened to. |
| const factory Stream.empty() = _EmptyStream<T>; |
| |
| /// Creates a stream which emits a single data event before closing. |
| /// |
| /// This stream emits a single data event of [value] |
| /// and then closes with a done event. |
| /// |
| /// Example: |
| /// ```dart |
| /// Future<void> printThings(Stream<String> data) async { |
| /// await for (var x in data) { |
| /// print(x); |
| /// } |
| /// } |
| /// printThings(Stream<String>.value("ok")); // prints "ok". |
| /// ``` |
| /// |
| /// The returned stream is effectively equivalent to one created by |
| /// `(() async* { yield value; } ())` or `Future<T>.value(value).asStream()`. |
| @Since("2.5") |
| factory Stream.value(T value) => |
| (_AsyncStreamController<T>(null, null, null, null) |
| .._add(value) |
| .._closeUnchecked()) |
| .stream; |
| |
| /// Creates a stream which emits a single error event before completing. |
| /// |
| /// This stream emits a single error event of [error] and [stackTrace] |
| /// and then completes with a done event. |
| /// |
| /// Example: |
| /// ```dart |
| /// Future<void> tryThings(Stream<int> data) async { |
| /// try { |
| /// await for (var x in data) { |
| /// print("Data: $x"); |
| /// } |
| /// } catch (e) { |
| /// print(e); |
| /// } |
| /// } |
| /// tryThings(Stream<int>.error("Error")); // prints "Error". |
| /// ``` |
| /// The returned stream is effectively equivalent to one created by |
| /// `Future<T>.error(error, stackTrace).asStream()`, by or |
| /// `(() async* { throw error; } ())`, except that you can control the |
| /// stack trace as well. |
| @Since("2.5") |
| factory Stream.error(Object error, [StackTrace? stackTrace]) { |
| // TODO(40614): Remove once non-nullability is sound. |
| checkNotNullable(error, "error"); |
| return (_AsyncStreamController<T>(null, null, null, null) |
| .._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error)) |
| .._closeUnchecked()) |
| .stream; |
| } |
| |
| /// Creates a new single-subscription stream from the future. |
| /// |
| /// When the future completes, the stream will fire one event, either |
| /// data or error, and then close with a done-event. |
| factory Stream.fromFuture(Future<T> future) { |
| // Use the controller's buffering to fill in the value even before |
| // the stream has a listener. For a single value, it's not worth it |
| // to wait for a listener before doing the `then` on the future. |
| _StreamController<T> controller = |
| new _SyncStreamController<T>(null, null, null, null); |
| future.then((value) { |
| controller._add(value); |
| controller._closeUnchecked(); |
| }, onError: (error, stackTrace) { |
| controller._addError(error, stackTrace); |
| controller._closeUnchecked(); |
| }); |
| return controller.stream; |
| } |
| |
| /// Create a single-subscription stream from a group of futures. |
| /// |
| /// The stream reports the results of the futures on the stream in the order |
| /// in which the futures complete. |
| /// Each future provides either a data event or an error event, |
| /// depending on how the future completes. |
| /// |
| /// If some futures have already completed when `Stream.fromFutures` is called, |
| /// their results will be emitted in some unspecified order. |
| /// |
| /// When all futures have completed, the stream is closed. |
| /// |
| /// If [futures] is empty, the stream closes as soon as possible. |
| factory Stream.fromFutures(Iterable<Future<T>> futures) { |
| _StreamController<T> controller = |
| new _SyncStreamController<T>(null, null, null, null); |
| int count = 0; |
| // Declare these as variables holding closures instead of as |
| // function declarations. |
| // This avoids creating a new closure from the functions for each future. |
| void onValue(T value) { |
| if (!controller.isClosed) { |
| controller._add(value); |
| if (--count == 0) controller._closeUnchecked(); |
| } |
| } |
| |
| void onError(Object error, StackTrace stack) { |
| if (!controller.isClosed) { |
| controller._addError(error, stack); |
| if (--count == 0) controller._closeUnchecked(); |
| } |
| } |
| |
| // The futures are already running, so start listening to them immediately |
| // (instead of waiting for the stream to be listened on). |
| // If we wait, we might not catch errors in the futures in time. |
| for (var future in futures) { |
| count++; |
| future.then(onValue, onError: onError); |
| } |
| // Use schedule microtask since controller is sync. |
| if (count == 0) scheduleMicrotask(controller.close); |
| return controller.stream; |
| } |
| |
| /// Creates a single-subscription stream that gets its data from [elements]. |
| /// |
| /// The iterable is iterated when the stream receives a listener, and stops |
| /// iterating if the listener cancels the subscription, or if the |
| /// [Iterator.moveNext] method returns `false` or throws. |
| /// Iteration is suspended while the stream subscription is paused. |
| /// |
| /// If calling [Iterator.moveNext] on `elements.iterator` throws, |
| /// the stream emits that error and then it closes. |
| /// If reading [Iterator.current] on `elements.iterator` throws, |
| /// the stream emits that error, but keeps iterating. |
| factory Stream.fromIterable(Iterable<T> elements) { |
| return new _GeneratedStreamImpl<T>( |
| () => new _IterablePendingEvents<T>(elements)); |
| } |
| |
| /// Creates a multi-subscription stream. |
| /// |
| /// Each time the created stream is listened to, |
| /// the [onListen] callback is invoked with a new [MultiStreamController] |
| /// which forwards events to the [StreamSubscription] |
| /// returned by that [listen] call. |
| /// |
| /// This allows each listener to be treated as an individual stream. |
| /// |
| /// The [MultiStreamController] does not support reading its |
| /// [StreamController.stream]. Setting its [StreamController.onListen] |
| /// has no effect since the [onListen] callback is called instead, |
| /// and the [StreamController.onListen] won't be called later. |
| /// The controller acts like an asynchronous controller, |
| /// but provides extra methods for delivering events synchronously. |
| /// |
| /// If [isBroadcast] is set to `true`, the returned stream's |
| /// [Stream.isBroadcast] will be `true`. |
| /// This has no effect on the stream behavior, |
| /// it is up to the [onListen] function |
| /// to act like a broadcast stream if it claims to be one. |
| /// |
| /// A multi-subscription stream can behave like any other stream. |
| /// If the [onListen] callback throws on every call after the first, |
| /// the stream behaves like a single-subscription stream. |
| /// If the stream emits the same events to all current listeners, |
| /// it behaves like a broadcast stream. |
| /// |
| /// It can also choose to emit different events to different listeners. |
| /// For example, a stream which repeats the most recent |
| /// non-`null` event to new listeners, could be implemented as this example: |
| /// ```dart |
| /// extension StreamRepeatLatestExtension<T extends Object> on Stream<T> { |
| /// Stream<T> repeatLatest() { |
| /// var done = false; |
| /// T? latest = null; |
| /// var currentListeners = <MultiStreamController<T>>{}; |
| /// this.listen((event) { |
| /// latest = event; |
| /// for (var listener in [...currentListeners]) listener.addSync(event); |
| /// }, onError: (Object error, StackTrace stack) { |
| /// for (var listener in [...currentListeners]) listener.addErrorSync(error, stack); |
| /// }, onDone: () { |
| /// done = true; |
| /// latest = null; |
| /// for (var listener in currentListeners) listener.closeSync(); |
| /// currentListeners.clear(); |
| /// }); |
| /// return Stream.multi((controller) { |
| /// if (done) { |
| /// controller.close(); |
| /// return; |
| /// } |
| /// currentListeners.add(controller); |
| /// var latestValue = latest; |
| /// if (latestValue != null) controller.add(latestValue); |
| /// controller.onCancel = () { |
| /// currentListeners.remove(controller); |
| /// }; |
| /// }); |
| /// } |
| /// } |
| /// ``` |
| @Since("2.9") |
| factory Stream.multi(void Function(MultiStreamController<T>) onListen, |
| {bool isBroadcast = false}) { |
| return _MultiStream<T>(onListen, isBroadcast); |
| } |
| |
| /// Creates a stream that repeatedly emits events at [period] intervals. |
| /// |
| /// The event values are computed by invoking [computation]. The argument to |
| /// this callback is an integer that starts with 0 and is incremented for |
| /// every event. |
| /// |
| /// The [period] must a non-negative [Duration]. |
| /// |
| /// If [computation] is omitted the event values will all be `null`. |
| /// |
| /// The [computation] must not be omitted if the event type [T] does not |
| /// allow `null` as a value. |
| factory Stream.periodic(Duration period, |
| [T computation(int computationCount)?]) { |
| if (computation == null && !typeAcceptsNull<T>()) { |
| throw ArgumentError.value(null, "computation", |
| "Must not be omitted when the event type is non-nullable"); |
| } |
| var controller = _SyncStreamController<T>(null, null, null, null); |
| // Counts the time that the Stream was running (and not paused). |
| Stopwatch watch = new Stopwatch(); |
| controller.onListen = () { |
| int computationCount = 0; |
| void sendEvent(_) { |
| watch.reset(); |
| if (computation != null) { |
| T event; |
| try { |
| event = computation(computationCount++); |
| } catch (e, s) { |
| controller.addError(e, s); |
| return; |
| } |
| controller.add(event); |
| } else { |
| controller.add(null as T); // We have checked that null is T. |
| } |
| } |
| |
| Timer timer = Timer.periodic(period, sendEvent); |
| controller |
| ..onCancel = () { |
| timer.cancel(); |
| return Future._nullFuture; |
| } |
| ..onPause = () { |
| watch.stop(); |
| timer.cancel(); |
| } |
| ..onResume = () { |
| Duration elapsed = watch.elapsed; |
| watch.start(); |
| timer = new Timer(period - elapsed, () { |
| timer = Timer.periodic(period, sendEvent); |
| sendEvent(null); |
| }); |
| }; |
| }; |
| return controller.stream; |
| } |
| |
| /// Creates a stream where all events of an existing stream are piped through |
| /// a sink-transformation. |
| /// |
| /// The given [mapSink] closure is invoked when the returned stream is |
| /// listened to. All events from the [source] are added into the event sink |
| /// that is returned from the invocation. The transformation puts all |
| /// transformed events into the sink the [mapSink] closure received during |
| /// its invocation. Conceptually the [mapSink] creates a transformation pipe |
| /// with the input sink being the returned [EventSink] and the output sink |
| /// being the sink it received. |
| /// |
| /// This constructor is frequently used to build transformers. |
| /// |
| /// Example use for a duplicating transformer: |
| /// ```dart |
| /// class DuplicationSink implements EventSink<String> { |
| /// final EventSink<String> _outputSink; |
| /// DuplicationSink(this._outputSink); |
| /// |
| /// void add(String data) { |
| /// _outputSink.add(data); |
| /// _outputSink.add(data); |
| /// } |
| /// |
| /// void addError(e, [st]) { _outputSink.addError(e, st); } |
| /// void close() { _outputSink.close(); } |
| /// } |
| /// |
| /// class DuplicationTransformer extends StreamTransformerBase<String, String> { |
| /// // Some generic types omitted for brevity. |
| /// Stream bind(Stream stream) => Stream<String>.eventTransformed( |
| /// stream, |
| /// (EventSink sink) => DuplicationSink(sink)); |
| /// } |
| /// |
| /// stringStream.transform(DuplicationTransformer()); |
| /// ``` |
| /// The resulting stream is a broadcast stream if [source] is. |
| factory Stream.eventTransformed( |
| Stream<dynamic> source, EventSink<dynamic> mapSink(EventSink<T> sink)) { |
| return new _BoundSinkStream(source, mapSink); |
| } |
| |
| /// Adapts [source] to be a `Stream<T>`. |
| /// |
| /// This allows [source] to be used at the new type, but at run-time it |
| /// must satisfy the requirements of both the new type and its original type. |
| /// |
| /// Data events created by the source stream must also be instances of [T]. |
| static Stream<T> castFrom<S, T>(Stream<S> source) => |
| new CastStream<S, T>(source); |
| |
| /// Whether this stream is a broadcast stream. |
| bool get isBroadcast => false; |
| |
| /// Returns a multi-subscription stream that produces the same events as this. |
| /// |
| /// The returned stream will subscribe to this stream when its first |
| /// subscriber is added, and will stay subscribed until this stream ends, |
| /// or a callback cancels the subscription. |
| /// |
| /// If [onListen] is provided, it is called with a subscription-like object |
| /// that represents the underlying subscription to this stream. It is |
| /// possible to pause, resume or cancel the subscription during the call |
| /// to [onListen]. It is not possible to change the event handlers, including |
| /// using [StreamSubscription.asFuture]. |
| /// |
| /// If [onCancel] is provided, it is called in a similar way to [onListen] |
| /// when the returned stream stops having listener. If it later gets |
| /// a new listener, the [onListen] function is called again. |
| /// |
| /// Use the callbacks, for example, for pausing the underlying subscription |
| /// while having no subscribers to prevent losing events, or canceling the |
| /// subscription when there are no listeners. |
| /// |
| /// Cancelling is intended to be used when there are no current subscribers. |
| /// If the subscription passed to `onListen` or `onCancel` is cancelled, |
| /// then no further events are ever emitted by current subscriptions on |
| /// the returned broadcast stream, not even a done event. |
| Stream<T> asBroadcastStream( |
| {void onListen(StreamSubscription<T> subscription)?, |
| void onCancel(StreamSubscription<T> subscription)?}) { |
| return new _AsBroadcastStream<T>(this, onListen, onCancel); |
| } |
| |
| /// Adds a subscription to this stream. |
| /// |
| /// Returns a [StreamSubscription] which handles events from this stream using |
| /// the provided [onData], [onError] and [onDone] handlers. |
| /// The handlers can be changed on the subscription, but they start out |
| /// as the provided functions. |
| /// |
| /// On each data event from this stream, the subscriber's [onData] handler |
| /// is called. If [onData] is `null`, nothing happens. |
| /// |
| /// On errors from this stream, the [onError] handler is called with the |
| /// error object and possibly a stack trace. |
| /// |
| /// The [onError] callback must be of type `void Function(Object error)` or |
| /// `void Function(Object error, StackTrace)`. |
| /// The function type determines whether [onError] is invoked with a stack |
| /// trace argument. |
| /// The stack trace argument may be [StackTrace.empty] if this stream received |
| /// an error without a stack trace. |
| /// |
| /// Otherwise it is called with just the error object. |
| /// If [onError] is omitted, any errors on this stream are considered unhandled, |
| /// and will be passed to the current [Zone]'s error handler. |
| /// By default unhandled async errors are treated |
| /// as if they were uncaught top-level errors. |
| /// |
| /// If this stream closes and sends a done event, the [onDone] handler is |
| /// called. If [onDone] is `null`, nothing happens. |
| /// |
| /// If [cancelOnError] is `true`, the subscription is automatically canceled |
| /// when the first error event is delivered. The default is `false`. |
| /// |
| /// While a subscription is paused, or when it has been canceled, |
| /// the subscription doesn't receive events and none of the |
| /// event handler functions are called. |
| StreamSubscription<T> listen(void onData(T event)?, |
| {Function? onError, void onDone()?, bool? cancelOnError}); |
| |
| /// Creates a new stream from this stream that discards some elements. |
| /// |
| /// The new stream sends the same error and done events as this stream, |
| /// but it only sends the data events that satisfy the [test]. |
| /// |
| /// If the [test] function throws, the data event is dropped and the |
| /// error is emitted on the returned stream instead. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will individually perform the `test`. |
| Stream<T> where(bool test(T event)) { |
| return new _WhereStream<T>(this, test); |
| } |
| |
| /// Transforms each element of this stream into a new stream event. |
| /// |
| /// Creates a new stream that converts each element of this stream |
| /// to a new value using the [convert] function, and emits the result. |
| /// |
| /// For each data event, `o`, in this stream, the returned stream |
| /// provides a data event with the value `convert(o)`. |
| /// If [convert] throws, the returned stream reports it as an error |
| /// event instead. |
| /// |
| /// Error and done events are passed through unchanged to the returned stream. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// The [convert] function is called once per data event per listener. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will individually call [convert] on each data event. |
| /// |
| /// Unlike [transform], this method does not treat the stream as |
| /// chunks of a single value. Instead each event is converted independently |
| /// of the previous and following events, which may not always be correct. |
| /// For example, UTF-8 encoding, or decoding, will give wrong results |
| /// if a surrogate pair, or a multibyte UTF-8 encoding, is split into |
| /// separate events, and those events are attempted encoded or decoded |
| /// independently. |
| Stream<S> map<S>(S convert(T event)) { |
| return new _MapStream<T, S>(this, convert); |
| } |
| |
| /// Creates a new stream with each data event of this stream asynchronously |
| /// mapped to a new event. |
| /// |
| /// This acts like [map], except that [convert] may return a [Future], |
| /// and in that case, this stream waits for that future to complete before |
| /// continuing with its result. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { |
| _StreamControllerBase<E> controller; |
| if (isBroadcast) { |
| controller = _SyncBroadcastStreamController<E>(null, null); |
| } else { |
| controller = _SyncStreamController<E>(null, null, null, null); |
| } |
| |
| controller.onListen = () { |
| StreamSubscription<T> subscription = this.listen(null, |
| onError: controller._addError, // Avoid Zone error replacement. |
| onDone: controller.close); |
| FutureOr<Null> add(E value) { |
| controller.add(value); |
| } |
| |
| final addError = controller._addError; |
| final resume = subscription.resume; |
| subscription.onData((T event) { |
| FutureOr<E> newValue; |
| try { |
| newValue = convert(event); |
| } catch (e, s) { |
| controller.addError(e, s); |
| return; |
| } |
| if (newValue is Future<E>) { |
| subscription.pause(); |
| newValue.then(add, onError: addError).whenComplete(resume); |
| } else { |
| // TODO(40014): Remove cast when type promotion works. |
| controller.add(newValue as dynamic); |
| } |
| }); |
| controller.onCancel = subscription.cancel; |
| if (!isBroadcast) { |
| controller |
| ..onPause = subscription.pause |
| ..onResume = resume; |
| } |
| }; |
| return controller.stream; |
| } |
| |
| /// Transforms each element into a sequence of asynchronous events. |
| /// |
| /// Returns a new stream and for each event of this stream, do the following: |
| /// |
| /// * If the event is an error event or a done event, it is emitted directly |
| /// by the returned stream. |
| /// * Otherwise it is an element. Then the [convert] function is called |
| /// with the element as argument to produce a convert-stream for the element. |
| /// * If that call throws, the error is emitted on the returned stream. |
| /// * If the call returns `null`, no further action is taken for the elements. |
| /// * Otherwise, this stream is paused and convert-stream is listened to. |
| /// Every data and error event of the convert-stream is emitted on the returned |
| /// stream in the order it is produced. |
| /// When the convert-stream ends, this stream is resumed. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { |
| _StreamControllerBase<E> controller; |
| if (isBroadcast) { |
| controller = _SyncBroadcastStreamController<E>(null, null); |
| } else { |
| controller = _SyncStreamController<E>(null, null, null, null); |
| } |
| |
| controller.onListen = () { |
| StreamSubscription<T> subscription = this.listen(null, |
| onError: controller._addError, // Avoid Zone error replacement. |
| onDone: controller.close); |
| subscription.onData((T event) { |
| Stream<E>? newStream; |
| try { |
| newStream = convert(event); |
| } catch (e, s) { |
| controller.addError(e, s); |
| return; |
| } |
| if (newStream != null) { |
| subscription.pause(); |
| controller.addStream(newStream).whenComplete(subscription.resume); |
| } |
| }); |
| controller.onCancel = subscription.cancel; |
| if (!isBroadcast) { |
| controller |
| ..onPause = subscription.pause |
| ..onResume = subscription.resume; |
| } |
| }; |
| return controller.stream; |
| } |
| |
| /// Creates a wrapper Stream that intercepts some errors from this stream. |
| /// |
| /// If this stream sends an error that matches [test], then it is intercepted |
| /// by the [onError] function. |
| /// |
| /// The [onError] callback must be of type `void Function(Object error)` or |
| /// `void Function(Object error, StackTrace)`. |
| /// The function type determines whether [onError] is invoked with a stack |
| /// trace argument. |
| /// The stack trace argument may be [StackTrace.empty] if this stream received |
| /// an error without a stack trace. |
| /// |
| /// An asynchronous error `error` is matched by a test function if |
| ///`test(error)` returns true. If [test] is omitted, every error is considered |
| /// matching. |
| /// |
| /// If the error is intercepted, the [onError] function can decide what to do |
| /// with it. It can throw if it wants to raise a new (or the same) error, |
| /// or simply return to make this stream forget the error. |
| /// If the received `error` value is thrown again by the [onError] function, |
| /// it acts like a `rethrow` and it is emitted along with its original |
| /// stack trace, not the stack trace of the `throw` inside [onError]. |
| /// |
| /// If you need to transform an error into a data event, use the more generic |
| /// [Stream.transform] to handle the event by writing a data event to |
| /// the output sink. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will individually perform the `test` and handle the error. |
| Stream<T> handleError(Function onError, {bool test(error)?}) { |
| if (onError is! void Function(Object, StackTrace) && |
| onError is! void Function(Object)) { |
| throw ArgumentError.value( |
| onError, |
| "onError", |
| "Error handler must accept one Object or one Object and a StackTrace" |
| " as arguments."); |
| } |
| return new _HandleErrorStream<T>(this, onError, test); |
| } |
| |
| /// Transforms each element of this stream into a sequence of elements. |
| /// |
| /// Returns a new stream where each element of this stream is replaced |
| /// by zero or more data events. |
| /// The event values are provided as an [Iterable] by a call to [convert] |
| /// with the element as argument, and the elements of that iterable is |
| /// emitted in iteration order. |
| /// If calling [convert] throws, or if the iteration of the returned values |
| /// throws, the error is emitted on the returned stream and iteration ends |
| /// for that element of this stream. |
| /// |
| /// Error events and the done event of this stream are forwarded directly |
| /// to the returned stream. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will individually call `convert` and expand the events. |
| Stream<S> expand<S>(Iterable<S> convert(T element)) { |
| return new _ExpandStream<T, S>(this, convert); |
| } |
| |
| /// Pipes the events of this stream into [streamConsumer]. |
| /// |
| /// All events of this stream are added to `streamConsumer` using |
| /// [StreamConsumer.addStream]. |
| /// The `streamConsumer` is closed when this stream has been successfully added |
| /// to it - when the future returned by `addStream` completes without an error. |
| /// |
| /// Returns a future which completes when this stream has been consumed |
| /// and the consumer has been closed. |
| /// |
| /// The returned future completes with the same result as the future returned |
| /// by [StreamConsumer.close]. |
| /// If the call to [StreamConsumer.addStream] fails in some way, this |
| /// method fails in the same way. |
| Future pipe(StreamConsumer<T> streamConsumer) { |
| return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
| } |
| |
| /// Applies [streamTransformer] to this stream. |
| /// |
| /// Returns the transformed stream, |
| /// that is, the result of `streamTransformer.bind(this)`. |
| /// This method simply allows writing the call to `streamTransformer.bind` |
| /// in a chained fashion, like |
| /// ```dart |
| /// stream.map(mapping).transform(transformation).toList() |
| /// ``` |
| /// which can be more convenient than calling `bind` directly. |
| /// |
| /// The [streamTransformer] can return any stream. |
| /// Whether the returned stream is a broadcast stream or not, |
| /// and which elements it will contain, |
| /// is entirely up to the transformation. |
| /// |
| /// This method should always be used for transformations which treat |
| /// the entire stream as representing a single value |
| /// which has perhaps been split into several parts for transport, |
| /// like a file being read from disk or being fetched over a network. |
| /// The transformation will then produce a new stream which |
| /// transforms the stream's value incrementally (perhaps using |
| /// [Converter.startChunkedConversion]). The resulting stream |
| /// may again be chunks of the result, but does not have to |
| /// correspond to specific events from the source string. |
| Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
| return streamTransformer.bind(this); |
| } |
| |
| /// Combines a sequence of values by repeatedly applying [combine]. |
| /// |
| /// Similar to [Iterable.reduce], this function maintains a value, |
| /// starting with the first element of this stream |
| /// and updated for each further element of this stream. |
| /// For each element after the first, |
| /// the value is updated to the result of calling [combine] |
| /// with the previous value and the element. |
| /// |
| /// When this stream is done, the returned future is completed with |
| /// the value at that time. |
| /// |
| /// If this stream is empty, the returned future is completed with |
| /// an error. |
| /// If this stream emits an error, or the call to [combine] throws, |
| /// the returned future is completed with that error, |
| /// and processing is stopped. |
| Future<T> reduce(T combine(T previous, T element)) { |
| _Future<T> result = new _Future<T>(); |
| bool seenFirst = false; |
| late T value; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: result._completeError, onDone: () { |
| if (!seenFirst) { |
| try { |
| // Throw and recatch, instead of just doing |
| // _completeWithErrorCallback, e, theError, StackTrace.current), |
| // to ensure that the stackTrace is set on the error. |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(result, e, s); |
| } |
| } else { |
| result._complete(value); |
| } |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| if (seenFirst) { |
| _runUserCode(() => combine(value, element), (T newValue) { |
| value = newValue; |
| }, _cancelAndErrorClosure(subscription, result)); |
| } else { |
| value = element; |
| seenFirst = true; |
| } |
| }); |
| return result; |
| } |
| |
| /// Combines a sequence of values by repeatedly applying [combine]. |
| /// |
| /// Similar to [Iterable.fold], this function maintains a value, |
| /// starting with [initialValue] and updated for each element of |
| /// this stream. |
| /// For each element, the value is updated to the result of calling |
| /// [combine] with the previous value and the element. |
| /// |
| /// When this stream is done, the returned future is completed with |
| /// the value at that time. |
| /// For an empty stream, the future is completed with [initialValue]. |
| /// |
| /// If this stream emits an error, or the call to [combine] throws, |
| /// the returned future is completed with that error, |
| /// and processing is stopped. |
| Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
| _Future<S> result = new _Future<S>(); |
| S value = initialValue; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: result._completeError, onDone: () { |
| result._complete(value); |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| _runUserCode(() => combine(value, element), (S newValue) { |
| value = newValue; |
| }, _cancelAndErrorClosure(subscription, result)); |
| }); |
| return result; |
| } |
| |
| /// Combines the string representation of elements into a single string. |
| /// |
| /// Each element is converted to a string using its [Object.toString] method. |
| /// If [separator] is provided, it is inserted between element string |
| /// representations. |
| /// |
| /// The returned future is completed with the combined string when this stream |
| /// is done. |
| /// |
| /// If this stream emits an error, or the call to [Object.toString] throws, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<String> join([String separator = ""]) { |
| _Future<String> result = new _Future<String>(); |
| StringBuffer buffer = new StringBuffer(); |
| bool first = true; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: result._completeError, onDone: () { |
| result._complete(buffer.toString()); |
| }, cancelOnError: true); |
| subscription.onData(separator.isEmpty |
| ? (T element) { |
| try { |
| buffer.write(element); |
| } catch (e, s) { |
| _cancelAndErrorWithReplacement(subscription, result, e, s); |
| } |
| } |
| : (T element) { |
| if (!first) { |
| buffer.write(separator); |
| } |
| first = false; |
| try { |
| buffer.write(element); |
| } catch (e, s) { |
| _cancelAndErrorWithReplacement(subscription, result, e, s); |
| } |
| }); |
| return result; |
| } |
| |
| /// Returns whether [needle] occurs in the elements provided by this stream. |
| /// |
| /// Compares each element of this stream to [needle] using [Object.==]. |
| /// If an equal element is found, the returned future is completed with `true`. |
| /// If this stream ends without finding a match, the future is completed with |
| /// `false`. |
| /// |
| /// If this stream emits an error, or the call to [Object.==] throws, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<bool> contains(Object? needle) { |
| _Future<bool> future = new _Future<bool>(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| future._complete(false); |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| _runUserCode(() => (element == needle), (bool isMatch) { |
| if (isMatch) { |
| _cancelAndValue(subscription, future, true); |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Executes [action] on each element of this stream. |
| /// |
| /// Completes the returned [Future] when all elements of this stream |
| /// have been processed. |
| /// |
| /// If this stream emits an error, or if the call to [action] throws, |
| /// the returned future completes with that error, |
| /// and processing stops. |
| Future forEach(void action(T element)) { |
| _Future future = new _Future(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| future._complete(null); |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| _runUserCode<void>(() => action(element), (_) {}, |
| _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Checks whether [test] accepts all elements provided by this stream. |
| /// |
| /// Calls [test] on each element of this stream. |
| /// If the call returns `false`, the returned future is completed with `false` |
| /// and processing stops. |
| /// |
| /// If this stream ends without finding an element that [test] rejects, |
| /// the returned future is completed with `true`. |
| /// |
| /// If this stream emits an error, or if the call to [test] throws, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<bool> every(bool test(T element)) { |
| _Future<bool> future = new _Future<bool>(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| future._complete(true); |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| _runUserCode(() => test(element), (bool isMatch) { |
| if (!isMatch) { |
| _cancelAndValue(subscription, future, false); |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Checks whether [test] accepts any element provided by this stream. |
| /// |
| /// Calls [test] on each element of this stream. |
| /// If the call returns `true`, the returned future is completed with `true` |
| /// and processing stops. |
| /// |
| /// If this stream ends without finding an element that [test] accepts, |
| /// the returned future is completed with `false`. |
| /// |
| /// If this stream emits an error, or if the call to [test] throws, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<bool> any(bool test(T element)) { |
| _Future<bool> future = new _Future<bool>(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| future._complete(false); |
| }, cancelOnError: true); |
| subscription.onData((T element) { |
| _runUserCode(() => test(element), (bool isMatch) { |
| if (isMatch) { |
| _cancelAndValue(subscription, future, true); |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// The number of elements in this stream. |
| /// |
| /// Waits for all elements of this stream. When this stream ends, |
| /// the returned future is completed with the number of elements. |
| /// |
| /// If this stream emits an error, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| /// |
| /// This operation listens to this stream, and a non-broadcast stream cannot |
| /// be reused after finding its length. |
| Future<int> get length { |
| _Future<int> future = new _Future<int>(); |
| int count = 0; |
| this.listen( |
| (_) { |
| count++; |
| }, |
| onError: future._completeError, |
| onDone: () { |
| future._complete(count); |
| }, |
| cancelOnError: true); |
| return future; |
| } |
| |
| /// Whether this stream contains any elements. |
| /// |
| /// Waits for the first element of this stream, then completes the returned |
| /// future with `false`. |
| /// If this stream ends without emitting any elements, the returned future is |
| /// completed with `true`. |
| /// |
| /// If the first event is an error, the returned future is completed with that |
| /// error. |
| /// |
| /// This operation listens to this stream, and a non-broadcast stream cannot |
| /// be reused after checking whether it is empty. |
| Future<bool> get isEmpty { |
| _Future<bool> future = new _Future<bool>(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| future._complete(true); |
| }, cancelOnError: true); |
| subscription.onData((_) { |
| _cancelAndValue(subscription, future, false); |
| }); |
| return future; |
| } |
| |
| /// Adapt this stream to be a `Stream<R>`. |
| /// |
| /// This stream is wrapped as a `Stream<R>` which checks at run-time that |
| /// each data event emitted by this stream is also an instance of [R]. |
| Stream<R> cast<R>() => Stream.castFrom<T, R>(this); |
| |
| /// Collects all elements of this stream in a [List]. |
| /// |
| /// Creates a `List<T>` and adds all elements of this stream to the list |
| /// in the order they arrive. |
| /// When this stream ends, the returned future is completed with that list. |
| /// |
| /// If this stream emits an error, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<List<T>> toList() { |
| List<T> result = <T>[]; |
| _Future<List<T>> future = new _Future<List<T>>(); |
| this.listen( |
| (T data) { |
| result.add(data); |
| }, |
| onError: future._completeError, |
| onDone: () { |
| future._complete(result); |
| }, |
| cancelOnError: true); |
| return future; |
| } |
| |
| /// Collects the data of this stream in a [Set]. |
| /// |
| /// Creates a `Set<T>` and adds all elements of this stream to the set. |
| /// in the order they arrive. |
| /// When this stream ends, the returned future is completed with that set. |
| /// |
| /// The returned set is the same type as created by `<T>{}`. |
| /// If another type of set is needed, either use [forEach] to add each |
| /// element to the set, or use |
| /// `toList().then((list) => new SomeOtherSet.from(list))` |
| /// to create the set. |
| /// |
| /// If this stream emits an error, |
| /// the returned future is completed with that error, |
| /// and processing stops. |
| Future<Set<T>> toSet() { |
| Set<T> result = new Set<T>(); |
| _Future<Set<T>> future = new _Future<Set<T>>(); |
| this.listen( |
| (T data) { |
| result.add(data); |
| }, |
| onError: future._completeError, |
| onDone: () { |
| future._complete(result); |
| }, |
| cancelOnError: true); |
| return future; |
| } |
| |
| /// Discards all data on this stream, but signals when it is done or an error |
| /// occurred. |
| /// |
| /// When subscribing using [drain], cancelOnError will be true. This means |
| /// that the future will complete with the first error on this stream and then |
| /// cancel the subscription. |
| /// |
| /// If this stream emits an error, the returned future is completed with |
| /// that error, and processing is stopped. |
| /// |
| /// In case of a `done` event the future completes with the given |
| /// [futureValue]. |
| /// |
| /// The [futureValue] must not be omitted if `null` is not assignable to [E]. |
| Future<E> drain<E>([E? futureValue]) { |
| if (futureValue == null) { |
| futureValue = futureValue as E; |
| } |
| return listen(null, cancelOnError: true).asFuture<E>(futureValue); |
| } |
| |
| /// Provides at most the first [count] data events of this stream. |
| /// |
| /// Returns a stream that emits the same events that this stream would |
| /// if listened to at the same time, |
| /// until either this stream ends or it has emitted [count] data events, |
| /// at which point the returned stream is done. |
| /// |
| /// If this stream produces fewer than [count] data events before it's done, |
| /// so will the returned stream. |
| /// |
| /// Starts listening to this stream when the returned stream is listened to |
| /// and stops listening when the first [count] data events have been received. |
| /// |
| /// This means that if this is a single-subscription (non-broadcast) streams |
| /// it cannot be reused after the returned stream has been listened to. |
| /// |
| /// If this is a broadcast stream, the returned stream is a broadcast stream. |
| /// In that case, the events are only counted from the time |
| /// the returned stream is listened to. |
| Stream<T> take(int count) { |
| return new _TakeStream<T>(this, count); |
| } |
| |
| /// Forwards data events while [test] is successful. |
| /// |
| /// Returns a stream that provides the same events as this stream |
| /// until [test] fails for a data event. |
| /// The returned stream is done when either this stream is done, |
| /// or when this stream first emits a data event that fails [test]. |
| /// |
| /// The `test` call is considered failing if it returns a non-`true` value |
| /// or if it throws. If the `test` call throws, the error is emitted as the |
| /// last event on the returned streams. |
| /// |
| /// Stops listening to this stream after the accepted elements. |
| /// |
| /// Internally the method cancels its subscription after these elements. This |
| /// means that single-subscription (non-broadcast) streams are closed and |
| /// cannot be reused after a call to this method. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// For a broadcast stream, the events are only tested from the time |
| /// the returned stream is listened to. |
| Stream<T> takeWhile(bool test(T element)) { |
| return new _TakeWhileStream<T>(this, test); |
| } |
| |
| /// Skips the first [count] data events from this stream. |
| /// |
| /// Returns a stream that emits the same events as this stream would |
| /// if listened to at the same time, except that the first [count] |
| /// data events are not emitted. |
| /// The returned stream is done when this stream is. |
| /// |
| /// If this stream emits fewer than [count] data events |
| /// before being done, the returned stream emits no data events. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// For a broadcast stream, the events are only counted from the time |
| /// the returned stream is listened to. |
| Stream<T> skip(int count) { |
| return new _SkipStream<T>(this, count); |
| } |
| |
| /// Skip data events from this stream while they are matched by [test]. |
| /// |
| /// Returns a stream that emits the same events as this stream, |
| /// except that data events are not emitted until a data event fails `test`. |
| /// The test fails when called with a data event |
| /// if it returns a non-`true` value or if the call to `test` throws. |
| /// If the call throws, the error is emitted as an error event |
| /// on the returned stream instead of the data event, |
| /// otherwise the event that made `test` return non-true is emitted as the |
| /// first data event. |
| /// |
| /// Error and done events are provided by the returned stream unmodified. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// For a broadcast stream, the events are only tested from the time |
| /// the returned stream is listened to. |
| Stream<T> skipWhile(bool test(T element)) { |
| return new _SkipWhileStream<T>(this, test); |
| } |
| |
| /// Skips data events if they are equal to the previous data event. |
| /// |
| /// The returned stream provides the same events as this stream, except |
| /// that it never provides two consecutive data events that are equal. |
| /// That is, errors are passed through to the returned stream, and |
| /// data events are passed through if they are distinct from the most |
| /// recently emitted data event. |
| /// |
| /// Equality is determined by the provided [equals] method. If that is |
| /// omitted, the '==' operator on the last provided data element is used. |
| /// |
| /// If [equals] throws, the data event is replaced by an error event |
| /// containing the thrown error. The behavior is equivalent to the |
| /// original stream emitting the error event, and it doesn't change |
| /// the what the most recently emitted data event is. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will individually perform the `equals` test. |
| Stream<T> distinct([bool equals(T previous, T next)?]) { |
| return new _DistinctStream<T>(this, equals); |
| } |
| |
| /// The first element of this stream. |
| /// |
| /// Stops listening to this stream after the first element has been received. |
| /// |
| /// Internally the method cancels its subscription after the first element. |
| /// This means that single-subscription (non-broadcast) streams are closed |
| /// and cannot be reused after a call to this getter. |
| /// |
| /// If an error event occurs before the first data event, the returned future |
| /// is completed with that error. |
| /// |
| /// If this stream is empty (a done event occurs before the first data event), |
| /// the returned future completes with an error. |
| /// |
| /// Except for the type of the error, this method is equivalent to |
| /// `this.elementAt(0)`. |
| Future<T> get first { |
| _Future<T> future = new _Future<T>(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| try { |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, cancelOnError: true); |
| subscription.onData((T value) { |
| _cancelAndValue(subscription, future, value); |
| }); |
| return future; |
| } |
| |
| /// The last element of this stream. |
| /// |
| /// If this stream emits an error event, |
| /// the returned future is completed with that error |
| /// and processing stops. |
| /// |
| /// If this stream is empty (the done event is the first event), |
| /// the returned future completes with an error. |
| Future<T> get last { |
| _Future<T> future = new _Future<T>(); |
| late T result; |
| bool foundResult = false; |
| listen( |
| (T value) { |
| foundResult = true; |
| result = value; |
| }, |
| onError: future._completeError, |
| onDone: () { |
| if (foundResult) { |
| future._complete(result); |
| return; |
| } |
| try { |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, |
| cancelOnError: true); |
| return future; |
| } |
| |
| /// The single element of this stream. |
| /// |
| /// If this stream emits an error event, |
| /// the returned future is completed with that error |
| /// and processing stops. |
| /// |
| /// If [this] is empty or has more than one element, |
| /// the returned future completes with an error. |
| Future<T> get single { |
| _Future<T> future = new _Future<T>(); |
| late T result; |
| bool foundResult = false; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| if (foundResult) { |
| future._complete(result); |
| return; |
| } |
| try { |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, cancelOnError: true); |
| subscription.onData((T value) { |
| if (foundResult) { |
| // This is the second element we get. |
| try { |
| throw IterableElementError.tooMany(); |
| } catch (e, s) { |
| _cancelAndErrorWithReplacement(subscription, future, e, s); |
| } |
| return; |
| } |
| foundResult = true; |
| result = value; |
| }); |
| return future; |
| } |
| |
| /// Finds the first element of this stream matching [test]. |
| /// |
| /// Returns a future that is completed with the first element of this stream |
| /// that [test] returns `true` for. |
| /// |
| /// If no such element is found before this stream is done, and a |
| /// [orElse] function is provided, the result of calling [orElse] |
| /// becomes the value of the future. If [orElse] throws, the returned |
| /// future is completed with that error. |
| /// |
| /// If this stream emits an error before the first matching element, |
| /// the returned future is completed with that error, and processing stops. |
| /// |
| /// Stops listening to this stream after the first matching element or error |
| /// has been received. |
| /// |
| /// Internally the method cancels its subscription after the first element that |
| /// matches the predicate. This means that single-subscription (non-broadcast) |
| /// streams are closed and cannot be reused after a call to this method. |
| /// |
| /// If an error occurs, or if this stream ends without finding a match and |
| /// with no [orElse] function provided, |
| /// the returned future is completed with an error. |
| Future<T> firstWhere(bool test(T element), {T orElse()?}) { |
| _Future<T> future = new _Future(); |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| if (orElse != null) { |
| _runUserCode(orElse, future._complete, future._completeError); |
| return; |
| } |
| try { |
| // Sets stackTrace on error. |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, cancelOnError: true); |
| |
| subscription.onData((T value) { |
| _runUserCode(() => test(value), (bool isMatch) { |
| if (isMatch) { |
| _cancelAndValue(subscription, future, value); |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Finds the last element in this stream matching [test]. |
| /// |
| /// If this stream emits an error, the returned future is completed with that |
| /// error, and processing stops. |
| /// |
| /// Otherwise as [firstWhere], except that the last matching element is found |
| /// instead of the first. |
| /// That means that a non-error result cannot be provided before this stream |
| /// is done. |
| Future<T> lastWhere(bool test(T element), {T orElse()?}) { |
| _Future<T> future = new _Future(); |
| late T result; |
| bool foundResult = false; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| if (foundResult) { |
| future._complete(result); |
| return; |
| } |
| if (orElse != null) { |
| _runUserCode(orElse, future._complete, future._completeError); |
| return; |
| } |
| try { |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, cancelOnError: true); |
| |
| subscription.onData((T value) { |
| _runUserCode(() => test(value), (bool isMatch) { |
| if (isMatch) { |
| foundResult = true; |
| result = value; |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Finds the single element in this stream matching [test]. |
| /// |
| /// Like [lastWhere], except that it is an error if more than one |
| /// matching element occurs in this stream. |
| Future<T> singleWhere(bool test(T element), {T orElse()?}) { |
| _Future<T> future = new _Future<T>(); |
| late T result; |
| bool foundResult = false; |
| StreamSubscription<T> subscription = |
| this.listen(null, onError: future._completeError, onDone: () { |
| if (foundResult) { |
| future._complete(result); |
| return; |
| } |
| if (orElse != null) { |
| _runUserCode(orElse, future._complete, future._completeError); |
| return; |
| } |
| try { |
| throw IterableElementError.noElement(); |
| } catch (e, s) { |
| _completeWithErrorCallback(future, e, s); |
| } |
| }, cancelOnError: true); |
| |
| subscription.onData((T value) { |
| _runUserCode(() => test(value), (bool isMatch) { |
| if (isMatch) { |
| if (foundResult) { |
| try { |
| throw IterableElementError.tooMany(); |
| } catch (e, s) { |
| _cancelAndErrorWithReplacement(subscription, future, e, s); |
| } |
| return; |
| } |
| foundResult = true; |
| result = value; |
| } |
| }, _cancelAndErrorClosure(subscription, future)); |
| }); |
| return future; |
| } |
| |
| /// Returns the value of the [index]th data event of this stream. |
| /// |
| /// Stops listening to this stream after the [index]th data event has been |
| /// received. |
| /// |
| /// Internally the method cancels its subscription after these elements. This |
| /// means that single-subscription (non-broadcast) streams are closed and |
| /// cannot be reused after a call to this method. |
| /// |
| /// If an error event occurs before the value is found, the future completes |
| /// with this error. |
| /// |
| /// If a done event occurs before the value is found, the future completes |
| /// with a [RangeError]. |
| Future<T> elementAt(int index) { |
| RangeError.checkNotNegative(index, "index"); |
| _Future<T> result = new _Future<T>(); |
| int elementIndex = 0; |
| StreamSubscription<T> subscription; |
| subscription = |
| this.listen(null, onError: result._completeError, onDone: () { |
| result._completeError( |
| new RangeError.index(index, this, "index", null, elementIndex), |
| StackTrace.empty); |
| }, cancelOnError: true); |
| subscription.onData((T value) { |
| if (index == elementIndex) { |
| _cancelAndValue(subscription, result, value); |
| return; |
| } |
| elementIndex += 1; |
| }); |
| |
| return result; |
| } |
| |
| /// Creates a new stream with the same events as this stream. |
| /// |
| /// When someone is listening on the returned stream and more than |
| /// [timeLimit] passes without any event being emitted by this stream, |
| /// the [onTimeout] function is called, which can then emit further events on |
| /// the returned stream. |
| /// |
| /// The countdown starts when the returned stream is listened to, |
| /// and is restarted when an event from the this stream is emitted, |
| /// or when listening on the returned stream is paused and resumed. |
| /// The countdown is stopped when listening on the returned stream is |
| /// paused or cancelled. |
| /// No new countdown is started when a countdown completes |
| /// and the [onTimeout] function is called, even if events are emitted. |
| /// If the delay between events of this stream is multiple times |
| /// [timeLimit], at most one timeout will happen between events. |
| /// |
| /// The [onTimeout] function is called with one argument: an |
| /// [EventSink] that allows putting events into the returned stream. |
| /// This `EventSink` is only valid during the call to [onTimeout]. |
| /// Calling [EventSink.close] on the sink passed to [onTimeout] closes the |
| /// returned stream, and no further events are processed. |
| /// |
| /// If [onTimeout] is omitted, a timeout will emit a [TimeoutException] |
| /// into the error channel of the returned stream. |
| /// If the call to [onTimeout] throws, the error is emitted as an error |
| /// on the returned stream. |
| /// |
| /// The returned stream is a broadcast stream if this stream is. |
| /// If a broadcast stream is listened to more than once, each subscription |
| /// will have its individually timer that starts counting on listen, |
| /// and the subscriptions' timers can be paused individually. |
| Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) { |
| _StreamControllerBase<T> controller; |
| if (isBroadcast) { |
| controller = new _SyncBroadcastStreamController<T>(null, null); |
| } else { |
| controller = new _SyncStreamController<T>(null, null, null, null); |
| } |
| |
| Zone zone = Zone.current; |
| // Register callback immediately. |
| _TimerCallback timeoutCallback; |
| if (onTimeout == null) { |
| timeoutCallback = () { |
| controller.addError( |
| new TimeoutException("No stream event", timeLimit), null); |
| }; |
| } else { |
| var registeredOnTimeout = |
| zone.registerUnaryCallback<void, EventSink<T>>(onTimeout); |
| var wrapper = new _ControllerEventSinkWrapper<T>(null); |
| timeoutCallback = () { |
| wrapper._sink = controller; // Only valid during call. |
| zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
| wrapper._sink = null; |
| }; |
| } |
| |
| // All further setup happens inside `onListen`. |
| controller.onListen = () { |
| Timer timer = zone.createTimer(timeLimit, timeoutCallback); |
| var subscription = this.listen(null); |
| // Set up event forwarding. Each data or error event resets the timer |
| subscription |
| ..onData((T event) { |
| timer.cancel(); |
| timer = zone.createTimer(timeLimit, timeoutCallback); |
| // Controller is synchronous, and the call might close the stream |
| // and cancel the timer, |
| // so create the Timer before calling into add(); |
| // issue: https://github.com/dart-lang/sdk/issues/37565 |
| controller.add(event); |
| }) |
| ..onError((Object error, StackTrace stackTrace) { |
| timer.cancel(); |
| timer = zone.createTimer(timeLimit, timeoutCallback); |
| controller._addError( |
| error, stackTrace); // Avoid Zone error replacement. |
| }) |
| ..onDone(() { |
| timer.cancel(); |
| controller.close(); |
| }); |
| // Set up further controller callbacks. |
| controller.onCancel = () { |
| timer.cancel(); |
| return subscription.cancel(); |
| }; |
| if (!isBroadcast) { |
| controller |
| ..onPause = () { |
| timer.cancel(); |
| subscription.pause(); |
| } |
| ..onResume = () { |
| subscription.resume(); |
| timer = zone.createTimer(timeLimit, timeoutCallback); |
| }; |
| } |
| }; |
| |
| return controller.stream; |
| } |
| } |
| |
| /// A subscription on events from a [Stream]. |
| /// |
| /// When you listen on a [Stream] using [Stream.listen], |
| /// a [StreamSubscription] object is returned. |
| /// |
| /// The subscription provides events to the listener, |
| /// and holds the callbacks used to handle the events. |
| /// The subscription can also be used to unsubscribe from the events, |
| /// or to temporarily pause the events from the stream. |
| abstract class StreamSubscription<T> { |
| /// Cancels this subscription. |
| /// |
| /// After this call, the subscription no longer receives events. |
| /// |
| /// The stream may need to shut down the source of events and clean up after |
| /// the subscription is canceled. |
| /// |
| /// Returns a future that is completed once the stream has finished |
| /// its cleanup. |
| /// |
| /// Typically, cleanup happens when the stream needs to release resources. |
| /// For example, a stream might need to close an open file (as an asynchronous |
| /// operation). If the listener wants to delete the file after having |
| /// canceled the subscription, it must wait for the cleanup future to complete. |
| /// |
| /// If the cleanup throws, which it really shouldn't, the returned future |
| /// completes with that error. |
| Future<void> cancel(); |
| |
| /// Replaces the data event handler of this subscription. |
| /// |
| /// The [handleData] function is called for each data event of the stream |
| /// after this function is called. |
| /// If [handleData] is `null`, data events are ignored. |
| /// |
| /// This method replaces the current handler set by the invocation of |
| /// [Stream.listen] or by a previous call to [onData]. |
| void onData(void handleData(T data)?); |
| |
| /// Replaces the error event handler of this subscription. |
| /// |
| /// The [handleError] function must be able to be called with either |
| /// one positional argument, or with two positional arguments |
| /// where the seconds is always a [StackTrace]. |
| /// |
| /// The [handleError] argument may be `null`, in which case further |
| /// error events are considered *unhandled*, and will be reported to |
| /// [Zone.handleUncaughtError]. |
| /// |
| /// The provided function is called for all error events from the |
| /// stream subscription. |
| /// |
| /// This method replaces the current handler set by the invocation of |
| /// [Stream.listen], by calling [asFuture], or by a previous call to [onError]. |
| void onError(Function? handleError); |
| |
| /// Replaces the done event handler of this subscription. |
| /// |
| /// The [handleDone] function is called when the stream closes. |
| /// The value may be `null`, in which case no function is called. |
| /// |
| /// This method replaces the current handler set by the invocation of |
| /// [Stream.listen], by calling [asFuture], or by a previous call to [onDone]. |
| void onDone(void handleDone()?); |
| |
| /// Requests that the stream pauses events until further notice. |
| /// |
| /// While paused, the subscription will not fire any events. |
| /// If it receives events from its source, they will be buffered until |
| /// the subscription is resumed. |
| /// For non-broadcast streams, the underlying source is usually informed |
| /// about the pause, |
| /// so it can stop generating events until the subscription is resumed. |
| /// |
| /// To avoid buffering events on a broadcast stream, it is better to |
| /// cancel this subscription, and start to listen again when events |
| /// are needed, if the intermediate events are not important. |
| /// |
| /// If [resumeSignal] is provided, the stream subscription will undo the pause |
| /// when the future completes, as if by a call to [resume]. |
| /// If the future completes with an error, |
| /// the stream will still resume, but the error will be considered unhandled |
| /// and is passed to [Zone.handleUncaughtError]. |
| /// |
| /// A call to [resume] will also undo a pause. |
| /// |
| /// If the subscription is paused more than once, an equal number |
| /// of resumes must be performed to resume the stream. |
| /// Calls to [resume] and the completion of a [resumeSignal] are |
| /// interchangeable - the [pause] which was passed a [resumeSignal] may be |
| /// ended by a call to [resume], and completing the [resumeSignal] may end a |
| /// different [pause]. |
| /// |
| /// It is safe to [resume] or complete a [resumeSignal] even when the |
| /// subscription is not paused, and the resume will have no effect. |
| void pause([Future<void>? resumeSignal]); |
| |
| /// Resumes after a pause. |
| /// |
| /// This undoes one previous call to [pause]. |
| /// When all previously calls to [pause] have been matched by a calls to |
| /// [resume], possibly through a `resumeSignal` passed to [pause], |
| /// the stream subscription may emit events again. |
| /// |
| /// It is safe to [resume] even when the subscription is not paused, and the |
| /// resume will have no effect. |
| void resume(); |
| |
| /// Whether the [StreamSubscription] is currently paused. |
| /// |
| /// If there have been more calls to [pause] than to [resume] on this |
| /// stream subscription, the subscription is paused, and this getter |
| /// returns `true`. |
| /// |
| /// Returns `false` if the stream can currently emit events, or if |
| /// the subscription has completed or been cancelled. |
| bool get isPaused; |
| |
| /// Returns a future that handles the [onDone] and [onError] callbacks. |
| /// |
| /// This method *overwrites* the existing [onDone] and [onError] callbacks |
| /// with new ones that complete the returned future. |
| /// |
| /// In case of an error the subscription will automatically cancel (even |
| /// when it was listening with `cancelOnError` set to `false`). |
| /// |
| /// In case of a `done` event the future completes with the given |
| /// [futureValue]. |
| /// |
| /// If [futureValue] is omitted, the value `null as E` is used as a default. |
| /// If `E` is not nullable, this will throw immediately when [asFuture] |
| /// is called. |
| Future<E> asFuture<E>([E? futureValue]); |
| } |
| |
| /// A [Sink] that supports adding errors. |
| /// |
| /// This makes it suitable for capturing the results of asynchronous |
| /// computations, which can complete with a value or an error. |
| /// |
| /// The [EventSink] has been designed to handle asynchronous events from |
| /// [Stream]s. See, for example, [Stream.eventTransformed] which uses |
| /// `EventSink`s to transform events. |
| abstract class EventSink<T> implements Sink<T> { |
| /// Adds a data [event] to the sink. |
| /// |
| /// Must not be called on a closed sink. |
| void add(T event); |
| |
| /// Adds an [error] to the sink. |
| /// |
| /// Must not be called on a closed sink. |
| void addError(Object error, [StackTrace? stackTrace]); |
| |
| /// Closes the sink. |
| /// |
| /// Calling this method more than once is allowed, but does nothing. |
| /// |
| /// Neither [add] nor [addError] must be called after this method. |
| void close(); |
| } |
| |
| /// [Stream] wrapper that only exposes the [Stream] interface. |
| class StreamView<T> extends Stream<T> { |
| final Stream<T> _stream; |
| |
| const StreamView(Stream<T> stream) |
| : _stream = stream, |
| super._internal(); |
| |
| bool get isBroadcast => _stream.isBroadcast; |
| |
| Stream<T> asBroadcastStream( |
| {void onListen(StreamSubscription<T> subscription)?, |
| void onCancel(StreamSubscription<T> subscription)?}) => |
| _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
| |
| StreamSubscription<T> listen(void onData(T value)?, |
| {Function? onError, void onDone()?, bool? cancelOnError}) { |
| return _stream.listen(onData, |
| onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| } |
| } |
| |
| /// Abstract interface for a "sink" accepting multiple entire streams. |
| /// |
| /// A consumer can accept a number of consecutive streams using [addStream], |
| /// and when no further data need to be added, the [close] method tells the |
| /// consumer to complete its work and shut down. |
| /// |
| /// The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream |
| /// to the consumer's [addStream] method. When that completes, it will |
| /// call [close] and then complete its own returned future. |
| abstract class StreamConsumer<S> { |
| /// Consumes the elements of [stream]. |
| /// |
| /// Listens on [stream] and does something for each event. |
| /// |
| /// Returns a future which is completed when the stream is done being added, |
| /// and the consumer is ready to accept a new stream. |
| /// No further calls to [addStream] or [close] should happen before the |
| /// returned future has completed. |
| /// |
| /// The consumer may stop listening to the stream after an error, |
| /// it may consume all the errors and only stop at a done event, |
| /// or it may be canceled early if the receiver don't want any further events. |
| /// |
| /// If the consumer stops listening because of some error preventing it |
| /// from continuing, it may report this error in the returned future, |
| /// otherwise it will just complete the future with `null`. |
| Future addStream(Stream<S> stream); |
| |
| /// Tells the consumer that no further streams will be added. |
| /// |
| /// This allows the consumer to complete any remaining work and release |
| /// resources that are no longer needed |
| /// |
| /// Returns a future which is completed when the consumer has shut down. |
| /// If cleaning up can fail, the error may be reported in the returned future, |
| /// otherwise it completes with `null`. |
| Future close(); |
| } |
| |
| /// A object that accepts stream events both synchronously and asynchronously. |
| /// |
| /// A [StreamSink] combines the methods from [StreamConsumer] and [EventSink]. |
| /// |
| /// The [EventSink] methods can't be used while the [addStream] is called. |
| /// As soon as the [addStream]'s [Future] completes with a value, the |
| /// [EventSink] methods can be used again. |
| /// |
| /// If [addStream] is called after any of the [EventSink] methods, it'll |
| /// be delayed until the underlying system has consumed the data added by the |
| /// [EventSink] methods. |
| /// |
| /// When [EventSink] methods are used, the [done] [Future] can be used to |
| /// catch any errors. |
| /// |
| /// When [close] is called, it will return the [done] [Future]. |
| abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
| /// Tells the stream sink that no further streams will be added. |
| /// |
| /// This allows the stream sink to complete any remaining work and release |
| /// resources that are no longer needed |
| /// |
| /// Returns a future which is completed when the stream sink has shut down. |
| /// If cleaning up can fail, the error may be reported in the returned future, |
| /// otherwise it completes with `null`. |
| /// |
| /// Returns the same future as [done]. |
| /// |
| /// The stream sink may close before the [close] method is called, either due |
| /// to an error or because it is itself providing events to someone who has |
| /// stopped listening. In that case, the [done] future is completed first, |
| /// and the `close` method will return the `done` future when called. |
| /// |
| /// Unifies [StreamConsumer.close] and [EventSink.close] which both mark their |
| /// object as not expecting any further events. |
| Future close(); |
| |
| /// Return a future which is completed when the [StreamSink] is finished. |
| /// |
| /// If the `StreamSink` fails with an error, |
| /// perhaps in response to adding events using [add], [addError] or [close], |
| /// the [done] future will complete with that error. |
| /// |
| /// Otherwise, the returned future will complete when either: |
| /// |
| /// * all events have been processed and the sink has been closed, or |
| /// * the sink has otherwise been stopped from handling more events |
| /// (for example by canceling a stream subscription). |
| Future get done; |
| } |
| |
| /// Transforms a Stream. |
| /// |
| /// When a stream's [Stream.transform] method is invoked with a |
| /// [StreamTransformer], the stream calls the [bind] method on the provided |
| /// transformer. The resulting stream is then returned from the |
| /// [Stream.transform] method. |
| /// |
| /// Conceptually, a transformer is simply a function from [Stream] to [Stream] |
| /// that is encapsulated into a class. |
| /// |
| /// It is good practice to write transformers that can be used multiple times. |
| /// |
| /// All other transforming methods on [Stream], such as [Stream.map], |
| /// [Stream.where] or [Stream.expand] can be implemented using |
| /// [Stream.transform]. A [StreamTransformer] is thus very powerful but often |
| /// also a bit more complicated to use. |
| abstract class StreamTransformer<S, T> { |
| /// Creates a [StreamTransformer] based on the given [onListen] callback. |
| /// |
| /// The returned stream transformer uses the provided [onListen] callback |
| /// when a transformed stream is listened to. At that time, the callback |
| /// receives the input stream (the one passed to [bind]) and a |
| /// boolean flag `cancelOnError` to create a [StreamSubscription]. |
| /// |
| /// If the transformed stream is a broadcast stream, so is the stream |
| /// returned by the [StreamTransformer.bind] method by this transformer. |
| /// |
| /// If the transformed stream is listened to multiple times, the [onListen] |
| /// callback is called again for each new [Stream.listen] call. |
| /// This happens whether the stream is a broadcast stream or not, |
| /// but the call will usually fail for non-broadcast streams. |
| /// |
| /// The [onListen] callback does *not* receive the handlers that were passed |
| /// to [Stream.listen]. These are automatically set after the call to the |
| /// [onListen] callback (using [StreamSubscription.onData], |
| /// [StreamSubscription.onError] and [StreamSubscription.onDone]). |
| /// |
| /// Most commonly, an [onListen] callback will first call [Stream.listen] on |
| /// the provided stream (with the corresponding `cancelOnError` flag), and then |
| /// return a new [StreamSubscription]. |
| /// |
| /// There are two common ways to create a StreamSubscription: |
| /// |
| /// 1. by allocating a [StreamController] and to return the result of |
| /// listening to its stream. It's important to forward pause, resume and |
| /// cancel events (unless the transformer intentionally wants to change |
| /// this behavior). |
| /// 2. by creating a new class that implements [StreamSubscription]. |
| /// Note that the subscription should run callbacks in the [Zone] the |
| /// stream was listened to (see [Zone] and [Zone.bindCallback]). |
| /// |
| /// Example: |
| /// |
| /// ```dart |
| /// /// Starts listening to [input] and duplicates all non-error events. |
| /// StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
| /// // Create the result controller. |
| /// // Using `sync` is correct here, since only async events are forwarded. |
| /// var controller = StreamController<int>(sync: true); |
| /// controller.onListen = () { |
| /// var subscription = input.listen((data) { |
| /// // Duplicate the data. |
| /// controller.add(data); |
| /// controller.add(data); |
| /// }, |
| /// onError: controller.addError, |
| /// onDone: controller.close, |
| /// cancelOnError: cancelOnError); |
| /// // Controller forwards pause, resume and cancel events. |
| /// controller |
| /// ..onPause = subscription.pause |
| /// ..onResume = subscription.resume |
| /// ..onCancel = subscription.cancel; |
| /// }; |
| /// // Return a new [StreamSubscription] by listening to the controller's |
| /// // stream. |
| /// return controller.stream.listen(null); |
| /// } |
| /// |
| /// // Instantiate a transformer: |
| /// var duplicator = const StreamTransformer<int, int>(_onListen); |
| /// |
| /// // Use as follows: |
| /// intStream.transform(duplicator); |
| /// ``` |
| const factory StreamTransformer( |
| StreamSubscription<T> onListen( |
| Stream<S> stream, bool cancelOnError)) = |
| _StreamSubscriptionTransformer<S, T>; |
| |
| /// Creates a [StreamTransformer] that delegates events to the given functions. |
| /// |
| /// Example use of a duplicating transformer: |
| /// |
| /// ```dart |
| /// stringStream.transform(StreamTransformer<String, String>.fromHandlers( |
| /// handleData: (String value, EventSink<String> sink) { |
| /// sink.add(value); |
| /// sink.add(value); // Duplicate the incoming events. |
| /// })); |
| /// ``` |
| /// |
| /// Transformers that are constructed this way cannot use captured state if |
| /// they are used in streams that can be listened to multiple times. |
| /// |
| /// ```dart |
| /// StreamController<String> controller = StreamController.broadcast(); |
| /// controller.onListen = () { |
| /// scheduleMicrotask(() { |
| /// controller.addError("Bad"); |
| /// controller.addError("Worse"); |
| /// controller.addError("Worst"); |
| /// }); |
| /// }; |
| /// var sharedState = 0; |
| /// var transformedStream = controller.stream.transform( |
| /// StreamTransformer<String>.fromHandlers( |
| /// handleError: (error, stackTrace, sink) { |
| /// sharedState++; // Increment shared error-counter. |
| /// sink.add("Error $sharedState: $error"); |
| /// })); |
| /// |
| /// transformedStream.listen(print); |
| /// transformedStream.listen(print); // Listen twice. |
| /// // Listening twice to the same stream makes the transformer share the same |
| /// // state. Instead of having "Error 1: Bad", "Error 2: Worse", |
| /// // "Error 3: Worst" as output (each twice for the separate subscriptions), |
| /// // this program emits: |
| /// // Error 1: Bad |
| /// // Error 2: Bad |
| /// // Error 3: Worse |
| /// // Error 4: Worse |
| /// // Error 5: Worst |
| /// // Error 6: Worst |
| /// ``` |
| factory StreamTransformer.fromHandlers( |
| {void handleData(S data, EventSink<T> sink)?, |
| void handleError(Object error, StackTrace stackTrace, EventSink<T> sink)?, |
| void handleDone(EventSink<T> sink)?}) = _StreamHandlerTransformer<S, T>; |
| |
| /// Creates a [StreamTransformer] based on a [bind] callback. |
| /// |
| /// The returned stream transformer uses the [bind] argument to implement the |
| /// [StreamTransformer.bind] API and can be used when the transformation is |
| /// available as a stream-to-stream function. |
| /// |
| /// ```dart import:convert |
| /// final splitDecoded = StreamTransformer<List<int>, String>.fromBind( |
| /// (stream) => stream.transform(utf8.decoder).transform(LineSplitter())); |
| /// ``` |
| @Since("2.1") |
| factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind) = |
| _StreamBindTransformer<S, T>; |
| |
| /// Adapts [source] to be a `StreamTransformer<TS, TT>`. |
| /// |
| /// This allows [source] to be used at the new type, but at run-time it |
| /// must satisfy the requirements of both the new type and its original type. |
| /// |
| /// Data events passed into the returned transformer must also be instances |
| /// of [SS], and data events produced by [source] for those events must |
| /// also be instances of [TT]. |
| static StreamTransformer<TS, TT> castFrom<SS, ST, TS, TT>( |
| StreamTransformer<SS, ST> source) { |
| return new CastStreamTransformer<SS, ST, TS, TT>(source); |
| } |
| |
| /// Transforms the provided [stream]. |
| /// |
| /// Returns a new stream with events that are computed from events of the |
| /// provided [stream]. |
| /// |
| /// The [StreamTransformer] interface is completely generic, |
| /// so it cannot say what subclasses do. |
| /// Each [StreamTransformer] should document clearly how it transforms the |
| /// stream (on the class or variable used to access the transformer), |
| /// as well as any differences from the following typical behavior: |
| /// |
| /// * When the returned stream is listened to, it starts listening to the |
| /// input [stream]. |
| /// * Subscriptions of the returned stream forward (in a reasonable time) |
| /// a [StreamSubscription.pause] call to the subscription of the input |
| /// [stream]. |
| /// * Similarly, canceling a subscription of the returned stream eventually |
| /// (in reasonable time) cancels the subscription of the input [stream]. |
| /// |
| /// "Reasonable time" depends on the transformer and stream. Some transformers, |
| /// like a "timeout" transformer, might make these operations depend on a |
| /// duration. Others might not delay them at all, or just by a microtask. |
| /// |
| /// Transformers are free to handle errors in any way. |
| /// A transformer implementation may choose to propagate errors, |
| /// or convert them to other events, or ignore them completely, |
| /// but if errors are ignored, it should be documented explicitly. |
| Stream<T> bind(Stream<S> stream); |
| |
| /// Provides a `StreamTransformer<RS, RT>` view of this stream transformer. |
| /// |
| /// The resulting transformer will check at run-time that all data events |
| /// of the stream it transforms are actually instances of [S], |
| /// and it will check that all data events produced by this transformer |
| /// are actually instances of [RT]. |
| StreamTransformer<RS, RT> cast<RS, RT>(); |
| } |
| |
| /// Base class for implementing [StreamTransformer]. |
| /// |
| /// Contains default implementations of every method except [bind]. |
| abstract class StreamTransformerBase<S, T> implements StreamTransformer<S, T> { |
| const StreamTransformerBase(); |
| |
| StreamTransformer<RS, RT> cast<RS, RT>() => |
| StreamTransformer.castFrom<S, T, RS, RT>(this); |
| } |
| |
| /// An [Iterator]-like interface for the values of a [Stream]. |
| /// |
| /// This wraps a [Stream] and a subscription on the stream. It listens |
| /// on the stream, and completes the future returned by [moveNext] when the |
| /// next value becomes available. |
| /// |
| /// The stream may be paused between calls to [moveNext]. |
| /// |
| /// The [current] value must only be used after a future returned by [moveNext] |
| /// has completed with `true`, and only until [moveNext] is called again. |
| abstract class StreamIterator<T> { |
| /// Create a [StreamIterator] on [stream]. |
| factory StreamIterator(Stream<T> stream) => |
| // TODO(lrn): use redirecting factory constructor when type |
| // arguments are supported. |
| new _StreamIterator<T>(stream); |
| |
| /// Wait for the next stream value to be available. |
| /// |
| /// Returns a future which will complete with either `true` or `false`. |
| /// Completing with `true` means that another event has been received and |
| /// can be read as [current]. |
| /// Completing with `false` means that the stream iteration is done and |
| /// no further events will ever be available. |
| /// The future may complete with an error, if the stream produces an error, |
| /// which also ends iteration. |
| /// |
| /// The function must not be called again until the future returned by a |
| /// previous call is completed. |
| Future<bool> moveNext(); |
| |
| /// The current value of the stream. |
| /// |
| /// When a [moveNext] call completes with `true`, the [current] field holds |
| /// the most recent event of the stream, and it stays like that until the next |
| /// call to [moveNext]. This value must only be read after a call to [moveNext] |
| /// has completed with `true`, and only until the [moveNext] is called again. |
| /// |
| /// If the StreamIterator has not yet been moved to the first element |
| /// ([moveNext] has not been called and completed yet), or if the |
| /// StreamIterator has been moved past the last element ([moveNext] has |
| /// returned `false`), then [current] is unspecified. A [StreamIterator] may |
| /// either throw or return an iterator-specific default value in that case. |
| T get current; |
| |
| /// Cancels the stream iterator (and the underlying stream subscription) early. |
| /// |
| /// The stream iterator is automatically canceled if the [moveNext] future |
| /// completes with either `false` or an error. |
| /// |
| /// If you need to stop listening for values before the stream iterator is |
| /// automatically closed, you must call [cancel] to ensure that the stream |
| /// is properly closed. |
| /// |
| /// If [moveNext] has been called when the iterator is canceled, |
| /// its returned future will complete with `false` as value, |
| /// as will all further calls to [moveNext]. |
| /// |
| /// Returns a future which completes when the cancellation is complete. |
| /// This can be an already completed future if the cancellation happens |
| /// synchronously. |
| Future cancel(); |
| } |
| |
| /// Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| EventSink? _sink; |
| _ControllerEventSinkWrapper(this._sink); |
| |
| EventSink _ensureSink() { |
| var sink = _sink; |
| if (sink == null) throw StateError("Sink not available"); |
| return sink; |
| } |
| |
| void add(T data) { |
| _ensureSink().add(data); |
| } |
| |
| void addError(error, [StackTrace? stackTrace]) { |
| _ensureSink().addError(error, stackTrace); |
| } |
| |
| void close() { |
| _ensureSink().close(); |
| } |
| } |
| |
| /// An enhanced stream controller provided by [Stream.multi]. |
| /// |
| /// Acts like a normal asynchronous controller, but also allows |
| /// adding events synchronously. |
| /// As with any synchronous event delivery, the sender should be very careful |
| /// to not deliver events at times when a new listener might not |
| /// be ready to receive them. |
| /// That generally means only delivering events synchronously in response to other |
| /// asynchronous events, because that is a time when an asynchronous event could |
| /// happen. |
| @Since("2.9") |
| abstract class MultiStreamController<T> implements StreamController<T> { |
| /// Adds and delivers an event. |
| /// |
| /// Adds an event like [add] and attempts to deliver it immediately. |
| /// Delivery can be delayed if other previously added events are |
| /// still pending delivery, if the subscription is paused, |
| /// or if the subscription isn't listening yet. |
| void addSync(T value); |
| |
| /// Adds and delivers an error event. |
| /// |
| /// Adds an error like [addError] and attempts to deliver it immediately. |
| /// Delivery can be delayed if other previously added events are |
| /// still pending delivery, if the subscription is paused, |
| /// or if the subscription isn't listening yet. |
| void addErrorSync(Object error, [StackTrace? stackTrace]); |
| |
| /// Closes the controller and delivers a done event. |
| /// |
| /// Closes the controller like [close] and attempts to deliver a "done" |
| /// event immediately. |
| /// Delivery can be delayed if other previously added events are |
| /// still pending delivery, if the subscription is paused, |
| /// or if the subscription isn't listening yet. |
| /// If it's necessary to know whether the "done" event has been delivered, |
| /// [done] future will complete when that has happened. |
| void closeSync(); |
| } |