Ensure sink.done correctly fires (#26)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0aa832a..f7b9027 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 3.4.0
+
+- Remove `onClose` from `SseConnection` and ensure the corresponding
+  `sink.close` correctly fires.
+
 ## 3.3.0
 
 - Add an `onClose` event to the `SseConnection`. This allows consumers to
diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart
index e3f6cc7..c3c80c8 100644
--- a/lib/src/server/sse_handler.dart
+++ b/lib/src/server/sse_handler.dart
@@ -41,10 +41,9 @@
 
   final _closedCompleter = Completer<void>();
 
-  /// Completes when the [SseConnection] closes.
-  ///
-  /// This is guaranteed to fire unlike `this.sink.close`;
-  Future<void> get onClose => _closedCompleter.future;
+  /// Wraps the `_outgoingController.stream` to buffer events to enable keep
+  /// alive.
+  StreamQueue _outgoingStreamQueue;
 
   /// Creates an [SseConnection] for the supplied [_sink].
   ///
@@ -55,14 +54,15 @@
   /// If [keepAlive] is not supplied, the connection will be closed immediately
   /// after a disconnect.
   SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
+    _outgoingStreamQueue = StreamQueue(_outgoingController.stream);
     unawaited(_setUpListener());
     _outgoingController.onCancel = _close;
     _incomingController.onCancel = _close;
   }
 
   Future<void> _setUpListener() async {
-    var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
-    while (await outgoingStreamQueue.hasNext) {
+    while (
+        !_outgoingController.isClosed && await _outgoingStreamQueue.hasNext) {
       // If we're in a KeepAlive timeout, there's nowhere to send messages so
       // wait a short period and check again.
       if (isInKeepAlivePeriod) {
@@ -72,7 +72,7 @@
 
       // Peek the data so we don't remove it from the stream if we're unable to
       // send it.
-      final data = await outgoingStreamQueue.peek;
+      final data = await _outgoingStreamQueue.peek;
 
       // Ignore outgoing messages since the connection may have closed while
       // waiting for the keep alive.
@@ -82,7 +82,7 @@
         // JSON encode the message to escape new lines.
         _sink.add('data: ${json.encode(data)}\n');
         _sink.add('\n');
-        await outgoingStreamQueue.next; // Consume from stream if no errors.
+        await _outgoingStreamQueue.next; // Consume from stream if no errors.
       } catch (StateError) {
         if (_keepAlive == null || _closedCompleter.isCompleted) {
           rethrow;
@@ -127,7 +127,10 @@
     if (!_closedCompleter.isCompleted) {
       _closedCompleter.complete();
       _sink.close();
-      if (!_outgoingController.isClosed) _outgoingController.close();
+      if (!_outgoingController.isClosed) {
+        _outgoingStreamQueue.cancel(immediate: true);
+        _outgoingController.close();
+      }
       if (!_incomingController.isClosed) _incomingController.close();
     }
   }
diff --git a/pubspec.yaml b/pubspec.yaml
index 194277d..612c417 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: sse
-version: 3.3.0
+version: 3.4.0
 homepage: https://github.com/dart-lang/sse
 description: >-
   Provides client and server functionality for setting up bi-directional