Add a WebSocketChannel class.

This deprecates CompatibleWebSocket in favor of the new class.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1646583003 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b54230..cda67d2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 2.1.0
+
+* Added `WebSocketChannel`, an implementation of `StreamChannel` that's backed
+  by a `WebSocket`.
+
+* Deprecated `CompatibleWebSocket` in favor of `WebSocketChannel`.
+
 ## 2.0.0
 
 * Removed the `DataUri` class. It's redundant with the `Uri.data` getter that's
diff --git a/README.md b/README.md
index 1407c16..1bd5a90 100644
--- a/README.md
+++ b/README.md
@@ -10,9 +10,9 @@
   and `Content-Type` headers. This class supports both parsing and formatting
   media types according to [HTTP/1.1][2616].
 
-* A `CompatibleWebSocket` class that supports both the client and server sides
-  of the [WebSocket protocol][6455] independently of any specific server
-  implementation.
+* A `WebSocketChannel` class that provides a `StreamChannel` interface for both
+  the client and server sides of the [WebSocket protocol][6455] independently of
+  any specific server implementation.
 
 [2616]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
 [6455]: https://tools.ietf.org/html/rfc6455
diff --git a/lib/src/web_socket.dart b/lib/src/web_socket.dart
index a11840c..edc0a18 100644
--- a/lib/src/web_socket.dart
+++ b/lib/src/web_socket.dart
@@ -2,111 +2,6 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
-import 'dart:async';
-
-import 'package:crypto/crypto.dart';
-
-import 'copy/web_socket_impl.dart';
-
-/// An implementation of the WebSocket protocol that's not specific to "dart:io"
-/// or to any particular HTTP API.
-///
-/// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket
-/// handshake][]. This needs to be handled manually by the user of the code.
-/// Once that's been done, [new CompatibleWebSocket] can be called with the
-/// underlying socket and it will handle the remainder of the protocol.
-///
-/// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
-abstract class CompatibleWebSocket implements Stream, StreamSink {
-  /// The interval for sending ping signals.
-  ///
-  /// If a ping message is not answered by a pong message from the peer, the
-  /// `WebSocket` is assumed disconnected and the connection is closed with a
-  /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
-  /// pong message must be received within [pingInterval].
-  ///
-  /// There are never two outstanding pings at any given time, and the next ping
-  /// timer starts when the pong is received.
-  ///
-  /// By default, the [pingInterval] is `null`, indicating that ping messages
-  /// are disabled.
-  Duration pingInterval;
-
-  /// The [close code][] set when the WebSocket connection is closed.
-  ///
-  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
-  ///
-  /// Before the connection has been closed, this will be `null`.
-  int get closeCode;
-
-  /// The [close reason][] set when the WebSocket connection is closed.
-  ///
-  /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
-  ///
-  /// Before the connection has been closed, this will be `null`.
-  String get closeReason;
-
-  /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
-  /// the [initial handshake].
-  ///
-  /// The return value should be sent back to the client in a
-  /// `Sec-WebSocket-Accept` header.
-  ///
-  /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
-  static String signKey(String key) {
-    var hash = new SHA1();
-    // We use [codeUnits] here rather than UTF-8-decoding the string because
-    // [key] is expected to be base64 encoded, and so will be pure ASCII.
-    hash.add((key + webSocketGUID).codeUnits);
-    return CryptoUtils.bytesToBase64(hash.close());
-  }
-
-  /// Creates a new WebSocket handling messaging across an existing socket.
-  ///
-  /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
-  /// must have already been completed on the socket before this is called.
-  ///
-  /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
-  /// `Socket`), it will be used for both sending and receiving data. Otherwise,
-  /// it will be used for receiving data and [sink] will be used for sending it.
-  ///
-  /// [protocol] should be the protocol negotiated by this handshake, if any.
-  ///
-  /// If this is a WebSocket server, [serverSide] should be `true` (the
-  /// default); if it's a client, [serverSide] should be `false`.
-  ///
-  /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
-  factory CompatibleWebSocket(Stream<List<int>> stream,
-        {StreamSink<List<int>> sink, String protocol, bool serverSide: true}) {
-    if (sink == null) {
-      if (stream is! StreamSink) {
-        throw new ArgumentError("If stream isn't also a StreamSink, sink must "
-            "be passed explicitly.");
-      }
-      sink = stream as StreamSink;
-    }
-
-    return new WebSocketImpl.fromSocket(stream, sink, protocol, serverSide);
-  }
-
-  /// Closes the web socket connection.
-  ///
-  /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
-  /// to the remote peer, respectively. If they are omitted, the peer will see
-  /// a "no status received" code with no reason.
-  ///
-  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
-  /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
-  Future close([int closeCode, String closeReason]);
-}
-
-/// An exception thrown by [CompatibleWebSocket].
-class CompatibleWebSocketException implements Exception {
-  final String message;
-
-  CompatibleWebSocketException([this.message]);
-
-  String toString() => message == null
-      ? "CompatibleWebSocketException" :
-        "CompatibleWebSocketException: $message";
-}
+export 'web_socket/channel.dart';
+export 'web_socket/deprecated.dart';
+export 'web_socket/exception.dart';
diff --git a/lib/src/web_socket/channel.dart b/lib/src/web_socket/channel.dart
new file mode 100644
index 0000000..339f065
--- /dev/null
+++ b/lib/src/web_socket/channel.dart
@@ -0,0 +1,123 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:crypto/crypto.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import '../copy/web_socket_impl.dart';
+
+/// A [StreamChannel] implementation of the WebSocket protocol.
+///
+/// This is not specific to `dart:io` or to any particular HTTP API. Because of
+/// that, it doesn't handle the initial [WebSocket handshake][]. This needs to
+/// be handled manually by the user of the code. Once that's been done, [new
+/// WebSocketChannel] can be called with the underlying socket and it will
+/// handle the remainder of the protocol.
+///
+/// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
+class WebSocketChannel extends StreamChannelMixin {
+  /// The underlying web socket.
+  ///
+  /// This is essentially a copy of `dart:io`'s WebSocket implementation, with
+  /// the IO-specific pieces factored out.
+  final WebSocketImpl _webSocket;
+
+  /// The interval for sending ping signals.
+  ///
+  /// If a ping message is not answered by a pong message from the peer, the
+  /// `WebSocket` is assumed disconnected and the connection is closed with a
+  /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
+  /// pong message must be received within [pingInterval].
+  ///
+  /// There are never two outstanding pings at any given time, and the next ping
+  /// timer starts when the pong is received.
+  ///
+  /// By default, the [pingInterval] is `null`, indicating that ping messages
+  /// are disabled.
+  Duration get pingInterval => _webSocket.pingInterval;
+  set pingInterval(Duration value) => _webSocket.pingInterval = value;
+
+  /// The [close code][] set when the WebSocket connection is closed.
+  ///
+  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+  ///
+  /// Before the connection has been closed, this will be `null`.
+  int get closeCode => _webSocket.closeCode;
+
+  /// The [close reason][] set when the WebSocket connection is closed.
+  ///
+  /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+  ///
+  /// Before the connection has been closed, this will be `null`.
+  String get closeReason => _webSocket.closeReason;
+
+  Stream get stream => new StreamView(_webSocket);
+
+  /// The sink for sending values to the other endpoint.
+  ///
+  /// This has additional arguments to [WebSocketSink.close] arguments that
+  /// provide the remote endpoint reasons for closing the connection.
+  WebSocketSink get sink => new WebSocketSink._(_webSocket);
+
+  /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
+  /// the [initial handshake].
+  ///
+  /// The return value should be sent back to the client in a
+  /// `Sec-WebSocket-Accept` header.
+  ///
+  /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
+  static String signKey(String key) {
+    var hash = new SHA1();
+    // We use [codeUnits] here rather than UTF-8-decoding the string because
+    // [key] is expected to be base64 encoded, and so will be pure ASCII.
+    hash.add((key + webSocketGUID).codeUnits);
+    return CryptoUtils.bytesToBase64(hash.close());
+  }
+
+  /// Creates a new WebSocket handling messaging across an existing socket.
+  ///
+  /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
+  /// must have already been completed on the socket before this is called.
+  ///
+  /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
+  /// `Socket`), it will be used for both sending and receiving data. Otherwise,
+  /// it will be used for receiving data and [sink] will be used for sending it.
+  ///
+  /// [protocol] should be the protocol negotiated by this handshake, if any.
+  ///
+  /// If this is a WebSocket server, [serverSide] should be `true` (the
+  /// default); if it's a client, [serverSide] should be `false`.
+  ///
+  /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
+  WebSocketChannel(StreamChannel<List<int>> channel,
+        {String protocol, bool serverSide: true})
+      : _webSocket = new WebSocketImpl.fromSocket(
+          channel.stream, channel.sink, protocol, serverSide);
+}
+
+/// The sink exposed by a [CompatibleWebSocket].
+///
+/// This is like a normal [StreamSink], except that it supports extra arguments
+/// to [close].
+class WebSocketSink extends DelegatingStreamSink {
+  final WebSocketImpl _webSocket;
+
+  WebSocketSink._(WebSocketImpl webSocket)
+      : super(webSocket),
+        _webSocket = webSocket;
+
+  /// Closes the web socket connection.
+  ///
+  /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
+  /// to the remote peer, respectively. If they are omitted, the peer will see
+  /// a "no status received" code with no reason.
+  ///
+  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+  /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+  Future close([int closeCode, String closeReason]) =>
+      _webSocket.close(closeCode, closeReason);
+}
diff --git a/lib/src/web_socket/deprecated.dart b/lib/src/web_socket/deprecated.dart
new file mode 100644
index 0000000..c2162d7
--- /dev/null
+++ b/lib/src/web_socket/deprecated.dart
@@ -0,0 +1,95 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:crypto/crypto.dart';
+
+import '../copy/web_socket_impl.dart';
+import 'channel.dart';
+
+/// Use [WebSocketChannel] instead.
+@Deprecated("Will be removed in 3.0.0.")
+abstract class CompatibleWebSocket implements Stream, StreamSink {
+  /// The interval for sending ping signals.
+  ///
+  /// If a ping message is not answered by a pong message from the peer, the
+  /// `WebSocket` is assumed disconnected and the connection is closed with a
+  /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
+  /// pong message must be received within [pingInterval].
+  ///
+  /// There are never two outstanding pings at any given time, and the next ping
+  /// timer starts when the pong is received.
+  ///
+  /// By default, the [pingInterval] is `null`, indicating that ping messages
+  /// are disabled.
+  Duration pingInterval;
+
+  /// The [close code][] set when the WebSocket connection is closed.
+  ///
+  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+  ///
+  /// Before the connection has been closed, this will be `null`.
+  int get closeCode;
+
+  /// The [close reason][] set when the WebSocket connection is closed.
+  ///
+  /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+  ///
+  /// Before the connection has been closed, this will be `null`.
+  String get closeReason;
+
+  /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
+  /// the [initial handshake].
+  ///
+  /// The return value should be sent back to the client in a
+  /// `Sec-WebSocket-Accept` header.
+  ///
+  /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
+  static String signKey(String key) {
+    var hash = new SHA1();
+    // We use [codeUnits] here rather than UTF-8-decoding the string because
+    // [key] is expected to be base64 encoded, and so will be pure ASCII.
+    hash.add((key + webSocketGUID).codeUnits);
+    return CryptoUtils.bytesToBase64(hash.close());
+  }
+
+  /// Creates a new WebSocket handling messaging across an existing socket.
+  ///
+  /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
+  /// must have already been completed on the socket before this is called.
+  ///
+  /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
+  /// `Socket`), it will be used for both sending and receiving data. Otherwise,
+  /// it will be used for receiving data and [sink] will be used for sending it.
+  ///
+  /// [protocol] should be the protocol negotiated by this handshake, if any.
+  ///
+  /// If this is a WebSocket server, [serverSide] should be `true` (the
+  /// default); if it's a client, [serverSide] should be `false`.
+  ///
+  /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
+  factory CompatibleWebSocket(Stream<List<int>> stream,
+        {StreamSink<List<int>> sink, String protocol, bool serverSide: true}) {
+    if (sink == null) {
+      if (stream is! StreamSink) {
+        throw new ArgumentError("If stream isn't also a StreamSink, sink must "
+            "be passed explicitly.");
+      }
+      sink = stream as StreamSink;
+    }
+
+    return new WebSocketImpl.fromSocket(stream, sink, protocol, serverSide);
+  }
+
+  /// Closes the web socket connection.
+  ///
+  /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
+  /// to the remote peer, respectively. If they are omitted, the peer will see
+  /// a "no status received" code with no reason.
+  ///
+  /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+  /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+  Future close([int closeCode, String closeReason]);
+}
diff --git a/lib/src/web_socket/exception.dart b/lib/src/web_socket/exception.dart
new file mode 100644
index 0000000..87f61d8
--- /dev/null
+++ b/lib/src/web_socket/exception.dart
@@ -0,0 +1,16 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'channel.dart';
+
+/// An exception thrown by [WebSocketChannel].
+class CompatibleWebSocketException implements Exception {
+  final String message;
+
+  CompatibleWebSocketException([this.message]);
+
+  String toString() => message == null
+      ? "CompatibleWebSocketException" :
+        "CompatibleWebSocketException: $message";
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 930db8f..15ad358 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,15 +1,21 @@
 name: http_parser
-version: 2.0.0
+version: 2.1.0-dev
 author: "Dart Team <misc@dartlang.org>"
 homepage: https://github.com/dart-lang/http_parser
 description: >
   A platform-independent package for parsing and serializing HTTP formats.
 dependencies:
+  async: "^1.3.0"
   collection: ">=0.9.1 <2.0.0"
   crypto: "^0.9.0"
   source_span: "^1.0.0"
+  stream_channel: "^1.0.0"
   string_scanner: ">=0.0.0 <0.2.0"
 dev_dependencies:
   test: "^0.12.0"
 environment:
   sdk: ">=1.8.0 <2.0.0"
+
+dependency_overrides:
+  stream_channel:
+    git: git://github.com/dart-lang/stream_channel.git
diff --git a/test/web_socket_test.dart b/test/web_socket_test.dart
index a8278b6..53aa5be 100644
--- a/test/web_socket_test.dart
+++ b/test/web_socket_test.dart
@@ -7,11 +7,13 @@
 import 'dart:io';
 
 import 'package:http_parser/http_parser.dart';
+import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
 
 void main() {
-  test("a client can communicate with a WebSocket server", () {
-    return HttpServer.bind("localhost", 0).then((server) {
+  group("using WebSocketChannel", () {
+    test("a client can communicate with a WebSocket server", () async {
+      var server = await HttpServer.bind("localhost", 0);
       server.transform(new WebSocketTransformer()).listen((webSocket) {
         webSocket.add("hello!");
         webSocket.listen((request) {
@@ -22,39 +24,38 @@
       });
 
       var client = new HttpClient();
-      return client
-          .openUrl("GET", Uri.parse("http://localhost:${server.port}"))
-          .then((request) {
-        request.headers
-          ..set("Connection", "Upgrade")
-          ..set("Upgrade", "websocket")
-          ..set("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw==")
-          ..set("Sec-WebSocket-Version", "13");
-        return request.close();
-      }).then((response) => response.detachSocket()).then((socket) {
-        var webSocket = new CompatibleWebSocket(socket, serverSide: false);
+      var request = await client.openUrl(
+          "GET", Uri.parse("http://localhost:${server.port}"));
+      request.headers
+        ..set("Connection", "Upgrade")
+        ..set("Upgrade", "websocket")
+        ..set("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw==")
+        ..set("Sec-WebSocket-Version", "13");
 
-        var n = 0;
-        return webSocket.listen((message) {
-          if (n == 0) {
-            expect(message, equals("hello!"));
-            webSocket.add("ping");
-          } else if (n == 1) {
-            expect(message, equals("pong"));
-            webSocket.close();
-            server.close();
-          } else {
-            fail("Only expected two messages.");
-          }
-          n++;
-        }).asFuture();
-      });
+      var response = await request.close();
+      var socket = await response.detachSocket();
+      var innerChannel = new StreamChannel(socket, socket);
+      var webSocket = new WebSocketChannel(innerChannel, serverSide: false);
+
+      var n = 0;
+      await webSocket.stream.listen((message) {
+        if (n == 0) {
+          expect(message, equals("hello!"));
+          webSocket.sink.add("ping");
+        } else if (n == 1) {
+          expect(message, equals("pong"));
+          webSocket.sink.close();
+          server.close();
+        } else {
+          fail("Only expected two messages.");
+        }
+        n++;
+      }).asFuture();
     });
-  });
 
-  test("a server can communicate with a WebSocket client", () {
-    return HttpServer.bind("localhost", 0).then((server) {
-      server.listen((request) {
+    test("a server can communicate with a WebSocket client", () async {
+      var server = await HttpServer.bind("localhost", 0);
+      server.listen((request) async {
         var response = request.response;
         response.statusCode = 101;
         response.headers
@@ -63,34 +64,119 @@
           ..set("Sec-WebSocket-Accept", CompatibleWebSocket
               .signKey(request.headers.value('Sec-WebSocket-Key')));
         response.contentLength = 0;
-        response.detachSocket().then((socket) {
-          var webSocket = new CompatibleWebSocket(socket);
+
+        var socket = await response.detachSocket();
+        var innerChannel = new StreamChannel(socket, socket);
+        var webSocket = new WebSocketChannel(innerChannel);
+        webSocket.sink.add("hello!");
+
+        var message = await webSocket.stream.first;
+        expect(message, equals("ping"));
+        webSocket.sink.add("pong");
+        webSocket.sink.close();
+      });
+
+      var webSocket = await WebSocket.connect('ws://localhost:${server.port}');
+      var n = 0;
+      await webSocket.listen((message) {
+        if (n == 0) {
+          expect(message, equals("hello!"));
+          webSocket.add("ping");
+        } else if (n == 1) {
+          expect(message, equals("pong"));
+          webSocket.close();
+          server.close();
+        } else {
+          fail("Only expected two messages.");
+        }
+        n++;
+      }).asFuture();
+    });
+  });
+
+  group("using CompatibleWebSocket", () {
+    test("a client can communicate with a WebSocket server", () {
+      return HttpServer.bind("localhost", 0).then((server) {
+        server.transform(new WebSocketTransformer()).listen((webSocket) {
           webSocket.add("hello!");
-          webSocket.first.then((request) {
+          webSocket.listen((request) {
             expect(request, equals("ping"));
             webSocket.add("pong");
             webSocket.close();
           });
         });
-      });
 
-      return WebSocket
-          .connect('ws://localhost:${server.port}')
-          .then((webSocket) {
-        var n = 0;
-        return webSocket.listen((message) {
-          if (n == 0) {
-            expect(message, equals("hello!"));
-            webSocket.add("ping");
-          } else if (n == 1) {
-            expect(message, equals("pong"));
-            webSocket.close();
-            server.close();
-          } else {
-            fail("Only expected two messages.");
-          }
-          n++;
-        }).asFuture();
+        var client = new HttpClient();
+        return client
+            .openUrl("GET", Uri.parse("http://localhost:${server.port}"))
+            .then((request) {
+          request.headers
+            ..set("Connection", "Upgrade")
+            ..set("Upgrade", "websocket")
+            ..set("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw==")
+            ..set("Sec-WebSocket-Version", "13");
+          return request.close();
+        }).then((response) => response.detachSocket()).then((socket) {
+          var webSocket = new CompatibleWebSocket(socket, serverSide: false);
+
+          var n = 0;
+          return webSocket.listen((message) {
+            if (n == 0) {
+              expect(message, equals("hello!"));
+              webSocket.add("ping");
+            } else if (n == 1) {
+              expect(message, equals("pong"));
+              webSocket.close();
+              server.close();
+            } else {
+              fail("Only expected two messages.");
+            }
+            n++;
+          }).asFuture();
+        });
+      });
+    });
+
+    test("a server can communicate with a WebSocket client", () {
+      return HttpServer.bind("localhost", 0).then((server) {
+        server.listen((request) {
+          var response = request.response;
+          response.statusCode = 101;
+          response.headers
+            ..set("Connection", "Upgrade")
+            ..set("Upgrade", "websocket")
+            ..set("Sec-WebSocket-Accept", CompatibleWebSocket
+                .signKey(request.headers.value('Sec-WebSocket-Key')));
+          response.contentLength = 0;
+          response.detachSocket().then((socket) {
+            var webSocket = new CompatibleWebSocket(socket);
+            webSocket.add("hello!");
+            webSocket.first.then((request) {
+              expect(request, equals("ping"));
+              webSocket.add("pong");
+              webSocket.close();
+            });
+          });
+        });
+
+        return WebSocket
+            .connect('ws://localhost:${server.port}')
+            .then((webSocket) {
+          var n = 0;
+          return webSocket.listen((message) {
+            if (n == 0) {
+              expect(message, equals("hello!"));
+              webSocket.add("ping");
+            } else if (n == 1) {
+              expect(message, equals("pong"));
+              webSocket.close();
+              server.close();
+            } else {
+              fail("Only expected two messages.");
+            }
+            n++;
+          }).asFuture();
+        });
       });
     });
   });