Add onActiveStateChanged callback to Connection. (dart-lang/http2#13)
The callback is invoked when the connection goes from idle (0 active streams) to active (at least 1 active stream), and when the connection goes from active to idle.
This can be used to implement an idle connection timeout.
diff --git a/pkgs/http2/CHANGELOG.md b/pkgs/http2/CHANGELOG.md
index fad6f74..e4b49a5 100644
--- a/pkgs/http2/CHANGELOG.md
+++ b/pkgs/http2/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog
+## 0.1.4
+
+* Added an `onActiveStateChanged` callback to `Connection`, which is invoked when
+ the connection changes state from idle to active or from active to idle. This
+ can be used to implement an idle connection timeout.
+
## 0.1.3
* Fixed a bug where a closed window would not open correctly due to an increase
diff --git a/pkgs/http2/lib/src/connection.dart b/pkgs/http2/lib/src/connection.dart
index dad5aea..5ff58c5 100644
--- a/pkgs/http2/lib/src/connection.dart
+++ b/pkgs/http2/lib/src/connection.dart
@@ -72,6 +72,9 @@
/// Whether this connection is a client connection.
final bool isClientConnection;
+ /// Active state handler for this connection.
+ void Function(bool isActive) onActiveStateChanged;
+
/// The HPack context for this connection.
final HPackContext _hpackContext = new HPackContext();
@@ -188,11 +191,13 @@
if (isClientConnection) {
_streams = new StreamHandler.client(
_frameWriter, _incomingQueue, _outgoingQueue,
- _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
+ _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
+ _activeStateHandler);
} else {
_streams = new StreamHandler.server(
_frameWriter, _incomingQueue, _outgoingQueue,
- _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
+ _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
+ _activeStateHandler);
}
// NOTE: We're not waiting until initial settings have been exchanged
@@ -257,6 +262,12 @@
return _terminate(ErrorCode.NO_ERROR);
}
+ void _activeStateHandler(bool isActive) {
+ if (onActiveStateChanged != null) {
+ onActiveStateChanged(isActive);
+ }
+ }
+
/// Invokes the passed in closure and catches any exceptions.
void _catchProtocolErrors(void fn()) {
try {
diff --git a/pkgs/http2/lib/src/streams/stream_handler.dart b/pkgs/http2/lib/src/streams/stream_handler.dart
index c9ece46..ead8d07 100644
--- a/pkgs/http2/lib/src/streams/stream_handler.dart
+++ b/pkgs/http2/lib/src/streams/stream_handler.dart
@@ -149,28 +149,35 @@
bool get ranOutOfStreamIds => _ranOutOfStreamIds();
+ final void Function(bool isActive) _onActiveStateChanged;
+
StreamHandler._(this._frameWriter, this.incomingQueue, this.outgoingQueue,
this._peerSettings, this._localSettings,
- this.nextStreamId, this.lastRemoteStreamId);
+ this._onActiveStateChanged, this.nextStreamId,
+ this.lastRemoteStreamId);
- factory StreamHandler.client(FrameWriter writer,
- ConnectionMessageQueueIn incomingQueue,
- ConnectionMessageQueueOut outgoingQueue,
- ActiveSettings peerSettings,
- ActiveSettings localSettings) {
+ factory StreamHandler.client(
+ FrameWriter writer,
+ ConnectionMessageQueueIn incomingQueue,
+ ConnectionMessageQueueOut outgoingQueue,
+ ActiveSettings peerSettings,
+ ActiveSettings localSettings,
+ void Function(bool isActive) onActiveStateChanged) {
return new StreamHandler._(
writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
- 1, 0);
+ onActiveStateChanged, 1, 0);
}
- factory StreamHandler.server(FrameWriter writer,
- ConnectionMessageQueueIn incomingQueue,
- ConnectionMessageQueueOut outgoingQueue,
- ActiveSettings peerSettings,
- ActiveSettings localSettings) {
+ factory StreamHandler.server(
+ FrameWriter writer,
+ ConnectionMessageQueueIn incomingQueue,
+ ConnectionMessageQueueOut outgoingQueue,
+ ActiveSettings peerSettings,
+ ActiveSettings localSettings,
+ void Function(bool isActive) onActiveStateChanged) {
return new StreamHandler._(
writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
- 2, -1);
+ onActiveStateChanged, 2, -1);
}
void onTerminated(exception) {
@@ -298,6 +305,7 @@
var stream = new Http2StreamImpl(
streamQueueIn, streamQueueOut, _outgoingC, streamId, windowOutHandler,
this._canPush, this._push, this._terminateStream);
+ final wasIdle = _openStreams.isEmpty;
_openStreams[stream.id] = stream;
_setupOutgoingMessageHandling(stream);
@@ -309,6 +317,10 @@
_cleanupClosedStream(stream);
});
+ if (wasIdle) {
+ _onActiveStateChanged(true);
+ }
+
return stream;
}
@@ -707,6 +719,9 @@
if (stream.state != StreamState.Terminated) {
_changeState(stream, StreamState.Terminated);
}
+ if (_openStreams.isEmpty) {
+ _onActiveStateChanged(false);
+ }
onCheckForClose();
}
diff --git a/pkgs/http2/lib/transport.dart b/pkgs/http2/lib/transport.dart
index 7e29178..90ac383 100644
--- a/pkgs/http2/lib/transport.dart
+++ b/pkgs/http2/lib/transport.dart
@@ -144,6 +144,14 @@
/// Pings the other end.
Future ping();
+ /// Sets the active state callback.
+ ///
+ /// This callback is invoked with [true] when the number of active streams
+ /// goes from 0 to 1 (the connection goes from idle to active), and with
+ /// [false] when the number of active streams becomes 0 (the connection goes
+ /// from active to idle).
+ set onActiveStateChanged(void Function(bool isActive) callback);
+
/// Finish this connection.
///
/// No new streams will be accepted or can be created.
@@ -210,7 +218,7 @@
/// A sink for writing data and/or headers to the remote end.
StreamSink<StreamMessage> get outgoingMessages;
- /// Set the termination handler on this stream.
+ /// Sets the termination handler on this stream.
///
/// The handler will be called if the stream receives an RST_STREAM frame.
set onTerminated(void value(int));
diff --git a/pkgs/http2/pubspec.yaml b/pkgs/http2/pubspec.yaml
index 69dc9b9..3c529f6 100644
--- a/pkgs/http2/pubspec.yaml
+++ b/pkgs/http2/pubspec.yaml
@@ -1,5 +1,5 @@
name: http2
-version: 0.1.3
+version: 0.1.4
description: A HTTP/2 implementation in Dart.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/http2
diff --git a/pkgs/http2/test/transport_test.dart b/pkgs/http2/test/transport_test.dart
index 11fde21..670d2c3 100644
--- a/pkgs/http2/test/transport_test.dart
+++ b/pkgs/http2/test/transport_test.dart
@@ -207,7 +207,6 @@
transportTest('server-terminates-stream',
(ClientTransportConnection client,
ServerTransportConnection server) async {
-
Future serverFun() async {
await for (ServerTransportStream stream in server.incomingStreams) {
stream.terminate();
@@ -267,7 +266,6 @@
transportTest('server-terminates-stream-after-half-close',
(ClientTransportConnection client,
ServerTransportConnection server) async {
-
var readyForError = new Completer();
Future serverFun() async {
@@ -299,6 +297,68 @@
await Future.wait([serverFun(), clientFun()]);
});
+ transportTest('idle-handler',
+ (ClientTransportConnection client,
+ ServerTransportConnection server) async {
+ Future serverFun() async {
+ int activeCount = 0;
+ int idleCount = 0;
+ server.onActiveStateChanged = expectAsync1((active) {
+ if (active) {
+ activeCount++;
+ } else {
+ idleCount++;
+ }
+ }, count: 6);
+ await for (final stream in server.incomingStreams) {
+ stream.sendHeaders([]);
+ stream.incomingMessages.toList().then(
+ (_) => stream.outgoingMessages.close());
+ }
+ await server.finish();
+ expect(activeCount, 3);
+ expect(idleCount, 3);
+ }
+
+ Future clientFun() async {
+ int activeCount = 0;
+ int idleCount = 0;
+ client.onActiveStateChanged = expectAsync1((active) {
+ if (active) {
+ activeCount++;
+ } else {
+ idleCount++;
+ }
+ }, count: 6);
+ final streams = new List<ClientTransportStream>.generate(
+ 5, (_) => client.makeRequest([]));
+ await Future.wait(streams.map((s) => s.outgoingMessages.close()));
+ await Future.wait(streams.map((s) => s.incomingMessages.toList()));
+ // This extra await is needed to allow the idle handler to run before
+ // verifying the idleCount, because the stream cleanup runs
+ // asynchronously after the stream is closed.
+ await new Future.value();
+ expect(activeCount, 1);
+ expect(idleCount, 1);
+
+ var stream = client.makeRequest([]);
+ await stream.outgoingMessages.close();
+ await stream.incomingMessages.toList();
+ await new Future.value();
+
+ stream = client.makeRequest([]);
+ await stream.outgoingMessages.close();
+ await stream.incomingMessages.toList();
+ await new Future.value();
+
+ await client.finish();
+ expect(activeCount, 3);
+ expect(idleCount, 3);
+ }
+
+ await Future.wait([clientFun(), serverFun()]);
+ });
+
group('flow-control', () {
const int kChunkSize = 1024;
const int kNumberOfMessages = 1000;