|  | // Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file | 
|  | // for details. All rights reserved. Use of this source code is governed by a | 
|  | // BSD-style license that can be found in the LICENSE file. | 
|  |  | 
|  | import 'dart:async'; | 
|  |  | 
|  | /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | 
|  | /// with one another. | 
|  | /// | 
|  | /// This allows APIs to expose multiple [Future]s and [Stream]s that have | 
|  | /// identical error conditions without forcing API consumers to attach error | 
|  | /// handling to objects they don't care about. | 
|  | /// | 
|  | /// To use an [ErrorGroup], register [Future]s and [Stream]s with it using | 
|  | /// [registerFuture] and [registerStream]. These methods return wrapped versions | 
|  | /// of the [Future]s and [Stream]s, which should then be used in place of the | 
|  | /// originals. For example: | 
|  | /// | 
|  | ///     var errorGroup = new ErrorGroup(); | 
|  | ///     future = errorGroup.registerFuture(future); | 
|  | ///     stream = errorGroup.registerStream(stream); | 
|  | /// | 
|  | /// An [ErrorGroup] has two major effects on its wrapped members: | 
|  | /// | 
|  | /// * An error in any member of the group will be propagated to every member | 
|  | ///   that hasn't already completed. If those members later complete, their | 
|  | ///   values will be ignored. | 
|  | /// * If any member of this group has a listener, errors on members without | 
|  | ///   listeners won't get passed to the top-level error handler. | 
|  | class ErrorGroup { | 
|  | /// The [Future]s that are members of [this]. | 
|  | final _futures = <_ErrorGroupFuture>[]; | 
|  |  | 
|  | /// The [Stream]s that are members of [this]. | 
|  | final _streams = <_ErrorGroupStream>[]; | 
|  |  | 
|  | /// Whether [this] has completed, either successfully or with an error. | 
|  | var _isDone = false; | 
|  |  | 
|  | /// The [Completer] for [done]. | 
|  | final _doneCompleter = Completer<void>(); | 
|  |  | 
|  | /// The underlying [Future] for [done]. | 
|  | /// | 
|  | /// We need to be able to access it internally as an [_ErrorGroupFuture] so | 
|  | /// we can check if it has listeners and signal errors on it. | 
|  | late _ErrorGroupFuture _done; | 
|  |  | 
|  | /// Returns a [Future] that completes successfully when all members of [this] | 
|  | /// are complete, or with an error if any member receives an error. | 
|  | /// | 
|  | /// This [Future] is effectively in the group in that an error on it won't be | 
|  | /// passed to the top-level error handler unless no members of the group have | 
|  | /// listeners attached. | 
|  | Future get done => _done; | 
|  |  | 
|  | /// Creates a new group with no members. | 
|  | ErrorGroup() { | 
|  | _done = _ErrorGroupFuture(this, _doneCompleter.future); | 
|  | } | 
|  |  | 
|  | /// Registers a [Future] as a member of [this]. | 
|  | /// | 
|  | /// Returns a wrapped version of [future] that should be used in its place. | 
|  | /// | 
|  | /// If all members of [this] have already completed successfully or with an | 
|  | /// error, it's a [StateError] to try to register a new [Future]. | 
|  | Future<T> registerFuture<T>(Future<T> future) { | 
|  | if (_isDone) { | 
|  | throw StateError("Can't register new members on a complete " | 
|  | 'ErrorGroup.'); | 
|  | } | 
|  |  | 
|  | var wrapped = _ErrorGroupFuture(this, future); | 
|  | _futures.add(wrapped); | 
|  | return wrapped; | 
|  | } | 
|  |  | 
|  | /// Registers a [Stream] as a member of [this]. | 
|  | /// | 
|  | /// Returns a wrapped version of [stream] that should be used in its place. | 
|  | /// The returned [Stream] will be multi-subscription if and only if [stream] | 
|  | /// is. | 
|  | /// | 
|  | /// Since all errors in a group are passed to all members, the returned | 
|  | /// [Stream] will automatically unsubscribe all its listeners when it | 
|  | /// encounters an error. | 
|  | /// | 
|  | /// If all members of [this] have already completed successfully or with an | 
|  | /// error, it's a [StateError] to try to register a new [Stream]. | 
|  | Stream<T> registerStream<T>(Stream<T> stream) { | 
|  | if (_isDone) { | 
|  | throw StateError("Can't register new members on a complete " | 
|  | 'ErrorGroup.'); | 
|  | } | 
|  |  | 
|  | var wrapped = _ErrorGroupStream(this, stream); | 
|  | _streams.add(wrapped); | 
|  | return wrapped; | 
|  | } | 
|  |  | 
|  | /// Sends [error] to all members of [this]. | 
|  | /// | 
|  | /// Like errors that come from members, this will only be passed to the | 
|  | /// top-level error handler if no members have listeners. | 
|  | /// | 
|  | /// If all members of [this] have already completed successfully or with an | 
|  | /// error, it's a [StateError] to try to signal an error. | 
|  | void signalError(var error, [StackTrace? stackTrace]) { | 
|  | if (_isDone) { | 
|  | throw StateError("Can't signal errors on a complete ErrorGroup."); | 
|  | } | 
|  |  | 
|  | _signalError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | /// Signal an error internally. | 
|  | /// | 
|  | /// This is just like [signalError], but instead of throwing an error if | 
|  | /// [this] is complete, it just does nothing. | 
|  | void _signalError(var error, [StackTrace? stackTrace]) { | 
|  | if (_isDone) return; | 
|  |  | 
|  | var caught = false; | 
|  | for (var future in _futures) { | 
|  | if (future._isDone || future._hasListeners) caught = true; | 
|  | future._signalError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | for (var stream in _streams) { | 
|  | if (stream._isDone || stream._hasListeners) caught = true; | 
|  | stream._signalError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | _isDone = true; | 
|  | _done._signalError(error, stackTrace); | 
|  | if (!caught && !_done._hasListeners) { | 
|  | scheduleMicrotask(() { | 
|  | throw error; | 
|  | }); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Notifies [this] that one of its member [Future]s is complete. | 
|  | void _signalFutureComplete(_ErrorGroupFuture future) { | 
|  | if (_isDone) return; | 
|  |  | 
|  | _isDone = _futures.every((future) => future._isDone) && | 
|  | _streams.every((stream) => stream._isDone); | 
|  | if (_isDone) _doneCompleter.complete(); | 
|  | } | 
|  |  | 
|  | /// Notifies [this] that one of its member [Stream]s is complete. | 
|  | void _signalStreamComplete(_ErrorGroupStream stream) { | 
|  | if (_isDone) return; | 
|  |  | 
|  | _isDone = _futures.every((future) => future._isDone) && | 
|  | _streams.every((stream) => stream._isDone); | 
|  | if (_isDone) _doneCompleter.complete(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// A [Future] wrapper that keeps track of whether it's been completed and | 
|  | /// whether it has any listeners. | 
|  | /// | 
|  | /// It also notifies its parent [ErrorGroup] when it completes successfully or | 
|  | /// receives an error. | 
|  | class _ErrorGroupFuture<T> implements Future<T> { | 
|  | /// The parent [ErrorGroup]. | 
|  | final ErrorGroup _group; | 
|  |  | 
|  | /// Whether [this] has completed, either successfully or with an error. | 
|  | var _isDone = false; | 
|  |  | 
|  | /// The underlying [Completer] for [this]. | 
|  | final _completer = Completer<T>(); | 
|  |  | 
|  | /// Whether [this] has any listeners. | 
|  | bool _hasListeners = false; | 
|  |  | 
|  | /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 
|  | /// [inner]. | 
|  | _ErrorGroupFuture(this._group, Future<T> inner) { | 
|  | inner.then((value) { | 
|  | if (!_isDone) _completer.complete(value); | 
|  | _isDone = true; | 
|  | _group._signalFutureComplete(this); | 
|  | }).catchError((Object e, [StackTrace? s]) async { | 
|  | _group._signalError(e, s); | 
|  | }); | 
|  |  | 
|  | // Make sure _completer.future doesn't automatically send errors to the | 
|  | // top-level. | 
|  | Future<void> swallowErrors(Future future) async { | 
|  | try { | 
|  | await future; | 
|  | } catch (_) { | 
|  | // Do nothing. | 
|  | } | 
|  | } | 
|  |  | 
|  | swallowErrors(_completer.future); | 
|  | } | 
|  |  | 
|  | @override | 
|  | Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function? onError}) { | 
|  | _hasListeners = true; | 
|  | return _completer.future.then(onValue, onError: onError); | 
|  | } | 
|  |  | 
|  | @override | 
|  | Future<T> catchError(Function onError, {bool Function(Object error)? test}) { | 
|  | _hasListeners = true; | 
|  | return _completer.future.catchError(onError, test: test); | 
|  | } | 
|  |  | 
|  | @override | 
|  | Future<T> whenComplete(void Function() action) { | 
|  | _hasListeners = true; | 
|  | return _completer.future.whenComplete(action); | 
|  | } | 
|  |  | 
|  | @override | 
|  | Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) { | 
|  | _hasListeners = true; | 
|  | return _completer.future.timeout(timeLimit, onTimeout: onTimeout); | 
|  | } | 
|  |  | 
|  | @override | 
|  | Stream<T> asStream() { | 
|  | _hasListeners = true; | 
|  | return _completer.future.asStream(); | 
|  | } | 
|  |  | 
|  | /// Signal that an error from [_group] should be propagated through [this], | 
|  | /// unless it's already complete. | 
|  | void _signalError(var error, [StackTrace? stackTrace]) { | 
|  | if (!_isDone) _completer.completeError(error, stackTrace); | 
|  | _isDone = true; | 
|  | } | 
|  | } | 
|  |  | 
|  | // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | 
|  | // When this is fixed, this class will need to prevent such errors from being | 
|  | // top-leveled. | 
|  | /// A [Stream] wrapper that keeps track of whether it's been completed and | 
|  | /// whether it has any listeners. | 
|  | /// | 
|  | /// It also notifies its parent [ErrorGroup] when it completes successfully or | 
|  | /// receives an error. | 
|  | class _ErrorGroupStream<T> extends Stream<T> { | 
|  | /// The parent [ErrorGroup]. | 
|  | final ErrorGroup _group; | 
|  |  | 
|  | /// Whether [this] has completed, either successfully or with an error. | 
|  | var _isDone = false; | 
|  |  | 
|  | /// The underlying [StreamController] for [this]. | 
|  | late final StreamController<T> _controller; | 
|  |  | 
|  | /// The controller's [Stream]. | 
|  | /// | 
|  | /// May be different than `_controller.stream` if the wrapped stream is a | 
|  | /// broadcasting stream. | 
|  | late Stream<T> _stream; | 
|  |  | 
|  | /// The [StreamSubscription] that connects the wrapped [Stream] to | 
|  | /// [_controller]. | 
|  | late StreamSubscription<T> _subscription; | 
|  |  | 
|  | /// Whether [this] has any listeners. | 
|  | bool get _hasListeners => _controller.hasListener; | 
|  |  | 
|  | /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 
|  | /// [inner]. | 
|  | _ErrorGroupStream(this._group, Stream<T> inner) | 
|  | : _controller = StreamController(sync: true) { | 
|  | // Use old-style asBroadcastStream behavior - cancel source _subscription | 
|  | // the first time the stream has no listeners. | 
|  | _stream = inner.isBroadcast | 
|  | ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) | 
|  | : _controller.stream; | 
|  | _subscription = | 
|  | inner.listen(_controller.add, onError: _group._signalError, onDone: () { | 
|  | _isDone = true; | 
|  | _group._signalStreamComplete(this); | 
|  | _controller.close(); | 
|  | }); | 
|  | } | 
|  |  | 
|  | @override | 
|  | StreamSubscription<T> listen(void Function(T)? onData, | 
|  | {Function? onError, void Function()? onDone, bool? cancelOnError}) { | 
|  | return _stream.listen(onData, | 
|  | onError: onError, onDone: onDone, cancelOnError: true); | 
|  | } | 
|  |  | 
|  | /// Signal that an error from [_group] should be propagated through [this], | 
|  | /// unless it's already complete. | 
|  | void _signalError(var e, [StackTrace? stackTrace]) { | 
|  | if (_isDone) return; | 
|  | _subscription.cancel(); | 
|  | // Call these asynchronously to work around issue 7913. | 
|  | Future.value().then((_) { | 
|  | _controller.addError(e, stackTrace); | 
|  | _controller.close(); | 
|  | }); | 
|  | } | 
|  | } |