use a map
diff --git a/pkgs/sse/lib/server/sse_handler.dart b/pkgs/sse/lib/server/sse_handler.dart
index 07e9a78..7a5ac0e 100644
--- a/pkgs/sse/lib/server/sse_handler.dart
+++ b/pkgs/sse/lib/server/sse_handler.dart
@@ -71,14 +71,14 @@
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
-
- final _connections = <SseConnection>{};
-
+ final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();
+ var _isClosed = false;
StreamQueue<SseConnection> _connectionsStream;
SseHandler(this._uri);
+
StreamQueue<SseConnection> get connections =>
_connectionsStream ??= StreamQueue(_connectionController.stream);
@@ -87,20 +87,22 @@
int get numberOfClients => _connections.length;
void close() {
+ _isClosed = true;
if (!_connectionController.isClosed) _connectionController.close();
- for (var connection in _connections.toList()) {
+ for (var connection in _connections.values.toList()) {
connection.sink.close();
- _connections.remove(connection);
+ _connections.remove(connection._clientId);
}
}
shelf.Response _createSseConnection(shelf.Request req, String path) {
+ if (_isClosed) return null;
req.hijack((channel) async {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink, clientId);
- _connections.add(connection);
+ _connections[clientId] = connection;
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
@@ -108,7 +110,7 @@
}, onDone: () {
// Trigger closing the connection.
connection.sink.close();
- _connections.remove(connection);
+ _connections.remove(clientId);
});
_connectionController.add(connection);
@@ -141,11 +143,8 @@
var clientId = req.url.queryParameters['sseClientId'];
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
- for (var connection in _connections) {
- if (connection._clientId == clientId) {
- connection._incomingController.add(jsonObject);
- }
- }
+ var connection = _connections[clientId];
+ if (connection != null) connection._incomingController.add(jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
}