blob: 0262085cf0b61243c61d2ca87b0c3d8c6b28a997 [file] [log] [blame]
library pub.error_group;
import 'dart:async';
/// Ties a set of [Future]s and [Stream]s together so that an error reported
/// by any one of them is delivered to all of them.
///
/// Members are added with [registerFuture] and [registerStream]. Both return
/// a wrapper that should be used in place of the original object. When an
/// error is signaled (by a member or through [signalError]), it is forwarded
/// to every wrapper and to [done], and the group is marked as done.
///
/// If, at the time of the error, no wrapper was already done or had a
/// listener, and [done] has no listener either, the error is rethrown from a
/// microtask so that it is not silently dropped.
class ErrorGroup {
  /// The wrappers returned so far by [registerFuture].
  final _futures = <_ErrorGroupFuture>[];

  /// The wrappers returned so far by [registerStream].
  final _streams = <_ErrorGroupStream>[];

  /// Whether the group is finished, either because every member completed or
  /// because an error was signaled.
  var _isDone = false;

  /// Completed (without a value) once every registered member is done.
  final _doneCompleter = new Completer();

  /// The wrapper around [_doneCompleter]'s future that is exposed as [done].
  _ErrorGroupFuture _done;

  /// A future that completes successfully once every registered future and
  /// stream has completed, or with an error as soon as one is signaled.
  Future get done => _done;

  ErrorGroup() {
    this._done = new _ErrorGroupFuture(this, _doneCompleter.future);
  }

  /// Adds [future] to the group and returns the wrapper to use in its place.
  ///
  /// Throws a [StateError] if the group is already done.
  Future registerFuture(Future future) {
    _requireNotDone("Can't register new members on a complete ErrorGroup.");
    var wrapped = new _ErrorGroupFuture(this, future);
    _futures.add(wrapped);
    return wrapped;
  }

  /// Adds [stream] to the group and returns the wrapper to use in its place.
  ///
  /// Throws a [StateError] if the group is already done.
  Stream registerStream(Stream stream) {
    _requireNotDone("Can't register new members on a complete ErrorGroup.");
    var wrapped = new _ErrorGroupStream(this, stream);
    _streams.add(wrapped);
    return wrapped;
  }

  /// Reports [error] to every member of the group and to [done].
  ///
  /// Throws a [StateError] if the group is already done.
  void signalError(var error, [StackTrace stackTrace]) {
    _requireNotDone("Can't signal errors on a complete ErrorGroup.");
    _signalError(error, stackTrace);
  }

  /// Forwards [error] to all members and to [done], then marks the group as
  /// done. Does nothing if the group is already done.
  void _signalError(var error, [StackTrace stackTrace]) {
    if (_isDone) return;

    // A member counts as having "caught" the error if it had already finished
    // or if someone is listening to it. This has to be read before the
    // member's own _signalError runs, because that call updates its state.
    var caught = false;
    for (var future in _futures) {
      if (future._isDone || future._hasListeners) caught = true;
      future._signalError(error, stackTrace);
    }
    for (var stream in _streams) {
      if (stream._isDone || stream._hasListeners) caught = true;
      stream._signalError(error, stackTrace);
    }

    _isDone = true;
    _done._signalError(error, stackTrace);

    // Nobody can observe the error through the group, so surface it as an
    // uncaught error instead. Note that only [error] is rethrown here;
    // [stackTrace] is not attached to the rethrown error.
    if (!caught && !_done._hasListeners) {
      scheduleMicrotask(() {
        throw error;
      });
    }
  }

  /// Called by a future wrapper once its inner future has produced a value.
  void _signalFutureComplete(_ErrorGroupFuture future) {
    _completeIfAllMembersDone();
  }

  /// Called by a stream wrapper once its inner stream has closed.
  void _signalStreamComplete(_ErrorGroupStream stream) {
    _completeIfAllMembersDone();
  }

  /// Throws a [StateError] carrying [message] if the group is already done.
  void _requireNotDone(String message) {
    if (_isDone) throw new StateError(message);
  }

  /// Marks the group as done and completes [_doneCompleter] when every
  /// registered future and stream reports that it is done.
  void _completeIfAllMembersDone() {
    if (_isDone) return;
    _isDone = _futures.every((future) => future._isDone) &&
        _streams.every((stream) => stream._isDone);
    if (_isDone) _doneCompleter.complete();
  }
}
/// The [Future] an [ErrorGroup] hands out in place of a registered future.
///
/// It completes with the inner future's value, or with an error passed to
/// [_signalError], whichever happens first. Errors from the inner future are
/// not forwarded directly; they are reported to the group instead.
class _ErrorGroupFuture implements Future {
  /// The group this wrapper reports to.
  final ErrorGroup _group;

