Add StreamChannel.withCloseGuarantee.

This also properly enforces the close guarantee for transformers
provided by this package.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//2041983003 .
diff --git a/pkgs/stream_channel/CHANGELOG.md b/pkgs/stream_channel/CHANGELOG.md
index 6d86e48..6e3a6ac 100644
--- a/pkgs/stream_channel/CHANGELOG.md
+++ b/pkgs/stream_channel/CHANGELOG.md
@@ -1,3 +1,14 @@
+## 1.5.0
+
+* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
+  that closing the sink causes the stream to close before it emits any more
+  events. This is the only guarantee that isn't automatically preserved when
+  transforming a channel.
+
+* `StreamChannelTransformer`s provided by the `stream_channel` package now
+  properly provide the guarantee that closing the sink causes the stream to
+  close before it emits any more events
+
 ## 1.4.0
 
 * Add `StreamChannel.cast()`, which soundly coerces the generic type of a
diff --git a/pkgs/stream_channel/lib/src/close_guarantee_channel.dart b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
new file mode 100644
index 0000000..a2c69bc
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
@@ -0,0 +1,86 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that specifically enforces the stream channel guarantee
+/// that closing the sink causes the stream to close before it emits any more
+/// events
+///
+/// This is exposed via [new StreamChannel.withCloseGuarantee].
+class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
+  Stream<T> get stream => _stream;
+  _CloseGuaranteeStream<T> _stream;
+
+  StreamSink<T> get sink => _sink;
+  _CloseGuaranteeSink<T> _sink;
+
+  /// The subscription to the inner stream.
+  StreamSubscription<T> _subscription;
+
+  /// Whether the sink has closed, causing the underlying channel to disconnect.
+  bool _disconnected = false;
+
+  CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
+    _sink = new _CloseGuaranteeSink<T>(innerSink, this);
+    _stream = new _CloseGuaranteeStream<T>(innerStream, this);
+  }
+}
+
+/// The stream for [CloseGuaranteeChannel].
+///
+/// This wraps the inner stream to save the subscription on the channel when
+/// [listen] is called.
+class _CloseGuaranteeStream<T> extends Stream<T> {
+  /// The inner stream this is delegating to.
+  final Stream<T> _inner;
+
+  /// The [CloseGuaranteeChannel] this belongs to.
+  final CloseGuaranteeChannel<T> _channel;
+
+  _CloseGuaranteeStream(this._inner, this._channel);
+
+  StreamSubscription<T> listen(void onData(T event),
+      {Function onError, void onDone(), bool cancelOnError}) {
+    // If the channel is already disconnected, we shouldn't dispatch anything
+    // but a done event.
+    if (_channel._disconnected) {
+      onData = null;
+      onError = null;
+    }
+
+    var subscription = _inner.listen(onData,
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+    if (!_channel._disconnected) {
+      _channel._subscription = subscription;
+    }
+    return subscription;
+  }
+}
+
+/// The sink for [CloseGuaranteeChannel].
+///
+/// This wraps the inner sink to cancel the stream subscription when the sink is
+/// canceled.
+class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> {
+  /// The [CloseGuaranteeChannel] this belongs to.
+  final CloseGuaranteeChannel<T> _channel;
+
+  _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner);
+
+  Future close() {
+    var done = super.close();
+    _channel._disconnected = true;
+    if (_channel._subscription != null) {
+      // Don't dispatch anything but a done event.
+      _channel._subscription.onData(null);
+      _channel._subscription.onError(null);
+    }
+    return done;
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/json_document_transformer.dart b/pkgs/stream_channel/lib/src/json_document_transformer.dart
index c62c597..86b9ae7 100644
--- a/pkgs/stream_channel/lib/src/json_document_transformer.dart
+++ b/pkgs/stream_channel/lib/src/json_document_transformer.dart
@@ -40,6 +40,6 @@
     var sink = new StreamSinkTransformer.fromHandlers(handleData: (data, sink) {
       sink.add(_codec.encode(data));
     }).bind(channel.sink);
-    return new StreamChannel(stream, sink);
+    return new StreamChannel.withCloseGuarantee(stream, sink);
   }
 }
diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
index ca09ea1..46232d7 100644
--- a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
+++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
@@ -17,7 +17,14 @@
 /// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes
 /// in the original [StreamChannel] and returns the transformed version.
 ///
