Support transparent reconnects on the server (#19)

@grouma this is an attempt to fix #18 (may be easier to [view the diff ignoring whitespace](https://github.com/dart-lang/sse/pull/19/files?utf8=%E2%9C%93&diff=unified&w=1) since some code got indenting and makes the diff look much bigger than it is).

However there is an exposed method here - `closeSink` that closes the underlying sink (in order to test - we can't close the exposed `sink` because it closes the stream controller that needs to continue to be used). I'm not sure if there's a better way (we can add `@visibleForTesting`, though `meta` isn't currently referenced here).

Happy to make changes if this isn't what you had in mind (and I can test it end-to-end through dwds and GitPod to confirm it works prior to merging it).
diff --git a/.travis.yml b/.travis.yml
index e0371f6..d6bce8f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@
 
 dart:
   - dev
-  - 2.1.0
+  - 2.2.0
 
 with_content_shell: false
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f46643e..022f120 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 3.1.0
+
+- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is
+  supplied, the connection will remain active for this period after a
+  disconnect and can be reconnected transparently. If there is no reconnect
+  within that period, the connection will be closed normally.
+
 ## 3.0.0
 
 - Add retry logic.
diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart
index 1a5ef06..bfed935 100644
--- a/lib/server/sse_handler.dart
+++ b/lib/server/sse_handler.dart
@@ -2,151 +2,4 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
-import 'dart:async';
-import 'dart:convert';
-
-import 'package:async/async.dart';
-import 'package:logging/logging.dart';
-import 'package:pedantic/pedantic.dart';
-import 'package:shelf/shelf.dart' as shelf;
-import 'package:stream_channel/stream_channel.dart';
-
-// RFC 2616 requires carriage return delimiters.
-String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
-    'Content-Type: text/event-stream\r\n'
-    'Cache-Control: no-cache\r\n'
-    'Connection: keep-alive\r\n'
-    'Access-Control-Allow-Credentials: true\r\n'
-    'Access-Control-Allow-Origin: $origin\r\n'
-    '\r\n\r\n';
-
-/// A bi-directional SSE connection between server and browser.
-class SseConnection extends StreamChannelMixin<String> {
-  /// Incoming messages from the Browser client.
-  final _incomingController = StreamController<String>();
-
-  /// Outgoing messages to the Browser client.
-  final _outgoingController = StreamController<String>();
-
-  final Sink _sink;
-
-  final _closedCompleter = Completer<void>();
-
-  SseConnection(this._sink) {
-    _outgoingController.stream.listen((data) {
-      if (!_closedCompleter.isCompleted) {
-        // JSON encode the message to escape new lines.
-        _sink.add('data: ${json.encode(data)}\n');
-        _sink.add('\n');
-      }
-    });
-    _outgoingController.onCancel = _close;
-    _incomingController.onCancel = _close;
-  }
-
-  /// The message added to the sink has to be JSON encodable.
-  @override
-  StreamSink<String> get sink => _outgoingController.sink;
-
-  // Add messages to this [StreamSink] to send them to the server.
-  /// [Stream] of messages sent from the server to this client.
-  ///
-  /// A message is a decoded JSON object.
-  @override
-  Stream<String> get stream => _incomingController.stream;
-
-  void _close() {
-    if (!_closedCompleter.isCompleted) {
-      _closedCompleter.complete();
-      _sink.close();
-      if (!_outgoingController.isClosed) _outgoingController.close();
-      if (!_incomingController.isClosed) _incomingController.close();
-    }
-  }
-}
-
-/// [SseHandler] handles requests on a user defined path to create
-/// two-way communications of JSON encodable data between server and clients.
-///
-/// A server sends messages to a client through an SSE channel, while
-/// a client sends message to a server through HTTP POST requests.
-class SseHandler {
-  final _logger = Logger('SseHandler');
-  final Uri _uri;
-  final _connections = <String, SseConnection>{};
-  final _connectionController = StreamController<SseConnection>();
-
-  StreamQueue<SseConnection> _connectionsStream;
-
-  SseHandler(this._uri);
-
-  StreamQueue<SseConnection> get connections =>
-      _connectionsStream ??= StreamQueue(_connectionController.stream);
-
-  shelf.Handler get handler => _handle;
-
-  int get numberOfClients => _connections.length;
-
-  shelf.Response _createSseConnection(shelf.Request req, String path) {
-    req.hijack((channel) async {
-      var sink = utf8.encoder.startChunkedConversion(channel.sink);
-      sink.add(_sseHeaders(req.headers['origin']));
-      var clientId = req.url.queryParameters['sseClientId'];
-      var connection = SseConnection(sink);
-      _connections[clientId] = connection;
-      unawaited(connection._closedCompleter.future.then((_) {
-        _connections.remove(clientId);
-      }));
-      // Remove connection when it is remotely closed or the stream is
-      // cancelled.
-      channel.stream.listen((_) {
-        // SSE is unidirectional. Responses are handled through POST requests.
-      }, onDone: () {
-        connection._close();
-      });
-
-      _connectionController.add(connection);
-    });
-    return shelf.Response.notFound('');
-  }
-
-  String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
-
-  Future<shelf.Response> _handle(shelf.Request req) async {
-    var path = _getOriginalPath(req);
-    if (_uri.path != path) {
-      return shelf.Response.notFound('');
-    }
-
-    if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
-      return _createSseConnection(req, path);
-    }
-
-    if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
-      return _handleIncomingMessage(req, path);
-    }
-
-    return shelf.Response.notFound('');
-  }
-
-  Future<shelf.Response> _handleIncomingMessage(
-      shelf.Request req, String path) async {
-    try {
-      var clientId = req.url.queryParameters['sseClientId'];
-      var message = await req.readAsString();
-      var jsonObject = json.decode(message) as String;
-      _connections[clientId]?._incomingController?.add(jsonObject);
-    } catch (e, st) {
-      _logger.fine('Failed to handle incoming message. $e $st');
-    }
-    return shelf.Response.ok('', headers: {
-      'access-control-allow-credentials': 'true',
-      'access-control-allow-origin': _originFor(req),
-    });
-  }
-
-  String _originFor(shelf.Request req) =>
-      // Firefox does not set header "origin".
-      // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
-      req.headers['origin'] ?? req.headers['host'];
-}
+export 'package:sse/src/server/sse_handler.dart' show SseConnection, SseHandler;
diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart
new file mode 100644
index 0000000..e8cd523
--- /dev/null
+++ b/lib/src/server/sse_handler.dart
@@ -0,0 +1,221 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+import 'package:logging/logging.dart';
+import 'package:pedantic/pedantic.dart';
+import 'package:shelf/shelf.dart' as shelf;
+import 'package:stream_channel/stream_channel.dart';
+
+// RFC 2616 requires carriage return delimiters.
+String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
+    'Content-Type: text/event-stream\r\n'
+    'Cache-Control: no-cache\r\n'
+    'Connection: keep-alive\r\n'
+    'Access-Control-Allow-Credentials: true\r\n'
+    'Access-Control-Allow-Origin: $origin\r\n'
+    '\r\n\r\n';
+
+/// A bi-directional SSE connection between server and browser.
+class SseConnection extends StreamChannelMixin<String> {
+  /// Incoming messages from the Browser client.
+  final _incomingController = StreamController<String>();
+
+  /// Outgoing messages to the Browser client.
+  final _outgoingController = StreamController<String>();
+
+  Sink _sink;
+
+  /// How long to wait after a connection drops before considering it closed.
+  final Duration _keepAlive;
+
+  /// A timer counting down the KeepAlive period (null if hasn't disconnected).
+  Timer _keepAliveTimer;
+
+  /// Whether this connection is currently in the KeepAlive timeout period.
+  bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
+
+  final _closedCompleter = Completer<void>();
+
+  /// Creates an [SseConnection] for the supplied [_sink].
+  ///
+  /// If [keepAlive] is supplied, the connection will remain active for this
+  /// period after a disconnect and can be reconnected transparently. If there
+  /// is no reconnect within that period, the connection will be closed normally.
+  ///
+  /// If [keepAlive] is not supplied, the connection will be closed immediately
+  /// after a disconnect.
+  SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
+    unawaited(_setUpListener());
+    _outgoingController.onCancel = _close;
+    _incomingController.onCancel = _close;
+  }
+
+  Future<void> _setUpListener() async {
+    var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
+    while (await outgoingStreamQueue.hasNext) {
+      // If we're in a KeepAlive timeout, there's nowhere to send messages so
+      // wait a short period and check again.
+      if (isInKeepAlivePeriod) {
+        await Future.delayed(const Duration(milliseconds: 200));
+        continue;
+      }
+
+      // Peek the data so we don't remove it from the stream if we're unable to
+      // send it.
+      final data = await outgoingStreamQueue.peek;
+      try {
+        // JSON encode the message to escape new lines.
+        _sink.add('data: ${json.encode(data)}\n');
+        _sink.add('\n');
+        await outgoingStreamQueue.next; // Consume from stream if no errors.
+      } catch (StateError) {
+        if (_keepAlive == null || _closedCompleter.isCompleted) {
+          rethrow;
+        }
+        // If we got here then the sink may have closed but the stream.onDone
+        // hasn't fired yet, so pause the subscription and skip calling
+        // `next` so the message remains in the queue to try again.
+        _handleDisconnect();
+      }
+    }
+  }
+
+  /// The message added to the sink has to be JSON encodable.
+  @override
+  StreamSink<String> get sink => _outgoingController.sink;
+
+  // Add messages to this [StreamSink] to send them to the server.
+  /// [Stream] of messages sent from the server to this client.
+  ///
+  /// A message is a decoded JSON object.
+  @override
+  Stream<String> get stream => _incomingController.stream;
+
+  void _acceptReconnection(Sink sink) {
+    _keepAliveTimer?.cancel();
+    _sink = sink;
+  }
+
+  void _handleDisconnect() {
+    if (_keepAlive == null) {
+      // Close immediately if we're not keeping alive.
+      _close();
+    } else if (!isInKeepAlivePeriod) {
+      // Otherwise if we didn't already have an active timer, set a timer to
+      // close after the timeout period. If the connection comes back, this will
+      // be cancelled and all messages left in the queue tried again.
+      _keepAliveTimer = Timer(_keepAlive, _close);
+    }
+  }
+
+  void _close() {
+    if (!_closedCompleter.isCompleted) {
+      _closedCompleter.complete();
+      _sink.close();
+      if (!_outgoingController.isClosed) _outgoingController.close();
+      if (!_incomingController.isClosed) _incomingController.close();
+    }
+  }
+}
+
+/// [SseHandler] handles requests on a user defined path to create
+/// two-way communications of JSON encodable data between server and clients.
+///
+/// A server sends messages to a client through an SSE channel, while
+/// a client sends message to a server through HTTP POST requests.
+class SseHandler {
+  final _logger = Logger('SseHandler');
+  final Uri _uri;
+  final Duration _keepAlive;
+  final _connections = <String, SseConnection>{};
+  final _connectionController = StreamController<SseConnection>();
+
+  StreamQueue<SseConnection> _connectionsStream;
+
+  SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive;
+
+  StreamQueue<SseConnection> get connections =>
+      _connectionsStream ??= StreamQueue(_connectionController.stream);
+
+  shelf.Handler get handler => _handle;
+
+  int get numberOfClients => _connections.length;
+
+  shelf.Response _createSseConnection(shelf.Request req, String path) {
+    req.hijack((channel) async {
+      var sink = utf8.encoder.startChunkedConversion(channel.sink);
+      sink.add(_sseHeaders(req.headers['origin']));
+      var clientId = req.url.queryParameters['sseClientId'];
+
+      // Check if we already have a connection for this ID that is in the process
+      // of timing out (in which case we can reconnect it transparently).
+      if (_connections[clientId] != null &&
+          _connections[clientId].isInKeepAlivePeriod) {
+        _connections[clientId]._acceptReconnection(sink);
+      } else {
+        var connection = SseConnection(sink, keepAlive: _keepAlive);
+        _connections[clientId] = connection;
+        unawaited(connection._closedCompleter.future.then((_) {
+          _connections.remove(clientId);
+        }));
+        // Remove connection when it is remotely closed or the stream is
+        // cancelled.
+        channel.stream.listen((_) {
+          // SSE is unidirectional. Responses are handled through POST requests.
+        }, onDone: () {
+          connection._handleDisconnect();
+        });
+
+        _connectionController.add(connection);
+      }
+    });
+    return shelf.Response.notFound('');
+  }
+
+  String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
+
+  Future<shelf.Response> _handle(shelf.Request req) async {
+    var path = _getOriginalPath(req);
+    if (_uri.path != path) {
+      return shelf.Response.notFound('');
+    }
+
+    if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
+      return _createSseConnection(req, path);
+    }
+
+    if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
+      return _handleIncomingMessage(req, path);
+    }
+
+    return shelf.Response.notFound('');
+  }
+
+  Future<shelf.Response> _handleIncomingMessage(
+      shelf.Request req, String path) async {
+    try {
+      var clientId = req.url.queryParameters['sseClientId'];
+      var message = await req.readAsString();
+      var jsonObject = json.decode(message) as String;
+      _connections[clientId]?._incomingController?.add(jsonObject);
+    } catch (e, st) {
+      _logger.fine('Failed to handle incoming message. $e $st');
+    }
+    return shelf.Response.ok('', headers: {
+      'access-control-allow-credentials': 'true',
+      'access-control-allow-origin': _originFor(req),
+    });
+  }
+
+  String _originFor(shelf.Request req) =>
+      // Firefox does not set header "origin".
+      // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
+      req.headers['origin'] ?? req.headers['host'];
+}
+
+void closeSink(SseConnection connection) => connection._sink.close();
diff --git a/pubspec.yaml b/pubspec.yaml
index 33fc5fb..9c164f7 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: sse
-version: 3.0.0
+version: 3.1.0
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/sse
 description: >-
