Return a future from Disconnector.disconnect(). (#4)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e3a6ac..a018418 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.6.0
+
+* `Disconnector.disconnect()` now returns a future that completes when all the
+ inner `StreamSink.close()` futures have completed.
+
## 1.5.0
* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
diff --git a/lib/src/disconnector.dart b/lib/src/disconnector.dart
index beff71d..b23813e 100644
--- a/lib/src/disconnector.dart
+++ b/lib/src/disconnector.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:async/async.dart';
+
import '../stream_channel.dart';
/// Allows the caller to force a channel to disconnect.
@@ -17,8 +19,7 @@
/// be disconnected immediately.
class Disconnector<T> implements StreamChannelTransformer<T, T> {
/// Whether [disconnect] has been called.
- bool get isDisconnected => _isDisconnected;
- var _isDisconnected = false;
+ bool get isDisconnected => _disconnectMemo.hasRun;
/// The sinks for transformed channels.
///
@@ -28,20 +29,25 @@
final _sinks = <_DisconnectorSink<T>>[];
/// Disconnects all channels that have been transformed.
- void disconnect() {
- _isDisconnected = true;
- for (var sink in _sinks) {
- sink._disconnect();
- }
+ ///
+ /// Returns a future that completes when every inner sink's [StreamSink.close]
+ /// future has completed. Note that a [StreamController] sink's close future
+ /// won't complete until the corresponding stream has a listener.
+ Future disconnect() => _disconnectMemo.runOnce(() {
+ var futures = _sinks.map((sink) => sink._disconnect()).toList();
_sinks.clear();
- }
+ return Future.wait(futures, eagerError: true);
+ });
+ final _disconnectMemo = new AsyncMemoizer();
StreamChannel<T> bind(StreamChannel<T> channel) {
return channel.changeSink((innerSink) {
var sink = new _DisconnectorSink<T>(innerSink);
- if (_isDisconnected) {
- sink._disconnect();
+ if (isDisconnected) {
+ // Ignore errors here, because otherwise there would be no way for the
+ // user to handle them gracefully.
+ sink._disconnect().catchError((_) {});
} else {
_sinks.add(sink);
}
@@ -126,14 +132,18 @@
/// Disconnects this sink.
///
- /// This closes the underlying sink and stops forwarding events.
- void _disconnect() {
+ /// This closes the underlying sink, stops forwarding events, and cancels any
+ /// in-progress [addStream]. It returns the inner [StreamSink.close] future.
+ Future _disconnect() {
_isDisconnected = true;
- _inner.close();
+ var future = _inner.close();
- if (!_inAddStream) return;
- _addStreamCompleter.complete(_addStreamSubscription.cancel());
- _addStreamCompleter = null;
- _addStreamSubscription = null;
+ if (_inAddStream) {
+ _addStreamCompleter.complete(_addStreamSubscription.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+
+ return future;
}
}
diff --git a/pubspec.yaml b/pubspec.yaml
index bcdfe8e..46d7761 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.5.1-dev
+version: 1.6.0
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/disconnector_test.dart b/test/disconnector_test.dart
index d7fa885..311a41c 100644
--- a/test/disconnector_test.dart
+++ b/test/disconnector_test.dart
@@ -4,6 +4,7 @@
import 'dart:async';
+import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
@@ -78,8 +79,35 @@
expect(canceled, isTrue);
});
+ test("disconnect() returns the close future from the inner sink", () async {
+ var streamController = new StreamController();
+ var sinkController = new StreamController();
+ var disconnector = new Disconnector();
+ var sink = new _CloseCompleterSink(sinkController.sink);
+
+ // Transforming the channel is what hooks its sink up to [disconnector]; the
+ // resulting channel isn't otherwise needed, so it isn't bound to a local.
+ new StreamChannel.withGuarantees(streamController.stream, sink)
+ .transform(disconnector);
+
+ var disconnectFutureFired = false;
+ expect(disconnector.disconnect().then((_) {
+ disconnectFutureFired = true;
+ }), completes);
+
+ // Give the future time to fire early if it's going to.
+ await pumpEventQueue();
+ expect(disconnectFutureFired, isFalse);
+
+ // When the inner sink's close future completes, so should the
+ // disconnector's.
+ sink.completer.complete();
+ await pumpEventQueue();
+ expect(disconnectFutureFired, isTrue);
+ });
+
group("after disconnection", () {
- setUp(() => disconnector.disconnect());
+ setUp(() {
+ disconnector.disconnect();
+ });
test("closes the inner sink and ignores events to the outer sink", () {
channel.sink.add(1);
@@ -108,3 +136,17 @@
});
});
}
+
+/// A [StreamSink] wrapper whose [close] still closes the inner sink but returns
+/// [completer]'s future, so tests control when closing appears to complete.
+class _CloseCompleterSink extends DelegatingStreamSink {
+ /// The completer for the future returned by [close].
+ final completer = new Completer();
+
+ _CloseCompleterSink(StreamSink inner) : super(inner);
+
+ Future close() {
+ super.close();
+ return completer.future;
+ }
+}