-/// Transformers must be able to have `bind` called multiple times.
+/// Transformers must be able to have [bind] called multiple times. If a
+/// subclass implements [bind] explicitly, it should be sure that the returned
+/// stream follows the second stream channel guarantee: closing the sink causes
+/// the stream to close before it emits any more events. This guarantee is
+/// invalidated when an asynchronous gap is added between the original stream's
+/// event dispatch and the returned stream's, for example by transforming it
+/// with a [StreamTransformer]. The guarantee can be easily preserved using [new
+/// StreamChannel.withCloseGuarantee].
 class StreamChannelTransformer<S, T> {
   /// The transformer to use on the channel's stream.
   final StreamTransformer<T, S> _streamTransformer;
@@ -63,7 +70,7 @@
   /// `channel.straem`, the transformer will transform them and pass the
   /// transformed versions to the returned channel's stream.
   StreamChannel<S> bind(StreamChannel<T> channel) =>
-      new StreamChannel<S>(
+      new StreamChannel<S>.withCloseGuarantee(
           channel.stream.transform(_streamTransformer),
           _sinkTransformer.bind(channel.sink));
 }
diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart
index 3615d21..16323b1 100644
--- a/pkgs/stream_channel/lib/stream_channel.dart
+++ b/pkgs/stream_channel/lib/stream_channel.dart
@@ -7,6 +7,7 @@
 import 'package:async/async.dart';
 
 import 'src/guarantee_channel.dart';
+import 'src/close_guarantee_channel.dart';
 import 'src/stream_channel_transformer.dart';
 
 export 'src/delegating_stream_channel.dart';
@@ -87,6 +88,19 @@
           {bool allowSinkErrors: true}) =>
       new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
 
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  ///
+  /// This specifically enforces the second guarantee: closing the sink causes
+  /// the stream to close before it emits any more events. This guarantee is
+  /// invalidated when an asynchronous gap is added between the original
+  /// stream's event dispatch and the returned stream's, for example by
+  /// transforming it with a [StreamTransformer]. This is a lighter-weight way
+  /// of preserving that guarantee in particular than
+  /// [StreamChannel.withGuarantees].
+  factory StreamChannel.withCloseGuarantee(Stream<T> stream,
+          StreamSink<T> sink) =>
+      new CloseGuaranteeChannel(stream, sink);
+
   /// Connects [this] to [other], so that any values emitted by either are sent
   /// directly to the other.
   void pipe(StreamChannel<T> other);
@@ -148,10 +162,10 @@
       changeSink(transformer.bind);
 
   StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
-      new StreamChannel(change(stream), sink);
+      new StreamChannel.withCloseGuarantee(change(stream), sink);
 
   StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
-      new StreamChannel(stream, change(sink));
+      new StreamChannel.withCloseGuarantee(stream, change(sink));
 
   StreamChannel/*<S>*/ cast/*<S>*/() => new StreamChannel(
       DelegatingStream.typed(stream), DelegatingStreamSink.typed(sink));
diff --git a/pkgs/stream_channel/test/with_close_guarantee_test.dart b/pkgs/stream_channel/test/with_close_guarantee_test.dart
new file mode 100644
index 0000000..caf48cf
--- /dev/null
+++ b/pkgs/stream_channel/test/with_close_guarantee_test.dart
@@ -0,0 +1,66 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+final _delayTransformer = new StreamTransformer.fromHandlers(
+    handleData: (data, sink) => new Future.microtask(() => sink.add(data)),
+    handleDone: (sink) => new Future.microtask(() => sink.close()));
+
+final _delaySinkTransformer =
+    new StreamSinkTransformer.fromStreamTransformer(_delayTransformer);
+
+void main() {
+  var controller;
+  var channel;
+  setUp(() {
+    controller = new StreamChannelController();
+
+    // Add a bunch of layers of asynchronous dispatch between the channel and
+    // the underlying controllers.
+    var stream = controller.foreign.stream;
+    var sink = controller.foreign.sink;
+    for (var i = 0; i < 10; i++) {
+      stream = stream.transform(_delayTransformer);
+      sink = _delaySinkTransformer.bind(sink);
+    }
+
+    channel = new StreamChannel.withCloseGuarantee(stream, sink);
+  });
+
+  test("closing the event sink causes the stream to close before it emits any "
+      "more events", () async {
+    controller.local.sink.add(1);
+    controller.local.sink.add(2);
+    controller.local.sink.add(3);
+
+    expect(channel.stream.listen(expectAsync((event) {
+      if (event == 2) channel.sink.close();
+    }, count: 2)).asFuture(), completes);
+
+    await pumpEventQueue();
+  });
+
+  test("closing the event sink before events are emitted causes the stream to "
+      "close immediately", () async {
+    channel.sink.close();
+    channel.stream.listen(
+        expectAsync((_) {}, count: 0),
+        onError: expectAsync((_, __) {}, count: 0),
+        onDone: expectAsync(() {}));
+
+    controller.local.sink.add(1);
+    controller.local.sink.add(2);
+    controller.local.sink.add(3);
+    controller.local.sink.close();
+
+    await pumpEventQueue();
+  });
+}