@@ -8,7 +8,7 @@
   requests.
 
 environment:
-  sdk: ">=2.1.0 <3.0.0"
+  sdk: ">=2.2.0 <3.0.0"
 
 dependencies:
   async: ^2.0.8
diff --git a/test/sse_test.dart b/test/sse_test.dart
index 62fe4c6..0d4040f 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -10,6 +10,7 @@
 import 'package:shelf/shelf_io.dart' as io;
 import 'package:shelf_static/shelf_static.dart';
 import 'package:sse/server/sse_handler.dart';
+import 'package:sse/src/server/sse_handler.dart' show closeSink;
 import 'package:test/test.dart';
 import 'package:webdriver/io.dart';
 
@@ -33,99 +34,180 @@
     chromeDriver.kill();
   });
 
-  setUp(() async {
-    handler = SseHandler(Uri.parse('/test'));
+  group('SSE', () {
+    setUp(() async {
+      handler = SseHandler(Uri.parse('/test'));
 
-    var cascade = shelf.Cascade().add(handler.handler).add(_faviconHandler).add(
-        createStaticHandler('test/web',
-            listDirectories: true, defaultDocument: 'index.html'));
+      var cascade = shelf.Cascade()
+          .add(handler.handler)
+          .add(_faviconHandler)
+          .add(createStaticHandler('test/web',
+              listDirectories: true, defaultDocument: 'index.html'));
 
-    server = await io.serve(cascade.handler, 'localhost', 0);
-    var capabilities = Capabilities.chrome
-      ..addAll({
-        Capabilities.chromeOptions: {
-          'args': ['--headless']
-        }
-      });
-    webdriver = await createDriver(desired: capabilities);
+      server = await io.serve(cascade.handler, 'localhost', 0);
+      var capabilities = Capabilities.chrome
+        ..addAll({
+          Capabilities.chromeOptions: {
+            'args': ['--headless']
+          }
+        });
+      webdriver = await createDriver(desired: capabilities);
+    });
+
+    tearDown(() async {
+      await webdriver.quit();
+      await server.close();
+    });
+
+    test('Can round trip messages', () async {
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      connection.sink.add('blah');
+      expect(await connection.stream.first, 'blah');
+    });
+
+    test('Multiple clients can connect', () async {
+      var connections = handler.connections;
+      await webdriver.get('http://localhost:${server.port}');
+      await connections.next;
+      await webdriver.get('http://localhost:${server.port}');
+      await connections.next;
+    });
+
+    test('Routes data correctly', () async {
+      var connections = handler.connections;
+      await webdriver.get('http://localhost:${server.port}');
+      var connectionA = await connections.next;
+      connectionA.sink.add('foo');
+      expect(await connectionA.stream.first, 'foo');
+
+      await webdriver.get('http://localhost:${server.port}');
+      var connectionB = await connections.next;
+      connectionB.sink.add('bar');
+      expect(await connectionB.stream.first, 'bar');
+    });
+
+    test('Can close from the server', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
+      await connection.sink.close();
+      await pumpEventQueue();
+      expect(handler.numberOfClients, 0);
+    });
+
+    test('Client reconnects after being disconnected', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
+      await connection.sink.close();
+      await pumpEventQueue();
+      expect(handler.numberOfClients, 0);
+
+      // Ensure the client reconnects
+      await handler.connections.next;
+    });
+
+    test('Can close from the client-side', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
+
+      var closeButton = await webdriver.findElement(const By.tagName('button'));
+      await closeButton.click();
+
+      // Should complete since the connection is closed.
+      await connection.stream.toList();
+      expect(handler.numberOfClients, 0);
+    });
+
+    test('Cancelling the listener closes the connection', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
+
+      var sub = connection.stream.listen((_) {});
+      await sub.cancel();
+      await pumpEventQueue();
+      expect(handler.numberOfClients, 0);
+    });
+
+    test('Disconnects when navigating away', () async {
+      await webdriver.get('http://localhost:${server.port}');
+      expect(handler.numberOfClients, 1);
+
+      await webdriver.get('chrome://version/');
+      expect(handler.numberOfClients, 0);
+    });
   });
 
