Better close logic
diff --git a/pkgs/sse/CHANGELOG.md b/pkgs/sse/CHANGELOG.md index 77c5538..ba13a26 100644 --- a/pkgs/sse/CHANGELOG.md +++ b/pkgs/sse/CHANGELOG.md
@@ -1,3 +1,9 @@ +## 2.0.0 + +- No longer expose `close` and `onClose` on an `SseConnection`. This is simply + handled by the underlying `stream` / `sink`. +- Fix a bug where resources of the `SseConnection` were not properly closed. + ## 1.0.0 - Internal cleanup.
diff --git a/pkgs/sse/lib/server/sse_handler.dart b/pkgs/sse/lib/server/sse_handler.dart index a683a2f..abec5cf 100644 --- a/pkgs/sse/lib/server/sse_handler.dart +++ b/pkgs/sse/lib/server/sse_handler.dart
@@ -7,7 +7,6 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; -import 'package:pedantic/pedantic.dart'; import 'package:shelf/shelf.dart' as shelf; import 'package:stream_channel/stream_channel.dart'; @@ -21,24 +20,28 @@ /// A bi-directional SSE connection between server and browser. class SseConnection extends StreamChannelMixin<String> { + // Incoming messages from the Browser client. final _incomingController = StreamController<String>(); + // Outgoing messages to the Browser client. final _outgoingController = StreamController<String>(); - final _closeCompleter = Completer<Null>(); + final Sink _sink; final String _clientId; + var _isClosed = false; + SseConnection(this._sink, this._clientId) { _outgoingController.stream.listen((data) { - if (!_closeCompleter.isCompleted) { + if (!_isClosed) { // JSON encode the message to escape new lines. _sink.add('data: ${json.encode(data)}\n'); _sink.add('\n'); } }); + _outgoingController.onCancel = _close; + _incomingController.onCancel = _close; } - Future get onClose => _closeCompleter.future; - /// The message added to the sink has to be JSON encodable. @override StreamSink<String> get sink => _outgoingController.sink; @@ -50,8 +53,13 @@ @override Stream<String> get stream => _incomingController.stream; - void close() { - if (!_closeCompleter.isCompleted) _closeCompleter.complete(); + void _close() { + if (!_isClosed) { + _isClosed = true; + _sink.close(); + if (!_outgoingController.isClosed) _outgoingController.close(); + if (!_incomingController.isClosed) _incomingController.close(); + } } } @@ -64,19 +72,28 @@ final _logger = Logger('SseHandler'); final Uri _uri; - final Set<SseConnection> _connections = Set<SseConnection>(); + final _connections = <SseConnection>{}; final _connectionController = StreamController<SseConnection>(); - SseHandler(this._uri); + StreamQueue<SseConnection> _connectionsStream; + SseHandler(this._uri); StreamQueue<SseConnection> get connections => - StreamQueue(_connectionController.stream); + _connectionsStream ??= StreamQueue(_connectionController.stream); shelf.Handler get handler => _handle; int get numberOfClients => _connections.length; + void close() { + if (!_connectionController.isClosed) _connectionController.close(); + for (var connection in _connections) { + connection.sink.close(); + _connections.remove(connection); + } + } + shelf.Response _createSseConnection(shelf.Request req, String path) { req.hijack((channel) async { var sink = utf8.encoder.startChunkedConversion(channel.sink); @@ -84,14 +101,12 @@ var clientId = req.url.queryParameters['sseClientId']; var connection = SseConnection(sink, clientId); _connections.add(connection); - unawaited(connection.onClose.then((_) { + // Remove connection when it is remotely closed. + channel.stream.listen((_) {}, onDone: () { + connection.sink.close(); _connections.remove(connection); - })); - channel.stream.listen((_) { - // SSE is unidirectional. Responses are handled through POST requests. - }, onDone: () { - connection.close(); }); + _connectionController.add(connection); }); return null;
diff --git a/pkgs/sse/pubspec.yaml b/pkgs/sse/pubspec.yaml index a86543f..e9dfe3d 100644 --- a/pkgs/sse/pubspec.yaml +++ b/pkgs/sse/pubspec.yaml
@@ -14,7 +14,6 @@ async: ^2.0.8 http: ^0.12.0+1 logging: ^0.11.3+2 - pedantic: ^1.4.0 stream_channel: ^1.6.8 shelf: ^0.7.4 uuid: ^1.0.3
diff --git a/pkgs/sse/test/sse_test.dart b/pkgs/sse/test/sse_test.dart index d6e0445..a9db7d9 100644 --- a/pkgs/sse/test/sse_test.dart +++ b/pkgs/sse/test/sse_test.dart
@@ -55,12 +55,12 @@ var connections = handler.connections; await webdriver.get('http://localhost:${server.port}'); var connectionA = await connections.next; + connectionA.sink.add('foo'); + expect(await connectionA.stream.first, 'foo'); + await webdriver.get('http://localhost:${server.port}'); var connectionB = await connections.next; - - connectionA.sink.add('foo'); connectionB.sink.add('bar'); - await connectionA.onClose; expect(await connectionB.stream.first, 'bar'); }); @@ -69,8 +69,8 @@ await webdriver.get('http://localhost:${server.port}'); var connection = await handler.connections.next; expect(handler.numberOfClients, 1); - connection.close(); - await connection.onClose; + await connection.sink.close(); + await pumpEventQueue(); expect(handler.numberOfClients, 0); }); @@ -83,7 +83,8 @@ var closeButton = await webdriver.findElement(const By.tagName('button')); await closeButton.click(); - await connection.onClose; + // Stream should complete. + await connection.stream.toList(); expect(handler.numberOfClients, 0); });