Better close logic
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77c5538..ba13a26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 2.0.0
+
+- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
+  handled by the underlying `stream` / `sink`.
+- Fix a bug where resources of the `SseConnection` were not properly closed.
+
 ## 1.0.0
 
 - Internal cleanup.
diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart
index a683a2f..abec5cf 100644
--- a/lib/server/sse_handler.dart
+++ b/lib/server/sse_handler.dart
@@ -7,7 +7,6 @@
 
 import 'package:async/async.dart';
 import 'package:logging/logging.dart';
-import 'package:pedantic/pedantic.dart';
 import 'package:shelf/shelf.dart' as shelf;
 import 'package:stream_channel/stream_channel.dart';
 
@@ -21,24 +20,28 @@
 
 /// A bi-directional SSE connection between server and browser.
 class SseConnection extends StreamChannelMixin<String> {
+  // Incoming messages from the Browser client.
   final _incomingController = StreamController<String>();
+  // Outgoing messages to the Browser client.
   final _outgoingController = StreamController<String>();
-  final _closeCompleter = Completer<Null>();
+
   final Sink _sink;
   final String _clientId;
 
+  var _isClosed = false;
+
   SseConnection(this._sink, this._clientId) {
     _outgoingController.stream.listen((data) {
-      if (!_closeCompleter.isCompleted) {
+      if (!_isClosed) {
         // JSON encode the message to escape new lines.
         _sink.add('data: ${json.encode(data)}\n');
         _sink.add('\n');
       }
     });
+    _outgoingController.onCancel = _close;
+    _incomingController.onCancel = _close;
   }
 
-  Future get onClose => _closeCompleter.future;
-
   /// The message added to the sink has to be JSON encodable.
   @override
   StreamSink<String> get sink => _outgoingController.sink;
@@ -50,8 +53,13 @@
   @override
   Stream<String> get stream => _incomingController.stream;
 
-  void close() {
-    if (!_closeCompleter.isCompleted) _closeCompleter.complete();
+  void _close() {
+    if (!_isClosed) {
+      _isClosed = true;
+      _sink.close();
+      if (!_outgoingController.isClosed) _outgoingController.close();
+      if (!_incomingController.isClosed) _incomingController.close();
+    }
   }
 }
 
@@ -64,19 +72,28 @@
   final _logger = Logger('SseHandler');
   final Uri _uri;
 
-  final Set<SseConnection> _connections = Set<SseConnection>();
+  final _connections = <SseConnection>{};
 
   final _connectionController = StreamController<SseConnection>();
 
-  SseHandler(this._uri);
+  StreamQueue<SseConnection> _connectionsStream;
 
+  SseHandler(this._uri);
   StreamQueue<SseConnection> get connections =>
-      StreamQueue(_connectionController.stream);
+      _connectionsStream ??= StreamQueue(_connectionController.stream);
 
   shelf.Handler get handler => _handle;
 
   int get numberOfClients => _connections.length;
 
+  void close() {
+    if (!_connectionController.isClosed) _connectionController.close();
+    for (var connection in _connections) {
+      connection.sink.close();
+      _connections.remove(connection);
+    }
+  }
+
   shelf.Response _createSseConnection(shelf.Request req, String path) {
     req.hijack((channel) async {
       var sink = utf8.encoder.startChunkedConversion(channel.sink);
@@ -84,14 +101,12 @@
       var clientId = req.url.queryParameters['sseClientId'];
       var connection = SseConnection(sink, clientId);
       _connections.add(connection);
-      unawaited(connection.onClose.then((_) {
+      // Remove connection when it is remotely closed.
+      channel.stream.listen((_) {}, onDone: () {
+        connection.sink.close();
         _connections.remove(connection);
-      }));
-      channel.stream.listen((_) {
-        // SSE is unidirectional. Responses are handled through POST requests.
-      }, onDone: () {
-        connection.close();
       });
+
       _connectionController.add(connection);
     });
     return null;
diff --git a/pubspec.yaml b/pubspec.yaml
index a86543f..e9dfe3d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -14,7 +14,6 @@
   async: ^2.0.8
   http: ^0.12.0+1
   logging: ^0.11.3+2
-  pedantic: ^1.4.0
   stream_channel: ^1.6.8
   shelf: ^0.7.4
   uuid: ^1.0.3
diff --git a/test/sse_test.dart b/test/sse_test.dart
index d6e0445..a9db7d9 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -55,12 +55,12 @@
     var connections = handler.connections;
     await webdriver.get('http://localhost:${server.port}');
     var connectionA = await connections.next;
+    connectionA.sink.add('foo');
+    expect(await connectionA.stream.first, 'foo');
+
     await webdriver.get('http://localhost:${server.port}');
     var connectionB = await connections.next;
-
-    connectionA.sink.add('foo');
     connectionB.sink.add('bar');
-    await connectionA.onClose;
     expect(await connectionB.stream.first, 'bar');
   });
 
@@ -69,8 +69,8 @@
     await webdriver.get('http://localhost:${server.port}');
     var connection = await handler.connections.next;
     expect(handler.numberOfClients, 1);
-    connection.close();
-    await connection.onClose;
+    await connection.sink.close();
+    await pumpEventQueue();
     expect(handler.numberOfClients, 0);
   });
 
@@ -83,7 +83,8 @@
     var closeButton = await webdriver.findElement(const By.tagName('button'));
     await closeButton.click();
 
-    await connection.onClose;
+    // Stream should complete.
+    await connection.stream.toList();
     expect(handler.numberOfClients, 0);
   });