Support transparent reconnects on the server (dart-lang/sse#19)
@grouma this is an attempt to fix dart-lang/sse#18 (may be easier to [view the diff ignoring whitespace](https://github.com/dart-lang/sse/pull/19/files?utf8=%E2%9C%93&diff=unified&w=1) since some code got indenting and makes the diff look much bigger than it is).
However there is an exposed method here - `closeSink` that closes the underlying sink (in order to test - we can't close the exposed `sink` because it closes the stream controller that needs to continue to be used). I'm not sure if there's a better way (we can add `@visibleForTesting`, though `meta` isn't currently referenced here).
Happy to make changes if this isn't what you had in mind (and I can test it end-to-end through dwds and GitPod to confirm it works prior to merging it).
diff --git a/pkgs/sse/.travis.yml b/pkgs/sse/.travis.yml
index e0371f6..d6bce8f 100644
--- a/pkgs/sse/.travis.yml
+++ b/pkgs/sse/.travis.yml
@@ -5,7 +5,7 @@
dart:
- dev
- - 2.1.0
+ - 2.2.0
with_content_shell: false
diff --git a/pkgs/sse/CHANGELOG.md b/pkgs/sse/CHANGELOG.md
index f46643e..022f120 100644
--- a/pkgs/sse/CHANGELOG.md
+++ b/pkgs/sse/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 3.1.0
+
+- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is
+ supplied, the connection will remain active for this period after a
+ disconnect and can be reconnected transparently. If there is no reconnect
+ within that period, the connection will be closed normally.
+
## 3.0.0
- Add retry logic.
diff --git a/pkgs/sse/lib/server/sse_handler.dart b/pkgs/sse/lib/server/sse_handler.dart
index 1a5ef06..bfed935 100644
--- a/pkgs/sse/lib/server/sse_handler.dart
+++ b/pkgs/sse/lib/server/sse_handler.dart
@@ -2,151 +2,4 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-import 'dart:async';
-import 'dart:convert';
-
-import 'package:async/async.dart';
-import 'package:logging/logging.dart';
-import 'package:pedantic/pedantic.dart';
-import 'package:shelf/shelf.dart' as shelf;
-import 'package:stream_channel/stream_channel.dart';
-
-// RFC 2616 requires carriage return delimiters.
-String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
- 'Content-Type: text/event-stream\r\n'
- 'Cache-Control: no-cache\r\n'
- 'Connection: keep-alive\r\n'
- 'Access-Control-Allow-Credentials: true\r\n'
- 'Access-Control-Allow-Origin: $origin\r\n'
- '\r\n\r\n';
-
-/// A bi-directional SSE connection between server and browser.
-class SseConnection extends StreamChannelMixin<String> {
- /// Incoming messages from the Browser client.
- final _incomingController = StreamController<String>();
-
- /// Outgoing messages to the Browser client.
- final _outgoingController = StreamController<String>();
-
- final Sink _sink;
-
- final _closedCompleter = Completer<void>();
-
- SseConnection(this._sink) {
- _outgoingController.stream.listen((data) {
- if (!_closedCompleter.isCompleted) {
- // JSON encode the message to escape new lines.
- _sink.add('data: ${json.encode(data)}\n');
- _sink.add('\n');
- }
- });
- _outgoingController.onCancel = _close;
- _incomingController.onCancel = _close;
- }
-
- /// The message added to the sink has to be JSON encodable.
- @override
- StreamSink<String> get sink => _outgoingController.sink;
-
- // Add messages to this [StreamSink] to send them to the server.
- /// [Stream] of messages sent from the server to this client.
- ///
- /// A message is a decoded JSON object.
- @override
- Stream<String> get stream => _incomingController.stream;
-
- void _close() {
- if (!_closedCompleter.isCompleted) {
- _closedCompleter.complete();
- _sink.close();
- if (!_outgoingController.isClosed) _outgoingController.close();
- if (!_incomingController.isClosed) _incomingController.close();
- }
- }
-}
-
-/// [SseHandler] handles requests on a user defined path to create
-/// two-way communications of JSON encodable data between server and clients.
-///
-/// A server sends messages to a client through an SSE channel, while
-/// a client sends message to a server through HTTP POST requests.
-class SseHandler {
- final _logger = Logger('SseHandler');
- final Uri _uri;
- final _connections = <String, SseConnection>{};
- final _connectionController = StreamController<SseConnection>();
-
- StreamQueue<SseConnection> _connectionsStream;
-
- SseHandler(this._uri);
-
- StreamQueue<SseConnection> get connections =>
- _connectionsStream ??= StreamQueue(_connectionController.stream);
-
- shelf.Handler get handler => _handle;
-
- int get numberOfClients => _connections.length;
-
- shelf.Response _createSseConnection(shelf.Request req, String path) {
- req.hijack((channel) async {
- var sink = utf8.encoder.startChunkedConversion(channel.sink);
- sink.add(_sseHeaders(req.headers['origin']));
- var clientId = req.url.queryParameters['sseClientId'];
- var connection = SseConnection(sink);
- _connections[clientId] = connection;
- unawaited(connection._closedCompleter.future.then((_) {
- _connections.remove(clientId);
- }));
- // Remove connection when it is remotely closed or the stream is
- // cancelled.
- channel.stream.listen((_) {
- // SSE is unidirectional. Responses are handled through POST requests.
- }, onDone: () {
- connection._close();
- });
-
- _connectionController.add(connection);
- });
- return shelf.Response.notFound('');
- }
-
- String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
-
- Future<shelf.Response> _handle(shelf.Request req) async {
- var path = _getOriginalPath(req);
- if (_uri.path != path) {
- return shelf.Response.notFound('');
- }
-
- if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
- return _createSseConnection(req, path);
- }
-
- if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
- return _handleIncomingMessage(req, path);
- }
-
- return shelf.Response.notFound('');
- }
-
- Future<shelf.Response> _handleIncomingMessage(
- shelf.Request req, String path) async {
- try {
- var clientId = req.url.queryParameters['sseClientId'];
- var message = await req.readAsString();
- var jsonObject = json.decode(message) as String;
- _connections[clientId]?._incomingController?.add(jsonObject);
- } catch (e, st) {
- _logger.fine('Failed to handle incoming message. $e $st');
- }
- return shelf.Response.ok('', headers: {
- 'access-control-allow-credentials': 'true',
- 'access-control-allow-origin': _originFor(req),
- });
- }
-
- String _originFor(shelf.Request req) =>
- // Firefox does not set header "origin".
- // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
- req.headers['origin'] ?? req.headers['host'];
-}
+export 'package:sse/src/server/sse_handler.dart' show SseConnection, SseHandler;
diff --git a/pkgs/sse/lib/src/server/sse_handler.dart b/pkgs/sse/lib/src/server/sse_handler.dart
new file mode 100644
index 0000000..e8cd523
--- /dev/null
+++ b/pkgs/sse/lib/src/server/sse_handler.dart
@@ -0,0 +1,221 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+import 'package:logging/logging.dart';
+import 'package:pedantic/pedantic.dart';
+import 'package:shelf/shelf.dart' as shelf;
+import 'package:stream_channel/stream_channel.dart';
+
+// RFC 2616 requires carriage return delimiters.
+String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/event-stream\r\n'
+ 'Cache-Control: no-cache\r\n'
+ 'Connection: keep-alive\r\n'
+ 'Access-Control-Allow-Credentials: true\r\n'
+ 'Access-Control-Allow-Origin: $origin\r\n'
+ '\r\n\r\n';
+
+/// A bi-directional SSE connection between server and browser.
+class SseConnection extends StreamChannelMixin<String> {
+ /// Incoming messages from the Browser client.
+ final _incomingController = StreamController<String>();
+
+ /// Outgoing messages to the Browser client.
+ final _outgoingController = StreamController<String>();
+
+ Sink _sink;
+
+ /// How long to wait after a connection drops before considering it closed.
+ final Duration _keepAlive;
+
+ /// A timer counting down the KeepAlive period (null if hasn't disconnected).
+ Timer _keepAliveTimer;
+
+ /// Whether this connection is currently in the KeepAlive timeout period.
+ bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
+
+ final _closedCompleter = Completer<void>();
+
+ /// Creates an [SseConnection] for the supplied [_sink].
+ ///
+ /// If [keepAlive] is supplied, the connection will remain active for this
+ /// period after a disconnect and can be reconnected transparently. If there
+ /// is no reconnect within that period, the connection will be closed normally.
+ ///
+ /// If [keepAlive] is not supplied, the connection will be closed immediately
+ /// after a disconnect.
+ SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
+ unawaited(_setUpListener());
+ _outgoingController.onCancel = _close;
+ _incomingController.onCancel = _close;
+ }
+
+ Future<void> _setUpListener() async {
+ var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
+ while (await outgoingStreamQueue.hasNext) {
+ // If we're in a KeepAlive timeout, there's nowhere to send messages so
+ // wait a short period and check again.
+ if (isInKeepAlivePeriod) {
+ await Future.delayed(const Duration(milliseconds: 200));
+ continue;
+ }
+
+ // Peek the data so we don't remove it from the stream if we're unable to
+ // send it.
+ final data = await outgoingStreamQueue.peek;
+ try {
+ // JSON encode the message to escape new lines.
+ _sink.add('data: ${json.encode(data)}\n');
+ _sink.add('\n');
+ await outgoingStreamQueue.next; // Consume from stream if no errors.
+ } catch (StateError) {
+ if (_keepAlive == null || _closedCompleter.isCompleted) {
+ rethrow;
+ }
+ // If we got here then the sink may have closed but the stream.onDone
+ // hasn't fired yet, so pause the subscription and skip calling
+ // `next` so the message remains in the queue to try again.
+ _handleDisconnect();
+ }
+ }
+ }
+
+ /// The message added to the sink has to be JSON encodable.
+ @override
+ StreamSink<String> get sink => _outgoingController.sink;
+
+ // Add messages to this [StreamSink] to send them to the server.
+ /// [Stream] of messages sent from the server to this client.
+ ///
+ /// A message is a decoded JSON object.
+ @override
+ Stream<String> get stream => _incomingController.stream;
+
+ void _acceptReconnection(Sink sink) {
+ _keepAliveTimer?.cancel();
+ _sink = sink;
+ }
+
+ void _handleDisconnect() {
+ if (_keepAlive == null) {
+ // Close immediately if we're not keeping alive.
+ _close();
+ } else if (!isInKeepAlivePeriod) {
+ // Otherwise if we didn't already have an active timer, set a timer to
+ // close after the timeout period. If the connection comes back, this will
+ // be cancelled and all messages left in the queue tried again.
+ _keepAliveTimer = Timer(_keepAlive, _close);
+ }
+ }
+
+ void _close() {
+ if (!_closedCompleter.isCompleted) {
+ _closedCompleter.complete();
+ _sink.close();
+ if (!_outgoingController.isClosed) _outgoingController.close();
+ if (!_incomingController.isClosed) _incomingController.close();
+ }
+ }
+}
+
+/// [SseHandler] handles requests on a user defined path to create
+/// two-way communications of JSON encodable data between server and clients.
+///
+/// A server sends messages to a client through an SSE channel, while
+/// a client sends message to a server through HTTP POST requests.
+class SseHandler {
+ final _logger = Logger('SseHandler');
+ final Uri _uri;
+ final Duration _keepAlive;
+ final _connections = <String, SseConnection>{};
+ final _connectionController = StreamController<SseConnection>();
+
+ StreamQueue<SseConnection> _connectionsStream;
+
+ SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive;
+
+ StreamQueue<SseConnection> get connections =>
+ _connectionsStream ??= StreamQueue(_connectionController.stream);
+
+ shelf.Handler get handler => _handle;
+
+ int get numberOfClients => _connections.length;
+
+ shelf.Response _createSseConnection(shelf.Request req, String path) {
+ req.hijack((channel) async {
+ var sink = utf8.encoder.startChunkedConversion(channel.sink);
+ sink.add(_sseHeaders(req.headers['origin']));
+ var clientId = req.url.queryParameters['sseClientId'];
+
+ // Check if we already have a connection for this ID that is in the process
+ // of timing out (in which case we can reconnect it transparently).
+ if (_connections[clientId] != null &&
+ _connections[clientId].isInKeepAlivePeriod) {
+ _connections[clientId]._acceptReconnection(sink);
+ } else {
+ var connection = SseConnection(sink, keepAlive: _keepAlive);
+ _connections[clientId] = connection;
+ unawaited(connection._closedCompleter.future.then((_) {
+ _connections.remove(clientId);
+ }));
+ // Remove connection when it is remotely closed or the stream is
+ // cancelled.
+ channel.stream.listen((_) {
+ // SSE is unidirectional. Responses are handled through POST requests.
+ }, onDone: () {
+ connection._handleDisconnect();
+ });
+
+ _connectionController.add(connection);
+ }
+ });
+ return shelf.Response.notFound('');
+ }
+
+ String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
+
+ Future<shelf.Response> _handle(shelf.Request req) async {
+ var path = _getOriginalPath(req);
+ if (_uri.path != path) {
+ return shelf.Response.notFound('');
+ }
+
+ if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
+ return _createSseConnection(req, path);
+ }
+
+ if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
+ return _handleIncomingMessage(req, path);
+ }
+
+ return shelf.Response.notFound('');
+ }
+
+ Future<shelf.Response> _handleIncomingMessage(
+ shelf.Request req, String path) async {
+ try {
+ var clientId = req.url.queryParameters['sseClientId'];
+ var message = await req.readAsString();
+ var jsonObject = json.decode(message) as String;
+ _connections[clientId]?._incomingController?.add(jsonObject);
+ } catch (e, st) {
+ _logger.fine('Failed to handle incoming message. $e $st');
+ }
+ return shelf.Response.ok('', headers: {
+ 'access-control-allow-credentials': 'true',
+ 'access-control-allow-origin': _originFor(req),
+ });
+ }
+
+ String _originFor(shelf.Request req) =>
+ // Firefox does not set header "origin".
+ // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
+ req.headers['origin'] ?? req.headers['host'];
+}
+
+void closeSink(SseConnection connection) => connection._sink.close();
diff --git a/pkgs/sse/pubspec.yaml b/pkgs/sse/pubspec.yaml
index 33fc5fb..9c164f7 100644
--- a/pkgs/sse/pubspec.yaml
+++ b/pkgs/sse/pubspec.yaml
@@ -1,5 +1,5 @@
name: sse
-version: 3.0.0
+version: 3.1.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/sse
description: >-
@@ -8,7 +8,7 @@
requests.
environment:
- sdk: ">=2.1.0 <3.0.0"
+ sdk: ">=2.2.0 <3.0.0"
dependencies:
async: ^2.0.8
diff --git a/pkgs/sse/test/sse_test.dart b/pkgs/sse/test/sse_test.dart
index 62fe4c6..0d4040f 100644
--- a/pkgs/sse/test/sse_test.dart
+++ b/pkgs/sse/test/sse_test.dart
@@ -10,6 +10,7 @@
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_static/shelf_static.dart';
import 'package:sse/server/sse_handler.dart';
+import 'package:sse/src/server/sse_handler.dart' show closeSink;
import 'package:test/test.dart';
import 'package:webdriver/io.dart';
@@ -33,99 +34,180 @@
chromeDriver.kill();
});
- setUp(() async {
- handler = SseHandler(Uri.parse('/test'));
+ group('SSE', () {
+ setUp(() async {
+ handler = SseHandler(Uri.parse('/test'));
- var cascade = shelf.Cascade().add(handler.handler).add(_faviconHandler).add(
- createStaticHandler('test/web',
- listDirectories: true, defaultDocument: 'index.html'));
+ var cascade = shelf.Cascade()
+ .add(handler.handler)
+ .add(_faviconHandler)
+ .add(createStaticHandler('test/web',
+ listDirectories: true, defaultDocument: 'index.html'));
- server = await io.serve(cascade.handler, 'localhost', 0);
- var capabilities = Capabilities.chrome
- ..addAll({
- Capabilities.chromeOptions: {
- 'args': ['--headless']
- }
- });
- webdriver = await createDriver(desired: capabilities);
+ server = await io.serve(cascade.handler, 'localhost', 0);
+ var capabilities = Capabilities.chrome
+ ..addAll({
+ Capabilities.chromeOptions: {
+ 'args': ['--headless']
+ }
+ });
+ webdriver = await createDriver(desired: capabilities);
+ });
+
+ tearDown(() async {
+ await webdriver.quit();
+ await server.close();
+ });
+
+ test('Can round trip messages', () async {
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ connection.sink.add('blah');
+ expect(await connection.stream.first, 'blah');
+ });
+
+ test('Multiple clients can connect', () async {
+ var connections = handler.connections;
+ await webdriver.get('http://localhost:${server.port}');
+ await connections.next;
+ await webdriver.get('http://localhost:${server.port}');
+ await connections.next;
+ });
+
+ test('Routes data correctly', () async {
+ var connections = handler.connections;
+ await webdriver.get('http://localhost:${server.port}');
+ var connectionA = await connections.next;
+ connectionA.sink.add('foo');
+ expect(await connectionA.stream.first, 'foo');
+
+ await webdriver.get('http://localhost:${server.port}');
+ var connectionB = await connections.next;
+ connectionB.sink.add('bar');
+ expect(await connectionB.stream.first, 'bar');
+ });
+
+ test('Can close from the server', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
+ await connection.sink.close();
+ await pumpEventQueue();
+ expect(handler.numberOfClients, 0);
+ });
+
+ test('Client reconnects after being disconnected', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
+ await connection.sink.close();
+ await pumpEventQueue();
+ expect(handler.numberOfClients, 0);
+
+ // Ensure the client reconnects
+ await handler.connections.next;
+ });
+
+ test('Can close from the client-side', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
+
+ var closeButton = await webdriver.findElement(const By.tagName('button'));
+ await closeButton.click();
+
+ // Should complete since the connection is closed.
+ await connection.stream.toList();
+ expect(handler.numberOfClients, 0);
+ });
+
+ test('Cancelling the listener closes the connection', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
+
+ var sub = connection.stream.listen((_) {});
+ await sub.cancel();
+ await pumpEventQueue();
+ expect(handler.numberOfClients, 0);
+ });
+
+ test('Disconnects when navigating away', () async {
+ await webdriver.get('http://localhost:${server.port}');
+ expect(handler.numberOfClients, 1);
+
+ await webdriver.get('chrome://version/');
+ expect(handler.numberOfClients, 0);
+ });
});
- tearDown(() async {
- await webdriver.quit();
- await server.close();
- });
+ group('SSE with server keep-alive', () {
+ setUp(() async {
+ handler =
+ SseHandler(Uri.parse('/test'), keepAlive: const Duration(seconds: 5));
- test('Can round trip messages', () async {
- await webdriver.get('http://localhost:${server.port}');
- var connection = await handler.connections.next;
- connection.sink.add('blah');
- expect(await connection.stream.first, 'blah');
- });
+ var cascade = shelf.Cascade()
+ .add(handler.handler)
+ .add(_faviconHandler)
+ .add(createStaticHandler('test/web',
+ listDirectories: true, defaultDocument: 'index.html'));
- test('Multiple clients can connect', () async {
- var connections = handler.connections;
- await webdriver.get('http://localhost:${server.port}');
- await connections.next;
- await webdriver.get('http://localhost:${server.port}');
- await connections.next;
- });
+ server = await io.serve(cascade.handler, 'localhost', 0);
+ var capabilities = Capabilities.chrome
+ ..addAll({
+ Capabilities.chromeOptions: {
+ 'args': ['--headless']
+ }
+ });
+ webdriver = await createDriver(desired: capabilities);
+ });
- test('Routes data correctly', () async {
- var connections = handler.connections;
- await webdriver.get('http://localhost:${server.port}');
- var connectionA = await connections.next;
- connectionA.sink.add('foo');
- expect(await connectionA.stream.first, 'foo');
+ tearDown(() async {
+ await webdriver.quit();
+ await server.close();
+ });
- await webdriver.get('http://localhost:${server.port}');
- var connectionB = await connections.next;
- connectionB.sink.add('bar');
- expect(await connectionB.stream.first, 'bar');
- });
+ test('Client reconnect use the same connection', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
- test('Can close from the server', () async {
- expect(handler.numberOfClients, 0);
- await webdriver.get('http://localhost:${server.port}');
- var connection = await handler.connections.next;
- expect(handler.numberOfClients, 1);
- await connection.sink.close();
- await pumpEventQueue();
- expect(handler.numberOfClients, 0);
- });
+ // Close the underlying connection.
+ closeSink(connection);
+ await pumpEventQueue();
- test('Can close from the client-side', () async {
- expect(handler.numberOfClients, 0);
- await webdriver.get('http://localhost:${server.port}');
- var connection = await handler.connections.next;
- expect(handler.numberOfClients, 1);
+ // Ensure there's still a connection.
+ expect(handler.numberOfClients, 1);
- var closeButton = await webdriver.findElement(const By.tagName('button'));
- await closeButton.click();
+ // Ensure we can still round-trip data on the original connection.
+ connection.sink.add('bar');
+ expect(await connection.stream.first, 'bar');
+ });
- // Should complete since the connection is closed.
- await connection.stream.toList();
- expect(handler.numberOfClients, 0);
- });
+ test('Messages sent during disconnect arrive in-order', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
- test('Cancelling the listener closes the connection', () async {
- expect(handler.numberOfClients, 0);
- await webdriver.get('http://localhost:${server.port}');
- var connection = await handler.connections.next;
- expect(handler.numberOfClients, 1);
+ // Close the underlying connection.
+ closeSink(connection);
+ connection.sink.add('one');
+ connection.sink.add('two');
+ await pumpEventQueue();
- var sub = connection.stream.listen((_) {});
- await sub.cancel();
- await pumpEventQueue();
- expect(handler.numberOfClients, 0);
- });
+ // Ensure there's still a connection.
+ expect(handler.numberOfClients, 1);
- test('Disconnects when navigating away', () async {
- await webdriver.get('http://localhost:${server.port}');
- expect(handler.numberOfClients, 1);
-
- await webdriver.get('chrome://version/');
- expect(handler.numberOfClients, 0);
- });
+ // Ensure messages arrive in the same order
+ expect(await connection.stream.take(2).toList(), equals(['one', 'two']));
+ });
+ }, timeout: const Timeout(Duration(seconds: 120)));
}
FutureOr<shelf.Response> _faviconHandler(shelf.Request request) {