Add an IO implementation of WebSocketChannel.

R=kevmoo@google.com

Review URL: https://codereview.chromium.org//1756613002 .
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..48b2d28
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+The `web_socket_channel` package provides [`StreamChannel`][stream_channel]
+wrappers for WebSocket connections. It provides a cross-platform
+[`WebSocketChannel`][WebSocketChannel] API, a cross-platform implementation of
+that API that communicates over an underlying [`StreamChannel`][stream_channel],
+and [an implementation][IOWebSocketChannel] that wraps `dart:io`'s `WebSocket`
+class.
+
+[stream_channel]: https://pub.dartlang.org/packages/stream_channel
+[WebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel-class.html
+[IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel-class.html
+
+## `WebSocketChannel`
+
+The [`WebSocketChannel`][WebSocketChannel] class's most important role is as the
+interface for WebSocket stream channels across all implementations and all
+platforms. In addition to the base `StreamChannel` interface, it adds a
+[`protocol`][protocol] getter that returns the negotiated protocol for the
+socket; a [`pingInterval`][pingInterval] property that allows you to control the
+socket's keep-alive behavior; and [`closeCode`][closeCode] and
+[`closeReason`][closeReason] getters that provide information about why the
+socket closed.
+
+[protocol]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/protocol.html
+[pingInterval]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/pingInterval.html
+[closeCode]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeCode.html
+[closeReason]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeReason.html
+
+The channel's [`sink` property][sink] is also special. It returns a
+[`WebSocketSink`][WebSocketSink], which is just like a `StreamSink` except that
+its [`close()`][sink.close] method supports optional `closeCode` and
+`closeReason` parameters. These parameters allow the caller to signal to the
+other socket exactly why they're closing the connection.
+
+[sink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/sink.html
+[WebSocketSink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink-class.html
+[sink.close]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink/close.html
+
+`WebSocketChannel` also works as a cross-platform implementation of the
+WebSocket protocol. Because it can't initiate or handle HTTP requests in a
+cross-platform way, the [`new WebSocketChannel()` constructor][new] takes an
+underlying [`StreamChannel`][stream_channel] over which it communicates using
+the WebSocket protocol. It also provides the static [`signKey()`][signKey]
+method to make it easier to implement the [initial WebSocket handshake][]. These
+are used in the [`shelf_web_socket`][shelf_web_socket] package to support
+WebSockets in a cross-platform way.
+
+[new]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/WebSocketChannel.html
+[signKey]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/signKey.html
+[initial WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
+[shelf_web_socket]: https://pub.dartlang.org/packages/shelf_web_socket
+
+## `IOWebSocketChannel`
+
+The [`IOWebSocketChannel`][IOWebSocketChannel] class wraps
+[`dart:io`'s `WebSocket` class][io.WebSocket]. Because it imports `dart:io`, it
+has its own library, `package:web_socket_channel/io.dart`. This allows the main
+`WebSocketChannel` class to be available on all platforms.
+
+[io.WebSocket]: https://api.dartlang.org/latest/dart-io/WebSocket-class.html
+
+An `IOWebSocketChannel` can be created by passing a `dart:io` WebSocket to
+[its constructor][new IOWebSocketChannel]. It's more common to want to connect
+directly to a `ws://` or `wss://` URL, in which case
+[`new IOWebSocketChannel.connect()`][IOWebSocketChannel.connect] should be used.
+
+[new IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.html
+[IOWebSocketChannel.connect]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.connect.html
+
+```dart
+import 'package:web_socket_channel/io.dart';
+
+main() async {
+  var channel = new IOWebSocketChannel.connect("ws://localhost:8181");
+  channel.sink.add("connected!");
+  channel.sink.listen((message) {
+    // ...
+  });
+}
+```
diff --git a/lib/io.dart b/lib/io.dart
new file mode 100644
index 0000000..4c06c07
--- /dev/null
+++ b/lib/io.dart
@@ -0,0 +1,113 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import 'src/channel.dart';
+import 'src/exception.dart';
+import 'src/sink_completer.dart';
+
+/// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
+class IOWebSocketChannel extends StreamChannelMixin
+    implements WebSocketChannel {
+  /// The underlying `dart:io` [WebSocket].
+  ///
+  /// If the channel was constructed with [IOWebSocketChannel.connect], this is
+  /// `null` until the [WebSocket.connect] future completes.
+  WebSocket _webSocket;
+
+  Duration get pingInterval =>
+      _webSocket == null ? _pingInterval : _webSocket.pingInterval;
+
+  set pingInterval(Duration value) {
+    if (_webSocket == null) {
+      _pingInterval = value;
+    } else {
+      _webSocket.pingInterval = value;
+    }
+  }
+
+  /// The ping interval set by the user.
+  ///
+  /// This is stored independently of [_webSocket] so that the user can set it
+  /// prior to [_webSocket] getting a value.
+  Duration _pingInterval;
+
+  String get protocol => _webSocket?.protocol;
+  int get closeCode => _webSocket?.closeCode;
+  String get closeReason => _webSocket?.closeReason;
+
+  final Stream stream;
+  final WebSocketSink sink;
+
+  // TODO(nweiz): Add a compression parameter after the initial release.
+
+  /// Creates a new WebSocket connection.
+  ///
+  /// Connects to [url] using [WebSocket.connect] and returns a channel that can
+  /// be used to communicate over the resulting socket. The [url] may be either
+  /// a [String] or a [Uri]; otherwise, the parameters are the same as
+  /// [WebSocket.connect].
+  ///
+  /// If there's an error connecting, the channel's stream emits a
+  /// [WebSocketChannelException] wrapping that error and then closes.
+  factory IOWebSocketChannel.connect(url, {Iterable<String> protocols,
+      Map<String, dynamic> headers}) {
+    var channel;
+    var sinkCompleter = new WebSocketSinkCompleter();
+    var stream = StreamCompleter.fromFuture(
+        WebSocket.connect(url.toString(), headers: headers).then((webSocket) {
+      channel._setWebSocket(webSocket);
+      sinkCompleter.setDestinationSink(new _IOWebSocketSink(webSocket));
+      return webSocket;
+    }).catchError((error) => throw new WebSocketChannelException.from(error)));
+
+    channel = new IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
+    return channel;
+  }
+
+  /// Creates a channel wrapping [socket].
+  IOWebSocketChannel(WebSocket socket)
+      : _webSocket = socket,
+        stream = socket.handleError((error) =>
+            throw new WebSocketChannelException.from(error)),
+        sink = new _IOWebSocketSink(socket);
+
+  /// Creates a channel without a socket.
+  ///
+  /// This is used with [connect] to synchronously provide a channel that later
+  /// has a socket added.
+  IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
+      : _webSocket = null,
+        stream = stream.handleError((error) =>
+            throw new WebSocketChannelException.from(error));
+
+  /// Sets the underlying web socket.
+  ///
+  /// This is called by [connect] once the [WebSocket.connect] future has
+  /// completed.
+  void _setWebSocket(WebSocket webSocket) {
+    assert(_webSocket == null);
+
+    _webSocket = webSocket;
+    if (_pingInterval != null) _webSocket.pingInterval = pingInterval;
+  }
+}
+
+/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
+class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
+  /// The underlying socket.
+  final WebSocket _webSocket;
+
+  _IOWebSocketSink(WebSocket webSocket)
+      : super(webSocket),
+        _webSocket = webSocket;
+
+  Future close([int closeCode, String closeReason]) =>
+      _webSocket.close(closeCode, closeReason);
+}
diff --git a/lib/src/channel.dart b/lib/src/channel.dart
index a8219a1..a4e3ed5 100644
--- a/lib/src/channel.dart
+++ b/lib/src/channel.dart
@@ -70,7 +70,7 @@
   WebSocketSink get sink => new WebSocketSink._(_webSocket);
 
   /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
-  /// the [initial handshake].
+  /// the [initial handshake][].
   ///
   /// The return value should be sent back to the client in a
   /// `Sec-WebSocket-Accept` header.
diff --git a/lib/src/exception.dart b/lib/src/exception.dart
index d06dc60..47545ed 100644
--- a/lib/src/exception.dart
+++ b/lib/src/exception.dart
@@ -11,7 +11,11 @@
   /// The exception that caused this one, if available.
   final inner;
 
-  WebSocketChannelException([this.message, this.inner]);
+  WebSocketChannelException([this.message]) : inner = null;
+
+  WebSocketChannelException.from(inner)
+      : message = inner.toString(),
+        inner = inner;
 
   String toString() => message == null
       ? "WebSocketChannelException" :
diff --git a/lib/src/sink_completer.dart b/lib/src/sink_completer.dart
new file mode 100644
index 0000000..d932fd7
--- /dev/null
+++ b/lib/src/sink_completer.dart
@@ -0,0 +1,153 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'channel.dart';
+
+/// A [WebSocketSink] where the destination is provided later.
+///
+/// This is like a [StreamSinkCompleter], except that it properly forwards
+/// paramters to [WebSocketSink.close].
+class WebSocketSinkCompleter {
+  /// The sink for this completer.
+  ///
+  /// When a destination sink is provided, events that have been passed to the
+  /// sink will be forwarded to the destination.
+  ///
+  /// Events can be added to the sink either before or after a destination sink
+  /// is set.
+  final WebSocketSink sink = new _CompleterSink();
+
+  /// Returns [sink] typed as a [_CompleterSink].
+  _CompleterSink get _sink => sink;
+
+  /// Sets a sink as the destination for events from the
+  /// [WebSocketSinkCompleter]'s [sink].
+  ///
+  /// The completer's [sink] will act exactly as [destinationSink].
+  ///
+  /// If the destination sink is set before events are added to [sink], further
+  /// events are forwarded directly to [destinationSink].
+  ///
+  /// If events are added to [sink] before setting the destination sink, they're
+  /// buffered until the destination is available.
+  ///
+  /// A destination sink may be set at most once.
+  void setDestinationSink(WebSocketSink destinationSink) {
+    if (_sink._destinationSink != null) {
+      throw new StateError("Destination sink already set");
+    }
+    _sink._setDestinationSink(destinationSink);
+  }
+}
+
+/// [WebSocketSink] completed by [WebSocketSinkCompleter].
+class _CompleterSink implements WebSocketSink {
+  /// Controller for an intermediate sink.
+  ///
+  /// Created if the user adds events to this sink before the destination sink
+  /// is set.
+  StreamController _controller;
+
+  /// Completer for [done].
+  ///
+  /// Created if the user requests the [done] future before the destination sink
+  /// is set.
+  Completer _doneCompleter;
+
+  /// Destination sink for the events added to this sink.
+  ///
+  /// Set when [WebSocketSinkCompleter.setDestinationSink] is called.
+  WebSocketSink _destinationSink;
+
+  /// The close code passed to [close].
+  int _closeCode;
+
+  /// The close reason passed to [close].
+  String _closeReason;
+
+  /// Whether events should be sent directly to [_destinationSink], as opposed
+  /// to going through [_controller].
+  bool get _canSendDirectly => _controller == null && _destinationSink != null;
+
+  Future get done {
+    if (_doneCompleter != null) return _doneCompleter.future;
+    if (_destinationSink == null) {
+      _doneCompleter = new Completer.sync();
+      return _doneCompleter.future;
+    }
+    return _destinationSink.done;
+  }
+
+  void add(event) {
+    if (_canSendDirectly) {
+      _destinationSink.add(event);
+    } else {
+      _ensureController();
+      _controller.add(event);
+    }
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_canSendDirectly) {
+      _destinationSink.addError(error, stackTrace);
+    } else {
+      _ensureController();
+      _controller.addError(error, stackTrace);
+    }
+  }
+
+  Future addStream(Stream stream) {
+    if (_canSendDirectly) return _destinationSink.addStream(stream);
+
+    _ensureController();
+    return _controller.addStream(stream, cancelOnError: false);
+  }
+
+  Future close([int closeCode, String closeReason]) {
+    if (_canSendDirectly) {
+      _destinationSink.close(closeCode, closeReason);
+    } else {
+      _closeCode = closeCode;
+      _closeReason = closeReason;
+      _ensureController();
+      _controller.close();
+    }
+    return done;
+  }
+
+  /// Create [_controller] if it doesn't yet exist.
+  void _ensureController() {
+    if (_controller == null) _controller = new StreamController(sync: true);
+  }
+
+  /// Sets the destination sink to which events from this sink will be provided.
+  ///
+  /// If set before the user adds events, events will be added directly to the
+  /// destination sink. If the user adds events earlier, an intermediate sink is
+  /// created using a stream controller, and the destination sink is linked to
+  /// it later.
+  void _setDestinationSink(WebSocketSink sink) {
+    assert(_destinationSink == null);
+    _destinationSink = sink;
+
+    // If the user has already added data, it's buffered in the controller, so
+    // we add it to the sink.
+    if (_controller != null) {
+      // Catch any error that may come from [addStream] or [sink.close]. They'll
+      // be reported through [done] anyway.
+      sink
+          .addStream(_controller.stream)
+          .whenComplete(() => sink.close(_closeCode, _closeReason))
+          .catchError((_) {});
+    }
+
+    // If the user has already asked when the sink is done, connect the sink's
+    // done callback to that completer.
+    if (_doneCompleter != null) {
+      _doneCompleter.complete(sink.done);
+    }
+  }
+}
\ No newline at end of file
diff --git a/lib/web_socket_channel.dart b/lib/web_socket_channel.dart
index 299604c..54c05a0 100644
--- a/lib/web_socket_channel.dart
+++ b/lib/web_socket_channel.dart
@@ -1,4 +1,4 @@
-// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
diff --git a/test/io_test.dart b/test/io_test.dart
new file mode 100644
index 0000000..fcf8c7c
--- /dev/null
+++ b/test/io_test.dart
@@ -0,0 +1,101 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@TestOn('vm')
+
+import 'dart:io';
+
+import 'package:test/test.dart';
+
+import 'package:web_socket_channel/io.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+void main() {
+  var server;
+  tearDown(() async {
+    if (server != null) await server.close();
+  });
+
+  test("communicates using existing WebSockets", () async {
+    server = await HttpServer.bind("localhost", 0);
+    server.transform(new WebSocketTransformer()).listen((webSocket) {
+      var channel = new IOWebSocketChannel(webSocket);
+      channel.sink.add("hello!");
+      channel.stream.listen((request) {
+        expect(request, equals("ping"));
+        channel.sink.add("pong");
+        channel.sink.close(5678, "raisin");
+      });
+    });
+
+    var webSocket = await WebSocket.connect("ws://localhost:${server.port}");
+    var channel = new IOWebSocketChannel(webSocket);
+
+    var n = 0;
+    channel.stream.listen((message) {
+      if (n == 0) {
+        expect(message, equals("hello!"));
+        channel.sink.add("ping");
+      } else if (n == 1) {
+        expect(message, equals("pong"));
+      } else {
+        fail("Only expected two messages.");
+      }
+      n++;
+    }, onDone: expectAsync(() {
+      expect(channel.closeCode, equals(5678));
+      expect(channel.closeReason, equals("raisin"));
+    }));
+  });
+
+  test(".connect communicates immediately", () async {
+    server = await HttpServer.bind("localhost", 0);
+    server.transform(new WebSocketTransformer()).listen((webSocket) {
+      var channel = new IOWebSocketChannel(webSocket);
+      channel.stream.listen((request) {
+        expect(request, equals("ping"));
+        channel.sink.add("pong");
+      });
+    });
+
+    var channel = new IOWebSocketChannel.connect(
+        "ws://localhost:${server.port}");
+    channel.sink.add("ping");
+
+    channel.stream.listen(expectAsync((message) {
+      expect(message, equals("pong"));
+      channel.sink.close(5678, "raisin");
+    }, count: 1), onDone: expectAsync(() {}));
+  });
+
+  test(".connect with an immediate call to close", () async {
+    server = await HttpServer.bind("localhost", 0);
+    server.transform(new WebSocketTransformer()).listen((webSocket) {
+      expect(() async {
+        var channel = new IOWebSocketChannel(webSocket);
+        await channel.stream.listen(null).asFuture();
+        expect(channel.closeCode, equals(5678));
+        expect(channel.closeReason, equals("raisin"));
+      }(), completes);
+    });
+
+    var channel = new IOWebSocketChannel.connect(
+        "ws://localhost:${server.port}");
+    channel.sink.close(5678, "raisin");
+  });
+
+  test(".connect wraps a connection error in WebSocketChannelException",
+      () async {
+    server = await HttpServer.bind("localhost", 0);
+    server.listen((request) {
+      request.response.statusCode = 404;
+      request.response.close();
+    });
+
+    var channel = new IOWebSocketChannel.connect(
+        "ws://localhost:${server.port}");
+    expect(channel.stream.toList(),
+        throwsA(new isInstanceOf<WebSocketChannelException>()));
+  });
+}