blob: f264398b9a388432f10e6cb4557281523f820686 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
/// with one another.
///
/// This allows APIs to expose multiple [Future]s and [Stream]s that have
/// identical error conditions without forcing API consumers to attach error
/// handling to objects they don't care about.
///
/// To use an [ErrorGroup], register [Future]s and [Stream]s with it using
/// [registerFuture] and [registerStream]. These methods return wrapped versions
/// of the [Future]s and [Stream]s, which should then be used in place of the
/// originals. For example:
///
/// var errorGroup = new ErrorGroup();
/// future = errorGroup.registerFuture(future);
/// stream = errorGroup.registerStream(stream);
///
/// An [ErrorGroup] has two major effects on its wrapped members:
///
/// * An error in any member of the group will be propagated to every member
/// that hasn't already completed. If those members later complete, their
/// values will be ignored.
/// * If any member of this group has a listener, errors on members without
/// listeners won't get passed to the top-level error handler.
class ErrorGroup {
/// The [Future]s that are members of [this].
final _futures = <_ErrorGroupFuture>[];
/// The [Stream]s that are members of [this].
final _streams = <_ErrorGroupStream>[];
/// Whether [this] has completed, either successfully or with an error.
var _isDone = false;
/// The [Completer] for [done].
final _doneCompleter = Completer<void>();
/// The underlying [Future] for [done].
///
/// We need to be able to access it internally as an [_ErrorGroupFuture] so
/// we can check if it has listeners and signal errors on it.
_ErrorGroupFuture _done;
/// Returns a [Future] that completes successfully when all members of [this]
/// are complete, or with an error if any member receives an error.
///
/// This [Future] is effectively in the group in that an error on it won't be
/// passed to the top-level error handler unless no members of the group have
/// listeners attached.
Future get done => _done;
/// Creates a new group with no members.
ErrorGroup() {
_done = _ErrorGroupFuture(this, _doneCompleter.future);
}
/// Registers a [Future] as a member of [this].
///
/// Returns a wrapped version of [future] that should be used in its place.
///
/// If all members of [this] have already completed successfully or with an
/// error, it's a [StateError] to try to register a new [Future].
Future<T> registerFuture<T>(Future<T> future) {
if (_isDone) {
throw StateError("Can't register new members on a complete "
'ErrorGroup.');
}
var wrapped = _ErrorGroupFuture(this, future);
_futures.add(wrapped);
return wrapped;
}
/// Registers a [Stream] as a member of [this].
///
/// Returns a wrapped version of [stream] that should be used in its place.
/// The returned [Stream] will be multi-subscription if and only if [stream]
/// is.
///
/// Since all errors in a group are passed to all members, the returned
/// [Stream] will automatically unsubscribe all its listeners when it
/// encounters an error.
///
/// If all members of [this] have already completed successfully or with an
/// error, it's a [StateError] to try to register a new [Stream].
Stream<T> registerStream<T>(Stream<T> stream) {
if (_isDone) {
throw StateError("Can't register new members on a complete "
'ErrorGroup.');
}
var wrapped = _ErrorGroupStream(this, stream);
_streams.add(wrapped);
return wrapped;
}
/// Sends [error] to all members of [this].
///
/// Like errors that come from members, this will only be passed to the
/// top-level error handler if no members have listeners.
///
/// If all members of [this] have already completed successfully or with an
/// error, it's a [StateError] to try to signal an error.
void signalError(var error, [StackTrace stackTrace]) {
if (_isDone) {
throw StateError("Can't signal errors on a complete ErrorGroup.");
}
_signalError(error, stackTrace);
}
/// Signal an error internally.
///
/// This is just like [signalError], but instead of throwing an error if
/// [this] is complete, it just does nothing.
void _signalError(var error, [StackTrace stackTrace]) {
if (_isDone) return;
var caught = false;
for (var future in _futures) {
if (future._isDone || future._hasListeners) caught = true;
future._signalError(error, stackTrace);
}
for (var stream in _streams) {
if (stream._isDone || stream._hasListeners) caught = true;
stream._signalError(error, stackTrace);
}
_isDone = true;
_done._signalError(error, stackTrace);
if (!caught && !_done._hasListeners) {
scheduleMicrotask(() {
throw error;
});
}
}
/// Notifies [this] that one of its member [Future]s is complete.
void _signalFutureComplete(_ErrorGroupFuture future) {
if (_isDone) return;
_isDone = _futures.every((future) => future._isDone) &&
_streams.every((stream) => stream._isDone);
if (_isDone) _doneCompleter.complete();
}
/// Notifies [this] that one of its member [Stream]s is complete.
void _signalStreamComplete(_ErrorGroupStream stream) {
if (_isDone) return;
_isDone = _futures.every((future) => future._isDone) &&
_streams.every((stream) => stream._isDone);
if (_isDone) _doneCompleter.complete();
}
}
/// A [Future] wrapper that keeps track of whether it's been completed and
/// whether it has any listeners.
///
/// It also notifies its parent [ErrorGroup] when it completes successfully or
/// receives an error.
class _ErrorGroupFuture<T> implements Future<T> {
/// The parent [ErrorGroup].
final ErrorGroup _group;
/// Whether [this] has completed, either successfully or with an error.
var _isDone = false;
/// The underlying [Completer] for [this].
final _completer = Completer<T>();
/// Whether [this] has any listeners.
bool _hasListeners = false;
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
/// [inner].
_ErrorGroupFuture(this._group, Future<T> inner) {
inner.then((value) {
if (!_isDone) _completer.complete(value);
_isDone = true;
_group._signalFutureComplete(this);
}).catchError(_group._signalError);
// Make sure _completer.future doesn't automatically send errors to the
// top-level.
_completer.future.catchError((_) {});
}
@override
Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) {
_hasListeners = true;
return _completer.future.then(onValue, onError: onError);
}
@override
Future<T> catchError(Function onError, {bool Function(Object error) test}) {
_hasListeners = true;
return _completer.future.catchError(onError, test: test);
}
@override
Future<T> whenComplete(void Function() action) {
_hasListeners = true;
return _completer.future.whenComplete(action);
}
@override
Future<T> timeout(Duration timeLimit, {void Function() onTimeout}) {
_hasListeners = true;
return _completer.future.timeout(timeLimit, onTimeout: onTimeout);
}
@override
Stream<T> asStream() {
_hasListeners = true;
return _completer.future.asStream();
}
/// Signal that an error from [_group] should be propagated through [this],
/// unless it's already complete.
void _signalError(var error, [StackTrace stackTrace]) {
if (!_isDone) _completer.completeError(error, stackTrace);
_isDone = true;
}
}
// TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
// When this is fixed, this class will need to prevent such errors from being
// top-leveled.
/// A [Stream] wrapper that keeps track of whether it's been completed and
/// whether it has any listeners.
///
/// It also notifies its parent [ErrorGroup] when it completes successfully or
/// receives an error.
class _ErrorGroupStream<T> extends Stream<T> {
/// The parent [ErrorGroup].
final ErrorGroup _group;
/// Whether [this] has completed, either successfully or with an error.
var _isDone = false;
/// The underlying [StreamController] for [this].
final StreamController<T> _controller;
/// The controller's [Stream].
///
/// May be different than `_controller.stream` if the wrapped stream is a
/// broadcasting stream.
Stream<T> _stream;
/// The [StreamSubscription] that connects the wrapped [Stream] to
/// [_controller].
StreamSubscription<T> _subscription;
/// Whether [this] has any listeners.
bool get _hasListeners => _controller.hasListener;
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
/// [inner].
_ErrorGroupStream(this._group, Stream<T> inner)
: _controller = StreamController(sync: true) {
// Use old-style asBroadcastStream behavior - cancel source _subscription
// the first time the stream has no listeners.
_stream = inner.isBroadcast
? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel())
: _controller.stream;
_subscription =
inner.listen(_controller.add, onError: _group._signalError, onDone: () {
_isDone = true;
_group._signalStreamComplete(this);
_controller.close();
});
}
@override
StreamSubscription<T> listen(void Function(T) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
return _stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: true);
}
/// Signal that an error from [_group] should be propagated through [this],
/// unless it's already complete.
void _signalError(var e, [StackTrace stackTrace]) {
if (_isDone) return;
_subscription.cancel();
// Call these asynchronously to work around issue 7913.
Future.value().then((_) {
_controller.addError(e, stackTrace);
_controller.close();
});
}
}