  /// Whether this wrapper has completed, with either a value or an error.
  var _isDone = false;

  /// Backs the future that callers' listeners are attached to.
  final _completer = new Completer();

  /// Whether any of the [Future] methods below has been called on this
  /// wrapper.
  bool _hasListeners = false;

  _ErrorGroupFuture(this._group, Future inner) {
    inner.then(_handleValue).catchError(_group._signalError);

    // Attach a no-op error handler so the completer's future always has one,
    // even when no caller ever listens to this wrapper.
    _completer.future.catchError((_) {});
  }

  @override
  Future then(onValue(value), {Function onError}) =>
      _listenedFuture().then(onValue, onError: onError);

  @override
  Future catchError(Function onError, {bool test(Object error)}) =>
      _listenedFuture().catchError(onError, test: test);

  @override
  Future whenComplete(void action()) =>
      _listenedFuture().whenComplete(action);

  @override
  Future timeout(Duration timeLimit, {void onTimeout()}) =>
      _listenedFuture().timeout(timeLimit, onTimeout: onTimeout);

  @override
  Stream asStream() => _listenedFuture().asStream();

  /// Completes this wrapper with [error] unless it is already done.
  void _signalError(var error, [StackTrace stackTrace]) {
    if (_isDone) return;
    _completer.completeError(error, stackTrace);
    _isDone = true;
  }

  /// Records that a caller is listening and returns the future to listen on.
  Future _listenedFuture() {
    _hasListeners = true;
    return _completer.future;
  }

  /// Handles a value from the inner future: completes this wrapper with it
  /// (unless an error got there first) and notifies the group.
  void _handleValue(value) {
    if (!_isDone) {
      _completer.complete(value);
      _isDone = true;
    }
    _group._signalFutureComplete(this);
  }
}
/// The [Stream] an [ErrorGroup] hands out in place of a registered stream.
///
/// Data events from the inner stream are forwarded unchanged. Errors from the
/// inner stream are reported to the group rather than forwarded directly.
/// When an error reaches this wrapper through [_signalError], it emits that
/// error and then closes.
class _ErrorGroupStream extends Stream {
  /// The group this wrapper reports to.
  final ErrorGroup _group;

  /// Whether the inner stream has closed.
  var _isDone = false;

  /// Receives the forwarded events; [_stream] is derived from its stream.
  final StreamController _controller;

  /// The stream callers actually listen to: [_controller]'s stream, wrapped
  /// as a broadcast stream when the inner stream is a broadcast stream.
  Stream _stream;

  /// The subscription to the inner stream.
  StreamSubscription _subscription;

  /// Whether [_controller] currently has a listener.
  bool get _hasListeners => _controller.hasListener;

  /// Whether this wrapper can be listened to more than once.
  ///
  /// Mirrors [_stream] so that a wrapper around a broadcast stream reports
  /// itself as a broadcast stream instead of inheriting the `false` default
  /// from [Stream].
  @override
  bool get isBroadcast => _stream.isBroadcast;

  _ErrorGroupStream(this._group, Stream inner)
      : _controller = new StreamController(sync: true) {
    // For a broadcast inner stream, expose a broadcast view whose onCancel
    // callback cancels the subscription it is handed.
    _stream = inner.isBroadcast
        ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel())
        : _controller.stream;
    _subscription = inner.listen((v) {
      _controller.add(v);
    }, onError: (e, [stackTrace]) {
      // Route the error through the group, which forwards it to every member
      // (this wrapper included) via _signalError.
      _group._signalError(e, stackTrace);
    }, onDone: () {
      _isDone = true;
      _group._signalStreamComplete(this);
      _controller.close();
    });
  }

  /// Subscribes to the wrapped stream.
  ///
  /// The [cancelOnError] argument is ignored: the subscription is always
  /// created with `cancelOnError: true`. [_signalError] closes the stream
  /// right after emitting the error, so no events follow an error anyway.
  @override
  StreamSubscription listen(void onData(value),
      {Function onError, void onDone(), bool cancelOnError}) {
    return _stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: true);
  }

  /// Stops listening to the inner stream, then emits [e] and closes this
  /// stream. Does nothing if the inner stream has already closed.
  void _signalError(var e, [StackTrace stackTrace]) {
    if (_isDone) return;
    _subscription.cancel();
    // The error is added asynchronously rather than immediately.
    // NOTE(review): presumably this avoids emitting into the sync controller
    // while another event is still being dispatched; confirm.
    new Future.value().then((_) {
      _controller.addError(e, stackTrace);
      _controller.close();
    });
  }
}