Add StreamChannel.withGuarantees and StreamChannelController.

These APIs make it easier for users to create their own stream channels
that follow the StreamChannel guarantees.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1662773003 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8138708..0b61d81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+## 1.2.0
+
+* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
+  wrapping to ensure that it obeys the stream channel guarantees.
+
+* Add `StreamChannelController`, which can be used to create custom
+  `StreamChannel` objects.
+
 ## 1.1.1
 
 * Fix the type annotation for `StreamChannel.transform()`'s parameter.
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
new file mode 100644
index 0000000..849cc2f
--- /dev/null
+++ b/lib/src/guarantee_channel.dart
@@ -0,0 +1,163 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that enforces the stream channel guarantees.
+///
+/// This is exposed via [new StreamChannel.withGuarantees].
+class GuaranteeChannel<T> extends StreamChannelMixin<T> {
+  Stream<T> get stream => _streamController.stream;
+
+  StreamSink<T> get sink => _sink;
+  _GuaranteeSink<T> _sink;
+
+  /// The controller for [stream].
+  ///
+  /// This intermediate controller allows us to continue listening for a done
+  /// event even after the user has canceled their subscription, and to send our
+  /// own done event when the sink is closed.
+  StreamController<T> _streamController;
+
+  /// The subscription to the inner stream.
+  StreamSubscription<T> _subscription;
+
+  /// Whether the sink has closed, causing the underlying channel to disconnect.
+  bool _disconnected = false;
+
+  GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
+    _sink = new _GuaranteeSink<T>(innerSink, this);
+
+    // Enforce the single-subscription guarantee by changing a broadcast stream
+    // to single-subscription.
+    if (innerStream.isBroadcast) {
+      innerStream = innerStream.transform(
+          const SingleSubscriptionTransformer());
+    }
+
+    _streamController = new StreamController<T>(onListen: () {
+      // If the sink has disconnected, we've already called
+      // [_streamController.close].
+      if (_disconnected) return;
+
+      _subscription = innerStream.listen(_streamController.add,
+          onError: _streamController.addError,
+          onDone: () {
+            _sink._onStreamDisconnected();
+            _streamController.close();
+          });
+    }, sync: true);
+  }
+
+  /// Called by [_GuaranteeSink] when the user closes it.
+  ///
+  /// The sink closing indicates that the connection is closed, so the stream
+  /// should stop emitting events.
+  void _onSinkDisconnected() {
+    _disconnected = true;
+    if (_subscription != null) _subscription.cancel();
+    _streamController.close();
+  }
+}
+
+/// The sink for [GuaranteeChannel].
+///
+/// This wraps the inner sink to ignore events and cancel any in-progress
+/// [addStream] calls when the underlying channel closes.
+class _GuaranteeSink<T> implements StreamSink<T> {
+  /// The inner sink being wrapped.
+  final StreamSink<T> _inner;
+
+  /// The [GuaranteeChannel] this belongs to.
+  final GuaranteeChannel<T> _channel;
+
+  Future get done => _inner.done;
+
+  /// Whether the stream has emitted a done event, causing the underlying
+  /// channel to disconnect.
+  bool _disconnected = false;
+
+  /// Whether the user has called [close].
+  bool _closed = false;
+
+  /// The subscription to the stream passed to [addStream], if a stream is
+  /// currently being added.
+  StreamSubscription<T> _addStreamSubscription;
+
+  /// The completer for the future returned by [addStream], if a stream is
+  /// currently being added.
+  Completer _addStreamCompleter;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool get _inAddStream => _addStreamSubscription != null;
+
+  _GuaranteeSink(this._inner, this._channel);
+
+  void add(T data) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_disconnected) return;
+
+    _inner.add(data);
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_disconnected) return;
+
+    _inner.addError(error, stackTrace);
+  }
+
+  Future addStream(Stream<T> stream) {
+    if (_closed) throw new StateError("Cannot add stream after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add stream while adding stream.");
+    }
+    if (_disconnected) return new Future.value();
+
+    _addStreamCompleter = new Completer.sync();
+    _addStreamSubscription = stream.listen(
+        _inner.add,
+        onError: _inner.addError,
+        onDone: _addStreamCompleter.complete);
+    return _addStreamCompleter.future.then((_) {
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    });
+  }
+
+  Future close() {
+    if (_inAddStream) {
+      throw new StateError("Cannot close sink while adding stream.");
+    }
+
+    _closed = true;
+    if (_disconnected) return new Future.value();
+
+    _channel._onSinkDisconnected();
+    return _inner.close();
+  }
+
+  /// Called by [GuaranteeChannel] when the stream emits a done event.
+  ///
+  /// The stream being done indicates that the connection is closed, so the
+  /// sink should stop forwarding events.
+  void _onStreamDisconnected() {
+    _disconnected = true;
+    if (!_inAddStream) return;
+
+    _addStreamCompleter.complete(_addStreamSubscription.cancel());
+    _addStreamCompleter = null;
+    _addStreamSubscription = null;
+  }
+}
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
new file mode 100644
index 0000000..1812a0e
--- /dev/null
+++ b/lib/src/stream_channel_controller.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A controller for exposing a new [StreamChannel].
+///
+/// This exposes two connected [StreamChannel]s, [local] and [foreign]. The
+/// user's code should use [local] to emit and receive events. Then [foreign]
+/// can be returned for others to use. For example, here's a simplified version
+/// of the implementation of [new IsolateChannel]:
+///
+/// ```dart
+/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
+///   var controller = new StreamChannelController();
+///
+///   // Pipe all events from the receive port into the local sink...
+///   receivePort.pipe(controller.local.sink);
+///
+///   // ...and all events from the local stream into the send port.
+///   controller.local.listen(sendPort.add, onDone: receivePort.close);
+///
+///   // Then return the foreign controller for your users to use.
+///   return controller.foreign;
+/// }
+/// ```
+class StreamChannelController<T> {
+  /// The local channel.
+  ///
+  /// This channel should be used directly by the creator of this
+  /// [StreamChannelController] to send and receive events.
+  StreamChannel<T> get local => _local;
+  StreamChannel<T> _local;
+
+  /// The foreign channel.
+  ///
+  /// This channel should be returned to external users so they can communicate
+  /// with [local].
+  StreamChannel<T> get foreign => _foreign;
+  StreamChannel<T> _foreign;
+
+  /// Creates a [StreamChannelController].
+  ///
+  /// If [sync] is true, events added to either channel's sink are synchronously
+  /// dispatched to the other channel's stream. This should only be done if the
+  /// source of those events is already asynchronous.
+  StreamChannelController({bool sync: false}) {
+    var localToForeignController = new StreamController<T>(sync: sync);
+    var foreignToLocalController = new StreamController<T>(sync: sync);
+    _local = new StreamChannel<T>.withGuarantees(
+        foreignToLocalController.stream, localToForeignController.sink);
+    _foreign = new StreamChannel<T>.withGuarantees(
+        localToForeignController.stream, foreignToLocalController.sink);
+  }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 7fff674..8c4fad5 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -6,6 +6,7 @@
 
 import 'package:async/async.dart';
 
+import 'src/guarantee_channel.dart';
 import 'src/stream_channel_transformer.dart';
 
 export 'src/delegating_stream_channel.dart';
@@ -13,6 +14,7 @@
 export 'src/json_document_transformer.dart';
 export 'src/multi_channel.dart';
 export 'src/stream_channel_completer.dart';
+export 'src/stream_channel_controller.dart';
 export 'src/stream_channel_transformer.dart';
 
 /// An abstract class representing a two-way communication channel.
@@ -65,10 +67,20 @@
   /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
   ///
   /// Note that this stream/sink pair must provide the guarantees listed in the
-  /// [StreamChannel] documentation.
+  /// [StreamChannel] documentation. If they don't do so natively, [new
+  /// StreamChannel.withGuarantees] should be used instead.
   factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
       new _StreamChannel<T>(stream, sink);
 
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  ///
+  /// Unlike [new StreamChannel], this enforces the guarantees listed in the
+  /// [StreamChannel] documentation. This makes it somewhat less efficient than
+  /// just wrapping a stream and a sink directly, so [new StreamChannel] should
+  /// be used when the guarantees are provided natively.
+  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink) =>
+      new GuaranteeChannel(stream, sink);
+
   /// Connects [this] to [other], so that any values emitted by either are sent
   /// directly to the other.
   void pipe(StreamChannel<T> other);
