Better close logic
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77c5538..ba13a26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 2.0.0
+
+- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
+ handled by the underlying `stream` / `sink`.
+- Fix a bug where resources of the `SseConnection` were not properly closed.
+
## 1.0.0
- Internal cleanup.
diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart
index a683a2f..abec5cf 100644
--- a/lib/server/sse_handler.dart
+++ b/lib/server/sse_handler.dart
@@ -7,7 +7,6 @@
import 'package:async/async.dart';
import 'package:logging/logging.dart';
-import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:stream_channel/stream_channel.dart';
@@ -21,24 +20,28 @@
/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String> {
+ // Incoming messages from the Browser client.
final _incomingController = StreamController<String>();
+ // Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();
- final _closeCompleter = Completer<Null>();
+
final Sink _sink;
final String _clientId;
+ var _isClosed = false;
+
SseConnection(this._sink, this._clientId) {
_outgoingController.stream.listen((data) {
- if (!_closeCompleter.isCompleted) {
+ if (!_isClosed) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
}
});
+ _outgoingController.onCancel = _close;
+ _incomingController.onCancel = _close;
}
- Future get onClose => _closeCompleter.future;
-
/// The message added to the sink has to be JSON encodable.
@override
StreamSink<String> get sink => _outgoingController.sink;
@@ -50,8 +53,13 @@
@override
Stream<String> get stream => _incomingController.stream;
- void close() {
- if (!_closeCompleter.isCompleted) _closeCompleter.complete();
+ void _close() {
+ if (!_isClosed) {
+ _isClosed = true;
+ _sink.close();
+ if (!_outgoingController.isClosed) _outgoingController.close();
+ if (!_incomingController.isClosed) _incomingController.close();
+ }
}
}
@@ -64,19 +72,28 @@
final _logger = Logger('SseHandler');
final Uri _uri;
- final Set<SseConnection> _connections = Set<SseConnection>();
+ final _connections = <SseConnection>{};
final _connectionController = StreamController<SseConnection>();
- SseHandler(this._uri);
+ StreamQueue<SseConnection> _connectionsStream;
+ SseHandler(this._uri);
StreamQueue<SseConnection> get connections =>
- StreamQueue(_connectionController.stream);
+ _connectionsStream ??= StreamQueue(_connectionController.stream);
shelf.Handler get handler => _handle;
int get numberOfClients => _connections.length;
+ void close() {
+ if (!_connectionController.isClosed) _connectionController.close();
+ for (var connection in _connections) {
+ connection.sink.close();
+ _connections.remove(connection);
+ }
+ }
+
shelf.Response _createSseConnection(shelf.Request req, String path) {
req.hijack((channel) async {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
@@ -84,14 +101,12 @@
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink, clientId);
_connections.add(connection);
- unawaited(connection.onClose.then((_) {
+ // Remove connection when it is remotely closed.
+ channel.stream.listen((_) {}, onDone: () {
+ connection.sink.close();
_connections.remove(connection);
- }));
- channel.stream.listen((_) {
- // SSE is unidirectional. Responses are handled through POST requests.
- }, onDone: () {
- connection.close();
});
+
_connectionController.add(connection);
});
return null;
diff --git a/pubspec.yaml b/pubspec.yaml
index a86543f..e9dfe3d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -14,7 +14,6 @@
async: ^2.0.8
http: ^0.12.0+1
logging: ^0.11.3+2
- pedantic: ^1.4.0
stream_channel: ^1.6.8
shelf: ^0.7.4
uuid: ^1.0.3
diff --git a/test/sse_test.dart b/test/sse_test.dart
index d6e0445..a9db7d9 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -55,12 +55,12 @@
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
+ connectionA.sink.add('foo');
+ expect(await connectionA.stream.first, 'foo');
+
await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;
-
- connectionA.sink.add('foo');
connectionB.sink.add('bar');
- await connectionA.onClose;
expect(await connectionB.stream.first, 'bar');
});
@@ -69,8 +69,8 @@
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
- connection.close();
- await connection.onClose;
+ await connection.sink.close();
+ await pumpEventQueue();
expect(handler.numberOfClients, 0);
});
@@ -83,7 +83,8 @@
var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();
- await connection.onClose;
+ // Stream should complete.
+ await connection.stream.toList();
expect(handler.numberOfClients, 0);
});