Support shutting down the handler ignoring any keepAlive periods (#27)

* Support shutting down the handler ignoring any keepAlive periods
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7b9027..ddcd721 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 3.5.0
+
+- Add new `shutdown` methods on `SseHandler` and `SseConnection` to allow closing
+  connections immediately, ignoring any keep-alive periods.
+
 ## 3.4.0
 
 - Remove `onClose` from `SseConnection` and ensure the corresponding
diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart
index c3c80c8..923b87f 100644
--- a/lib/src/server/sse_handler.dart
+++ b/lib/src/server/sse_handler.dart
@@ -115,10 +115,11 @@
     if (_keepAlive == null) {
       // Close immediately if we're not keeping alive.
       _close();
-    } else if (!isInKeepAlivePeriod) {
-      // Otherwise if we didn't already have an active timer, set a timer to
-      // close after the timeout period. If the connection comes back, this will
-      // be cancelled and all messages left in the queue tried again.
+    } else if (!isInKeepAlivePeriod && !_closedCompleter.isCompleted) {
+      // Otherwise if we didn't already have an active timer and we've not already
+      // been completely closed, set a timer to close after the timeout period.
+      // If the connection comes back, this will be cancelled and all messages left
+      // in the queue tried again.
       _keepAliveTimer = Timer(_keepAlive, _close);
     }
   }
@@ -126,6 +127,9 @@
   void _close() {
     if (!_closedCompleter.isCompleted) {
       _closedCompleter.complete();
+      // Cancel any existing timer in case we were told to explicitly shut down
+      // to avoid keeping the process alive.
+      _keepAliveTimer?.cancel();
       _sink.close();
       if (!_outgoingController.isClosed) {
         _outgoingStreamQueue.cancel(immediate: true);
@@ -134,6 +138,11 @@
       if (!_incomingController.isClosed) _incomingController.close();
     }
   }
+
+  /// Immediately close the connection, ignoring any keepAlive period.
+  void shutdown() {
+    _close();
+  }
 }
 
 /// [SseHandler] handles requests on a user defined path to create
@@ -228,6 +237,13 @@
       // Firefox does not set header "origin".
       // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
       req.headers['origin'] ?? req.headers['host'];
+
+  /// Immediately close all connections, ignoring any keepAlive periods.
+  void shutdown() {
+    for (final connection in _connections.values) {
+      connection.shutdown();
+    }
+  }
 }
 
 void closeSink(SseConnection connection) => connection._sink.close();
diff --git a/pubspec.yaml b/pubspec.yaml
index 612c417..76f3d97 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: sse
-version: 3.4.0
+version: 3.5.0
 homepage: https://github.com/dart-lang/sse
 description: >-
   Provides client and server functionality for setting up bi-directional
diff --git a/test/sse_test.dart b/test/sse_test.dart
index 3a7f76b..e7263f1 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -231,6 +231,26 @@
       // Ensure messages arrive in the same order
       expect(await connection.stream.take(2).toList(), equals(['one', 'two']));
     });
+
+    test('Explicit shutdown does not wait for keepAlive', () async {
+      expect(handler.numberOfClients, 0);
+      await webdriver.get('http://localhost:${server.port}');
+      await handler.connections.next;
+      expect(handler.numberOfClients, 1);
+
+      // Close the underlying connection.
+      handler.shutdown();
+
+      // Wait for a short period to allow the connection to close, but not
+      // long enough that the 30second keep-alive may have expired.
+      var maxPumps = 50;
+      while (handler.numberOfClients > 0 && maxPumps-- > 0) {
+        await pumpEventQueue(times: 1);
+      }
+
+      // Ensure there are not connected clients.
+      expect(handler.numberOfClients, 0);
+    });
   }, timeout: const Timeout(Duration(seconds: 120)));
 }