remove connections faster
diff --git a/pkgs/sse/lib/server/sse_handler.dart b/pkgs/sse/lib/server/sse_handler.dart index 9c1f57f..0c06a14 100644 --- a/pkgs/sse/lib/server/sse_handler.dart +++ b/pkgs/sse/lib/server/sse_handler.dart
@@ -7,6 +7,7 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; +import 'package:pedantic/pedantic.dart'; import 'package:shelf/shelf.dart' as shelf; import 'package:stream_channel/stream_channel.dart'; @@ -28,11 +29,11 @@ final Sink _sink; final String _clientId; - var _isClosed = false; + final _closedCompleter = Completer<void>(); SseConnection(this._sink, this._clientId) { _outgoingController.stream.listen((data) { - if (!_isClosed) { + if (!_closedCompleter.isCompleted) { // JSON encode the message to escape new lines. _sink.add('data: ${json.encode(data)}\n'); _sink.add('\n'); @@ -54,8 +55,8 @@ Stream<String> get stream => _incomingController.stream; void _close() { - if (!_isClosed) { - _isClosed = true; + if (!_closedCompleter.isCompleted) { + _closedCompleter.complete(); _sink.close(); if (!_outgoingController.isClosed) _outgoingController.close(); if (!_incomingController.isClosed) _incomingController.close(); @@ -103,14 +104,15 @@ var clientId = req.url.queryParameters['sseClientId']; var connection = SseConnection(sink, clientId); _connections[clientId] = connection; + unawaited(connection._closedCompleter.future.then((_) { + _connections.remove(clientId); + })); // Remove connection when it is remotely closed or the stream is // cancelled. channel.stream.listen((_) { // SSE is unidirectional. Responses are handled through POST requests. }, onDone: () { - // Trigger closing the connection. - connection.sink.close(); - _connections.remove(clientId); + connection._close(); }); _connectionController.add(connection);
diff --git a/pkgs/sse/pubspec.yaml b/pkgs/sse/pubspec.yaml index e2a5c98..054332b 100644 --- a/pkgs/sse/pubspec.yaml +++ b/pkgs/sse/pubspec.yaml
@@ -14,6 +14,7 @@ async: ^2.0.8 http: ^0.12.0+1 logging: ^0.11.3+2 + pedantic: ^1.4.0 stream_channel: ^1.6.8 shelf: ^0.7.4 uuid: ^1.0.3