Return a future from Disconnector.disconnect(). (#4)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e3a6ac..a018418 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.6.0
+
+* `Disconnector.disconnect()` now returns a future that completes when all the
+  inner `StreamSink.close()` futures have completed.
+
 ## 1.5.0
 
 * Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
diff --git a/lib/src/disconnector.dart b/lib/src/disconnector.dart
index beff71d..b23813e 100644
--- a/lib/src/disconnector.dart
+++ b/lib/src/disconnector.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:async/async.dart';
+
 import '../stream_channel.dart';
 
 /// Allows the caller to force a channel to disconnect.
@@ -17,8 +19,7 @@
 /// be disconnected immediately.
 class Disconnector<T> implements StreamChannelTransformer<T, T> {
   /// Whether [disconnect] has been called.
-  bool get isDisconnected => _isDisconnected;
-  var _isDisconnected = false;
+  bool get isDisconnected => _disconnectMemo.hasRun;
 
   /// The sinks for transformed channels.
   ///
@@ -28,20 +29,25 @@
   final _sinks = <_DisconnectorSink<T>>[];
 
   /// Disconnects all channels that have been transformed.
-  void disconnect() {
-    _isDisconnected = true;
-    for (var sink in _sinks) {
-      sink._disconnect();
-    }
+  ///
+  /// Returns a future that completes once every inner sink's [StreamSink.close]
+  /// future has completed, or fails as soon as any of them fails. A
+  /// [StreamController]'s sink won't close until its stream has a listener.
+  Future disconnect() => _disconnectMemo.runOnce(() {
+    var futures = _sinks.map((sink) => sink._disconnect()).toList();
     _sinks.clear();
-  }
+    return Future.wait(futures, eagerError: true);
+  });
+  final _disconnectMemo = new AsyncMemoizer();
 
   StreamChannel<T> bind(StreamChannel<T> channel) {
     return channel.changeSink((innerSink) {
       var sink = new _DisconnectorSink<T>(innerSink);
 
-      if (_isDisconnected) {
-        sink._disconnect();
+      if (isDisconnected) {
+        // Ignore errors here, because otherwise there would be no way for the
+        // user to handle them gracefully.
+        sink._disconnect().catchError((_) {});
       } else {
         _sinks.add(sink);
       }
@@ -126,14 +132,18 @@
 
   /// Disconnects this sink.
   ///
-  /// This closes the underlying sink and stops forwarding events.
-  void _disconnect() {
+  /// This closes the underlying sink and stops forwarding events. It returns
+  /// the [StreamSink.close] future for the underlying sink.
+  Future _disconnect() {
     _isDisconnected = true;
-    _inner.close();
+    var future = _inner.close();
 
-    if (!_inAddStream) return;
-    _addStreamCompleter.complete(_addStreamSubscription.cancel());
-    _addStreamCompleter = null;
-    _addStreamSubscription = null;
+    if (_inAddStream) {
+      _addStreamCompleter.complete(_addStreamSubscription.cancel());
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    }
+
+    return future;
   }
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index bcdfe8e..46d7761 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.5.1-dev
+version: 1.6.0
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/disconnector_test.dart b/test/disconnector_test.dart
index d7fa885..311a41c 100644
--- a/test/disconnector_test.dart
+++ b/test/disconnector_test.dart
@@ -4,6 +4,7 @@
 
 import 'dart:async';
 
+import 'package:async/async.dart';
 import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
 
@@ -78,8 +79,35 @@
     expect(canceled, isTrue);
   });
 
+  test("disconnect() returns the close future from the inner sink", () async {
+    var streamController = new StreamController();
+    var sinkController = new StreamController();
+    var disconnector = new Disconnector();
+    var sink = new _CloseCompleterSink(sinkController.sink);
+    var channel = new StreamChannel.withGuarantees(
+            streamController.stream, sink)
+        .transform(disconnector);
+
+    var disconnectFutureFired = false;
+    expect(disconnector.disconnect().then((_) {
+      disconnectFutureFired = true;
+    }), completes);
+
+    // Give the future time to fire early if it's going to.
+    await pumpEventQueue();
+    expect(disconnectFutureFired, isFalse);
+
+    // When the inner sink's close future completes, so should the
+    // disconnector's.
+    sink.completer.complete();
+    await pumpEventQueue();
+    expect(disconnectFutureFired, isTrue);
+  });
+
   group("after disconnection", () {
-    setUp(() => disconnector.disconnect());
+    setUp(() {
+      disconnector.disconnect();
+    });
 
     test("closes the inner sink and ignores events to the outer sink", () {
       channel.sink.add(1);
@@ -108,3 +136,17 @@
     });
   });
 }
+
+/// A [StreamSink] wrapper that adds the ability to manually complete the Future
+/// returned by [close] using [completer].
+class _CloseCompleterSink extends DelegatingStreamSink {
+  /// The completer for the future returned by [close].
+  final completer = new Completer();
+
+  _CloseCompleterSink(StreamSink inner) : super(inner);
+
+  Future close() {
+    super.close();
+    return completer.future;
+  }
+}