Merge pull request #4 from dart-lang/close-cleanup
Close cleanup
diff --git a/.travis.yml b/.travis.yml
index 21c6fe6..3b66d0d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,6 +4,7 @@
sudo: required
dist: trusty
addons:
+ chrome: stable
apt:
sources:
- google-chrome
@@ -29,7 +30,7 @@
- sh -e /etc/init.d/xvfb start
before_script:
- - wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip
+ - wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip
- unzip chromedriver_linux64.zip
- export PATH=$PATH:$PWD
- ./tool/travis-setup.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77c5538..ba13a26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 2.0.0
+
+- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
+ handled by the underlying `stream` / `sink`.
+- Fix a bug where resources of the `SseConnection` were not properly closed.
+
## 1.0.0
- Internal cleanup.
diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart
index a683a2f..a3067f6 100644
--- a/lib/server/sse_handler.dart
+++ b/lib/server/sse_handler.dart
@@ -21,24 +21,28 @@
/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String> {
+ /// Incoming messages from the Browser client.
final _incomingController = StreamController<String>();
- final _outgoingController = StreamController<String>();
- final _closeCompleter = Completer<Null>();
- final Sink _sink;
- final String _clientId;
- SseConnection(this._sink, this._clientId) {
+ /// Outgoing messages to the Browser client.
+ final _outgoingController = StreamController<String>();
+
+ final Sink _sink;
+
+ final _closedCompleter = Completer<void>();
+
+ SseConnection(this._sink) {
_outgoingController.stream.listen((data) {
- if (!_closeCompleter.isCompleted) {
+ if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
}
});
+ _outgoingController.onCancel = _close;
+ _incomingController.onCancel = _close;
}
- Future get onClose => _closeCompleter.future;
-
/// The message added to the sink has to be JSON encodable.
@override
StreamSink<String> get sink => _outgoingController.sink;
@@ -50,8 +54,13 @@
@override
Stream<String> get stream => _incomingController.stream;
- void close() {
- if (!_closeCompleter.isCompleted) _closeCompleter.complete();
+ void _close() {
+ if (!_closedCompleter.isCompleted) {
+ _closedCompleter.complete();
+ _sink.close();
+ if (!_outgoingController.isClosed) _outgoingController.close();
+ if (!_incomingController.isClosed) _incomingController.close();
+ }
}
}
@@ -63,15 +72,15 @@
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
-
- final Set<SseConnection> _connections = Set<SseConnection>();
-
+ final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();
+ StreamQueue<SseConnection> _connectionsStream;
+
SseHandler(this._uri);
StreamQueue<SseConnection> get connections =>
- StreamQueue(_connectionController.stream);
+ _connectionsStream ??= StreamQueue(_connectionController.stream);
shelf.Handler get handler => _handle;
@@ -82,19 +91,22 @@
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
- var connection = SseConnection(sink, clientId);
- _connections.add(connection);
- unawaited(connection.onClose.then((_) {
- _connections.remove(connection);
+ var connection = SseConnection(sink);
+ _connections[clientId] = connection;
+ unawaited(connection._closedCompleter.future.then((_) {
+ _connections.remove(clientId);
}));
+ // Remove connection when it is remotely closed or the stream is
+ // cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
- connection.close();
+ connection._close();
});
+
_connectionController.add(connection);
});
- return null;
+ return shelf.Response.notFound('');
}
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
@@ -122,11 +134,7 @@
var clientId = req.url.queryParameters['sseClientId'];
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
- for (var connection in _connections) {
- if (connection._clientId == clientId) {
- connection._incomingController.add(jsonObject);
- }
- }
+ _connections[clientId]?._incomingController?.add(jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
}
diff --git a/pubspec.yaml b/pubspec.yaml
index a86543f..054332b 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: sse
-version: 1.0.0
+version: 2.0.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/sse
description: >-
diff --git a/test/sse_test.dart b/test/sse_test.dart
index d6e0445..16a9a0c 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -17,6 +17,21 @@
HttpServer server;
WebDriver webdriver;
SseHandler handler;
+ Process chromeDriver;
+
+ setUpAll(() async {
+ try {
+ chromeDriver = await Process.start(
+ 'chromedriver', ['--port=4444', '--url-base=wd/hub']);
+ } catch (e) {
+ throw StateError(
+ 'Could not start ChromeDriver. Is it installed?\nError: $e');
+ }
+ });
+
+ tearDownAll(() {
+ chromeDriver.kill();
+ });
setUp(() async {
handler = SseHandler(Uri.parse('/test'));
@@ -28,7 +43,11 @@
listDirectories: true, defaultDocument: 'index.html'));
server = await io.serve(cascade.handler, 'localhost', 0);
- webdriver = await createDriver();
+ webdriver = await createDriver(desired: {
+ 'chromeOptions': {
+ 'args': ['--headless']
+ }
+ });
});
tearDown(() async {
@@ -55,12 +74,12 @@
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
+ connectionA.sink.add('foo');
+ expect(await connectionA.stream.first, 'foo');
+
await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;
-
- connectionA.sink.add('foo');
connectionB.sink.add('bar');
- await connectionA.onClose;
expect(await connectionB.stream.first, 'bar');
});
@@ -69,8 +88,8 @@
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
- connection.close();
- await connection.onClose;
+ await connection.sink.close();
+ await pumpEventQueue();
expect(handler.numberOfClients, 0);
});
@@ -83,7 +102,20 @@
var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();
- await connection.onClose;
+ // Should complete since the connection is closed.
+ await connection.stream.toList();
+ expect(handler.numberOfClients, 0);
+ });
+
+ test('Cancelling the listener closes the connection', () async {
+ expect(handler.numberOfClients, 0);
+ await webdriver.get('http://localhost:${server.port}');
+ var connection = await handler.connections.next;
+ expect(handler.numberOfClients, 1);
+
+ var sub = connection.stream.listen((_) {});
+ await sub.cancel();
+ await pumpEventQueue();
expect(handler.numberOfClients, 0);
});
diff --git a/tool/travis.sh b/tool/travis.sh
index b9c9c9f..9ebd3d4 100755
--- a/tool/travis.sh
+++ b/tool/travis.sh
@@ -23,10 +23,6 @@
STATUS=$ANALYSIS_STATUS
fi
-# Start chromedriver.
-chromedriver --port=4444 --url-base=wd/hub &
-PIDC=$!
-
# Run tests.
pub run test -r expanded -p vm -j 1
TEST_STATUS=$?