Add a Disconnector class.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1679193002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b61d81..dd74936 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.3.0
+
+* Add `Disconnector`, a transformer that allows the caller to disconnect the
+  transformed channel.
+
 ## 1.2.0
 
 * Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
diff --git a/lib/src/disconnector.dart b/lib/src/disconnector.dart
new file mode 100644
index 0000000..35ecd1c
--- /dev/null
+++ b/lib/src/disconnector.dart
@@ -0,0 +1,139 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// Allows the caller to force a channel to disconnect.
+///
+/// When [disconnect] is called, the channel (or channels) transformed by this
+/// transformer will act as though the remote end had disconnected—the stream
+/// will emit a done event, and the sink will ignore future inputs. The inner
+/// sink will also be closed to notify the remote end of the disconnection.
+///
+/// If a channel is transformed after the [disconnect] has been called, it will
+/// be disconnected immediately.
+class Disconnector<T> implements StreamChannelTransformer<T, T> {
+  /// Whether [disconnect] has been called.
+  bool get isDisconnected => _isDisconnected;
+  var _isDisconnected = false;
+
+  /// The sinks for transformed channels.
+  ///
+  /// Note that we assume that transformed channels provide the stream channel
+  /// guarantees. This allows us to only track sinks, because we know closing
+  /// the underlying sink will cause the stream to emit a done event.
+  final _sinks = <_DisconnectorSink<T>>[];
+
+  /// Disconnects all channels that have been transformed.
+  void disconnect() {
+    _isDisconnected = true;
+    for (var sink in _sinks) {
+      sink._disconnect();
+    }
+    _sinks.clear();
+  }
+
+  StreamChannel<T> bind(StreamChannel<T> channel) {
+    return channel.changeSink((innerSink) {
+      var sink = new _DisconnectorSink(innerSink);
+
+      if (_isDisconnected) {
+        sink._disconnect();
+      } else {
+        _sinks.add(sink);
+      }
+
+      return sink;
+    });
+  }
+}
+
+/// A sink wrapper that can force a disconnection.
+class _DisconnectorSink<T> implements StreamSink<T> {
+  /// The inner sink.
+  final StreamSink<T> _inner;
+
+  Future get done => _inner.done;
+
+  /// Whether [Disconnector.disconnect] has been called.
+  var _isDisconnected = false;
+
+  /// Whether the user has called [close].
+  var _closed = false;
+
+  /// The subscription to the stream passed to [addStream], if a stream is
+  /// currently being added.
+  StreamSubscription<T> _addStreamSubscription;
+
+  /// The completer for the future returned by [addStream], if a stream is
+  /// currently being added.
+  Completer _addStreamCompleter;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool get _inAddStream => _addStreamSubscription != null;
+
+  _DisconnectorSink(this._inner);
+
+  void add(T data) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_isDisconnected) return;
+
+    _inner.add(data);
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_isDisconnected) return;
+
+    _inner.addError(error, stackTrace);
+  }
+
+  Future addStream(Stream<T> stream) {
+    if (_closed) throw new StateError("Cannot add stream after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add stream while adding stream.");
+    }
+    if (_isDisconnected) return new Future.value();
+
+    _addStreamCompleter = new Completer.sync();
+    _addStreamSubscription = stream.listen(
+        _inner.add,
+        onError: _inner.addError,
+        onDone: _addStreamCompleter.complete);
+    return _addStreamCompleter.future.then((_) {
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    });
+  }
+
+  Future close() {
+    if (_inAddStream) {
+      throw new StateError("Cannot close sink while adding stream.");
+    }
+
+    _closed = true;
+    return _inner.close();
+  }
+
+  /// Disconnects this sink.
+  ///
+  /// This closes the underlying sink and stops forwarding events.
+  void _disconnect() {
+    _isDisconnected = true;
+    _inner.close();
+
+    if (!_inAddStream) return;
+    _addStreamCompleter.complete(_addStreamSubscription.cancel());
+    _addStreamCompleter = null;
+    _addStreamSubscription = null;
+  }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 992f702..b89845d 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -10,6 +10,7 @@
 import 'src/stream_channel_transformer.dart';
 
 export 'src/delegating_stream_channel.dart';
+export 'src/disconnector.dart';
 export 'src/isolate_channel.dart';
 export 'src/json_document_transformer.dart';
 export 'src/multi_channel.dart';
diff --git a/pubspec.yaml b/pubspec.yaml
index 52ee521..c787b95 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.2.0
+version: 1.3.0
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/disconnector_test.dart b/test/disconnector_test.dart
new file mode 100644
index 0000000..09cdddc
--- /dev/null
+++ b/test/disconnector_test.dart
@@ -0,0 +1,113 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+import 'dart:isolate';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  var streamController;
+  var sinkController;
+  var disconnector;
+  var channel;
+  setUp(() {
+    streamController = new StreamController();
+    sinkController = new StreamController();
+    disconnector = new Disconnector();
+    channel = new StreamChannel.withGuarantees(
+            streamController.stream, sinkController.sink)
+        .transform(disconnector);
+  });
+
+  group("before disconnection", () {
+    test("forwards events from the sink as normal", () {
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      channel.sink.close();
+
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("forwards events to the stream as normal", () {
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      streamController.close();
+
+      expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("events can't be added when the sink is explicitly closed", () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+
+      expect(channel.sink.close(), completes);
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError("oh no"), throwsStateError);
+      expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+          throwsStateError);
+    });
+
+    test("events can't be added while a stream is being added", () {
+      var controller = new StreamController();
+      channel.sink.addStream(controller.stream);
+
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError("oh no"), throwsStateError);
+      expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+          throwsStateError);
+      expect(() => channel.sink.close(), throwsStateError);
+
+      controller.close();
+    });
+  });
+
+  test("cancels addStream when disconnected", () async {
+    var canceled = false;
+    var controller = new StreamController(onCancel: () {
+      canceled = true;
+    });
+    expect(channel.sink.addStream(controller.stream), completes);
+    disconnector.disconnect();
+
+    await pumpEventQueue();
+    expect(canceled, isTrue);
+  });
+
+  group("after disconnection", () {
+    setUp(() => disconnector.disconnect());
+
+    test("closes the inner sink and ignores events to the outer sink", () {
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      channel.sink.close();
+ 
+      expect(sinkController.stream.toList(), completion(isEmpty));
+    });
+
+    test("closes the stream", () {
+      expect(channel.stream.toList(), completion(isEmpty));
+    });
+
+    test("completes done", () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+      expect(channel.sink.done, completes);
+    });
+ 
+    test("still emits state errors after explicit close", () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+      expect(channel.sink.close(), completes);
+
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError("oh no"), throwsStateError);
+    });
+  });
+}
\ No newline at end of file