remove connections faster
diff --git a/pkgs/sse/lib/server/sse_handler.dart b/pkgs/sse/lib/server/sse_handler.dart
index 9c1f57f..0c06a14 100644
--- a/pkgs/sse/lib/server/sse_handler.dart
+++ b/pkgs/sse/lib/server/sse_handler.dart
@@ -7,6 +7,7 @@
import 'package:async/async.dart';
import 'package:logging/logging.dart';
+import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:stream_channel/stream_channel.dart';
@@ -28,11 +29,11 @@
final Sink _sink;
final String _clientId;
- var _isClosed = false;
+ final _closedCompleter = Completer<void>();
SseConnection(this._sink, this._clientId) {
_outgoingController.stream.listen((data) {
- if (!_isClosed) {
+ if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
@@ -54,8 +55,8 @@
Stream<String> get stream => _incomingController.stream;
void _close() {
- if (!_isClosed) {
- _isClosed = true;
+ if (!_closedCompleter.isCompleted) {
+ _closedCompleter.complete();
_sink.close();
if (!_outgoingController.isClosed) _outgoingController.close();
if (!_incomingController.isClosed) _incomingController.close();
@@ -103,14 +104,15 @@
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink, clientId);
_connections[clientId] = connection;
+ unawaited(connection._closedCompleter.future.then((_) {
+ _connections.remove(clientId);
+ }));
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
- // Trigger closing the connection.
- connection.sink.close();
- _connections.remove(clientId);
+ connection._close();
});
_connectionController.add(connection);
diff --git a/pkgs/sse/pubspec.yaml b/pkgs/sse/pubspec.yaml
index e2a5c98..054332b 100644
--- a/pkgs/sse/pubspec.yaml
+++ b/pkgs/sse/pubspec.yaml
@@ -14,6 +14,7 @@
async: ^2.0.8
http: ^0.12.0+1
logging: ^0.11.3+2
+ pedantic: ^1.4.0
stream_channel: ^1.6.8
shelf: ^0.7.4
uuid: ^1.0.3