Ensure sink.done correctly fires (#26)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0aa832a..f7b9027 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 3.4.0
+
+- Remove `onClose` from `SseConnection` and ensure the corresponding
+ `sink.close` correctly fires.
+
## 3.3.0
- Add an `onClose` event to the `SseConnection`. This allows consumers to
diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart
index e3f6cc7..c3c80c8 100644
--- a/lib/src/server/sse_handler.dart
+++ b/lib/src/server/sse_handler.dart
@@ -41,10 +41,9 @@
final _closedCompleter = Completer<void>();
- /// Completes when the [SseConnection] closes.
- ///
- /// This is guaranteed to fire unlike `this.sink.close`;
- Future<void> get onClose => _closedCompleter.future;
+ /// Wraps the `_outgoingController.stream` to buffer events to enable keep
+ /// alive.
+ StreamQueue _outgoingStreamQueue;
/// Creates an [SseConnection] for the supplied [_sink].
///
@@ -55,14 +54,15 @@
/// If [keepAlive] is not supplied, the connection will be closed immediately
/// after a disconnect.
SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
+ _outgoingStreamQueue = StreamQueue(_outgoingController.stream);
unawaited(_setUpListener());
_outgoingController.onCancel = _close;
_incomingController.onCancel = _close;
}
Future<void> _setUpListener() async {
- var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
- while (await outgoingStreamQueue.hasNext) {
+ while (
+ !_outgoingController.isClosed && await _outgoingStreamQueue.hasNext) {
// If we're in a KeepAlive timeout, there's nowhere to send messages so
// wait a short period and check again.
if (isInKeepAlivePeriod) {
@@ -72,7 +72,7 @@
// Peek the data so we don't remove it from the stream if we're unable to
// send it.
- final data = await outgoingStreamQueue.peek;
+ final data = await _outgoingStreamQueue.peek;
// Ignore outgoing messages since the connection may have closed while
// waiting for the keep alive.
@@ -82,7 +82,7 @@
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
- await outgoingStreamQueue.next; // Consume from stream if no errors.
+ await _outgoingStreamQueue.next; // Consume from stream if no errors.
} catch (StateError) {
if (_keepAlive == null || _closedCompleter.isCompleted) {
rethrow;
@@ -127,7 +127,10 @@
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
_sink.close();
- if (!_outgoingController.isClosed) _outgoingController.close();
+ if (!_outgoingController.isClosed) {
+ _outgoingStreamQueue.cancel(immediate: true);
+ _outgoingController.close();
+ }
if (!_incomingController.isClosed) _incomingController.close();
}
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 194277d..612c417 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: sse
-version: 3.3.0
+version: 3.4.0
homepage: https://github.com/dart-lang/sse
description: >-
Provides client and server functionality for setting up bi-directional