-  tearDown(() async {
-    await webdriver.quit();
-    await server.close();
-  });
+  group('SSE with server keep-alive', () {
+    setUp(() async {
+      handler =
+          SseHandler(Uri.parse('/test'), keepAlive: const Duration(seconds: 5));
 
-  test('Can round trip messages', () async {
-    await webdriver.get('http://localhost:${server.port}');
-    var connection = await handler.connections.next;
-    connection.sink.add('blah');
-    expect(await connection.stream.first, 'blah');
-  });
+      var cascade = shelf.Cascade()
+          .add(handler.handler)
+          .add(_faviconHandler)
+          .add(createStaticHandler('test/web',
+              listDirectories: true, defaultDocument: 'index.html'));
 
-  test('Multiple clients can connect', () async {
-    var connections = handler.connections;
-    await webdriver.get('http://localhost:${server.port}');
-    await connections.next;
-    await webdriver.get('http://localhost:${server.port}');
-    await connections.next;
-  });
+      server = await io.serve(cascade.handler, 'localhost', 0);
+      var capabilities = Capabilities.chrome
+        ..addAll({
+          Capabilities.chromeOptions: {
+            'args': ['--headless']
+          }
+        });
+      webdriver = await createDriver(desired: capabilities);
+    });
 
-  test('Routes data correctly', () async {
-    var connections = handler.connections;
-    await webdriver.get('http://localhost:${server.port}');
-    var connectionA = await connections.next;
-    connectionA.sink.add('foo');
-    expect(await connectionA.stream.first, 'foo');
+    tearDown(() async {
+      await webdriver.quit();
+      await server.close();
+    });
 
-    await webdriver.get('http://localhost:${server.port}');
-    var connectionB = await connections.next;
-    connectionB.sink.add('bar');
-    expect(await connectionB.stream.first, 'bar');
-  });
+    test('Client reconnect use the same connection', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
 
-  test('Can close from the server', () async {
-    expect(handler.numberOfClients, 0);
-    await webdriver.get('http://localhost:${server.port}');
-    var connection = await handler.connections.next;
-    expect(handler.numberOfClients, 1);
-    await connection.sink.close();
-    await pumpEventQueue();
-    expect(handler.numberOfClients, 0);
-  });
+      // Close the underlying connection.
+      closeSink(connection);
+      await pumpEventQueue();
 
-  test('Can close from the client-side', () async {
-    expect(handler.numberOfClients, 0);
-    await webdriver.get('http://localhost:${server.port}');
-    var connection = await handler.connections.next;
-    expect(handler.numberOfClients, 1);
+      // Ensure there's still a connection.
+      expect(handler.numberOfClients, 1);
 
-    var closeButton = await webdriver.findElement(const By.tagName('button'));
-    await closeButton.click();
+      // Ensure we can still round-trip data on the original connection.
+      connection.sink.add('bar');
+      expect(await connection.stream.first, 'bar');
+    });
 
-    // Should complete since the connection is closed.
-    await connection.stream.toList();
-    expect(handler.numberOfClients, 0);
-  });
+    test('Messages sent during disconnect arrive in-order', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      var connection = await handler.connections.next;
+      expect(handler.numberOfClients, 1);
 
-  test('Cancelling the listener closes the connection', () async {
-    expect(handler.numberOfClients, 0);
-    await webdriver.get('http://localhost:${server.port}');
-    var connection = await handler.connections.next;
-    expect(handler.numberOfClients, 1);
+      // Close the underlying connection.
+      closeSink(connection);
+      connection.sink.add('one');
+      connection.sink.add('two');
+      await pumpEventQueue();
 
-    var sub = connection.stream.listen((_) {});
-    await sub.cancel();
-    await pumpEventQueue();
-    expect(handler.numberOfClients, 0);
-  });
+      // Ensure there's still a connection.
+      expect(handler.numberOfClients, 1);
 
-  test('Disconnects when navigating away', () async {
-    await webdriver.get('http://localhost:${server.port}');
-    expect(handler.numberOfClients, 1);
-
-    await webdriver.get('chrome://version/');
-    expect(handler.numberOfClients, 0);
-  });
+      // Ensure messages arrive in the same order
+      expect(await connection.stream.take(2).toList(), equals(['one', 'two']));
+    });
+  }, timeout: const Timeout(Duration(seconds: 120)));
 }
 
 FutureOr<shelf.Response> _faviconHandler(shelf.Request request) {