diff --git a/pubspec.yaml b/pubspec.yaml
index 7d1c6a3..f0b830c 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.1.1
+version: 1.2.0-dev
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_controller_test.dart b/test/stream_channel_controller_test.dart
new file mode 100644
index 0000000..e45f090
--- /dev/null
+++ b/test/stream_channel_controller_test.dart
@@ -0,0 +1,86 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  group("asynchronously", () {
+    var controller;
+    setUp(() {
+      controller = new StreamChannelController();
+    });
+
+    test("forwards events from the local sink to the foreign stream", () {
+      controller.local.sink..add(1)..add(2)..add(3)..close();
+      expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("forwards events from the foreign sink to the local stream", () {
+      controller.foreign.sink..add(1)..add(2)..add(3)..close();
+      expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
+    });
+  });
+
+  group("synchronously", () {
+    var controller;
+    setUp(() {
+      controller = new StreamChannelController(sync: true);
+    });
+
+    test("synchronously forwards events from the local sink to the foreign "
+        "stream", () {
+      var receivedEvent = false;
+      var receivedError = false;
+      var receivedDone = false;
+      controller.foreign.stream.listen(expectAsync((event) {
+        expect(event, equals(1));
+        receivedEvent = true;
+      }), onError: expectAsync((error) {
+        expect(error, equals("oh no"));
+        receivedError = true;
+      }), onDone: expectAsync(() {
+        receivedDone = true;
+      }));
+
+      controller.local.sink.add(1);
+      expect(receivedEvent, isTrue);
+
+      controller.local.sink.addError("oh no");
+      expect(receivedError, isTrue);
+
+      controller.local.sink.close();
+      expect(receivedDone, isTrue);
+    });
+
+    test("synchronously forwards events from the foreign sink to the local "
+        "stream", () {
+      var receivedEvent = false;
+      var receivedError = false;
+      var receivedDone = false;
+      controller.local.stream.listen(expectAsync((event) {
+        expect(event, equals(1));
+        receivedEvent = true;
+      }), onError: expectAsync((error) {
+        expect(error, equals("oh no"));
+        receivedError = true;
+      }), onDone: expectAsync(() {
+        receivedDone = true;
+      }));
+
+      controller.foreign.sink.add(1);
+      expect(receivedEvent, isTrue);
+
+      controller.foreign.sink.addError("oh no");
+      expect(receivedError, isTrue);
+
+      controller.foreign.sink.close();
+      expect(receivedDone, isTrue);
+    });
+  });
+}
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
new file mode 100644
index 0000000..0e95f9d
--- /dev/null
+++ b/test/with_guarantees_test.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  var streamController;
+  var sinkController;
+  var channel;
+  setUp(() {
+    streamController = new StreamController();
+    sinkController = new StreamController();
+    channel = new StreamChannel.withGuarantees(
+        streamController.stream, sinkController.sink);
+  });
+
+  group("with a broadcast stream", () {
+    setUp(() {
+      streamController = new StreamController.broadcast();
+      channel = new StreamChannel.withGuarantees(
+          streamController.stream, sinkController.sink);
+    });
+
+    test("buffers events", () async {
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      await pumpEventQueue();
+
+      expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+      streamController.close();
+    });
+
+    test("only allows a single subscription", () {
+      channel.stream.listen(null);
+      expect(() => channel.stream.listen(null), throwsStateError);
+    });
+  });
+
+  test("closing the event sink causes the stream to close before it emits any "
+      "more events", () {
+    streamController.add(1);
+    streamController.add(2);
+    streamController.add(3);
+
+    expect(channel.stream.listen(expectAsync((event) {
+      if (event == 2) channel.sink.close();
+    }, count: 2)).asFuture(), completes);
+  });
+
+  test("after the stream closes, the sink ignores events", () async {
+    streamController.close();
+
+    // Wait for the done event to be delivered.
+    await channel.stream.toList();
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    channel.sink.close();
+
+    // None of our channel.sink additions should make it to the other endpoint.
+    sinkController.stream.listen(
+        expectAsync((_) {}, count: 0),
+        onDone: expectAsync(() {}, count: 0));
+    await pumpEventQueue();
+  });
+
+  test("canceling the stream's subscription has no effect on the sink",
+      () async {
+    channel.stream.listen(null).cancel();
+    await pumpEventQueue();
+
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    channel.sink.close();
+    expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("canceling the stream's subscription doesn't stop a done event",
+      () async {
+    channel.stream.listen(null).cancel();
+    await pumpEventQueue();
+
+    streamController.close();
+    await pumpEventQueue();
+
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    channel.sink.close();
+
+    // The sink should be ignoring events because the stream closed.
+    sinkController.stream.listen(
+        expectAsync((_) {}, count: 0),
+        onDone: expectAsync(() {}, count: 0));
+    await pumpEventQueue();
+  });
+
+  test("forwards errors to the other endpoint", () {
+    channel.sink.addError("error");
+    expect(sinkController.stream.first, throwsA("error"));
+  });
+}