Add onActiveStateChanged callback to Connection. (dart-lang/http2#13)

The callback is invoked when the connection goes from idle (0 active streams) to active (at least 1 active stream), and when the connection goes from active to idle.

This can be used to implement an idle connection timeout.
diff --git a/pkgs/http2/CHANGELOG.md b/pkgs/http2/CHANGELOG.md
index fad6f74..e4b49a5 100644
--- a/pkgs/http2/CHANGELOG.md
+++ b/pkgs/http2/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+## 0.1.4
+
+* Added an `onActiveStateChanged` callback to `Connection`, which is invoked when
+  the connection changes state from idle to active or from active to idle. This
+  can be used to implement an idle connection timeout.
+
 ## 0.1.3
 
 * Fixed a bug where a closed window would not open correctly due to an increase
diff --git a/pkgs/http2/lib/src/connection.dart b/pkgs/http2/lib/src/connection.dart
index dad5aea..5ff58c5 100644
--- a/pkgs/http2/lib/src/connection.dart
+++ b/pkgs/http2/lib/src/connection.dart
@@ -72,6 +72,9 @@
   /// Whether this connection is a client connection.
   final bool isClientConnection;
 
+  /// Invoked with `true` when this connection turns active, `false` when idle.
+  void Function(bool isActive) onActiveStateChanged;
+
   /// The HPack context for this connection.
   final HPackContext _hpackContext = new HPackContext();
 
@@ -188,11 +191,13 @@
     if (isClientConnection) {
       _streams = new StreamHandler.client(
           _frameWriter, _incomingQueue, _outgoingQueue,
-          _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
+          _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
+          _activeStateHandler);
     } else {
       _streams = new StreamHandler.server(
           _frameWriter, _incomingQueue, _outgoingQueue,
-          _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
+          _settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
+          _activeStateHandler);
     }
 
     // NOTE: We're not waiting until initial settings have been exchanged
@@ -257,6 +262,12 @@
     return _terminate(ErrorCode.NO_ERROR);
   }
 
+  /// Relays an idle/active transition to [onActiveStateChanged], if set.
+  void _activeStateHandler(bool isActive) {
+    final handler = onActiveStateChanged;
+    if (handler != null) handler(isActive);
+  }
+
   /// Invokes the passed in closure and catches any exceptions.
   void _catchProtocolErrors(void fn()) {
     try {
diff --git a/pkgs/http2/lib/src/streams/stream_handler.dart b/pkgs/http2/lib/src/streams/stream_handler.dart
index c9ece46..ead8d07 100644
--- a/pkgs/http2/lib/src/streams/stream_handler.dart
+++ b/pkgs/http2/lib/src/streams/stream_handler.dart
@@ -149,28 +149,35 @@
 
   bool get ranOutOfStreamIds => _ranOutOfStreamIds();
 
+  final void Function(bool isActive) _onActiveStateChanged;
+
   StreamHandler._(this._frameWriter, this.incomingQueue, this.outgoingQueue,
                   this._peerSettings, this._localSettings,
-                  this.nextStreamId, this.lastRemoteStreamId);
+                  this._onActiveStateChanged, this.nextStreamId,
+                  this.lastRemoteStreamId);
 
-  factory StreamHandler.client(FrameWriter writer,
-                               ConnectionMessageQueueIn incomingQueue,
-                               ConnectionMessageQueueOut outgoingQueue,
-                               ActiveSettings peerSettings,
-                               ActiveSettings localSettings) {
+  factory StreamHandler.client(
+      FrameWriter writer,
+      ConnectionMessageQueueIn incomingQueue,
+      ConnectionMessageQueueOut outgoingQueue,
+      ActiveSettings peerSettings,
+      ActiveSettings localSettings,
+      void Function(bool isActive) onActiveStateChanged) {
     return new StreamHandler._(
         writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
-        1, 0);
+        onActiveStateChanged, 1, 0);
   }
 
-  factory StreamHandler.server(FrameWriter writer,
-                               ConnectionMessageQueueIn incomingQueue,
-                               ConnectionMessageQueueOut outgoingQueue,
-                               ActiveSettings peerSettings,
-                               ActiveSettings localSettings) {
+  factory StreamHandler.server(
+      FrameWriter writer,
+      ConnectionMessageQueueIn incomingQueue,
+      ConnectionMessageQueueOut outgoingQueue,
+      ActiveSettings peerSettings,
+      ActiveSettings localSettings,
+      void Function(bool isActive) onActiveStateChanged) {
     return new StreamHandler._(
         writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
-        2, -1);
+        onActiveStateChanged, 2, -1);
   }
 
   void onTerminated(exception) {
@@ -298,6 +305,7 @@
     var stream = new Http2StreamImpl(
         streamQueueIn, streamQueueOut, _outgoingC, streamId, windowOutHandler,
         this._canPush, this._push, this._terminateStream);
+    final wasIdle = _openStreams.isEmpty;
     _openStreams[stream.id] = stream;
 
     _setupOutgoingMessageHandling(stream);
@@ -309,6 +317,10 @@
       _cleanupClosedStream(stream);
     });
 
+    if (wasIdle) {
+      _onActiveStateChanged(true);
+    }
+
     return stream;
   }
 
@@ -707,6 +719,9 @@
     if (stream.state != StreamState.Terminated) {
       _changeState(stream, StreamState.Terminated);
     }
+    if (_openStreams.isEmpty) {
+      _onActiveStateChanged(false);
+    }
     onCheckForClose();
   }
 
diff --git a/pkgs/http2/lib/transport.dart b/pkgs/http2/lib/transport.dart
index 7e29178..90ac383 100644
--- a/pkgs/http2/lib/transport.dart
+++ b/pkgs/http2/lib/transport.dart
@@ -144,6 +144,14 @@
   /// Pings the other end.
   Future ping();
 
+  /// Sets the active state callback.
+  ///
+  /// This callback is invoked with `true` when the number of active streams
+  /// goes from 0 to 1 (the connection goes from idle to active), and with
+  /// `false` when the number of active streams becomes 0 (the connection goes
+  /// from active to idle).
+  set onActiveStateChanged(void Function(bool isActive) callback);
+
   /// Finish this connection.
   ///
   /// No new streams will be accepted or can be created.
@@ -210,7 +218,7 @@
   /// A sink for writing data and/or headers to the remote end.
   StreamSink<StreamMessage> get outgoingMessages;
 
-  /// Set the termination handler on this stream.
+  /// Sets the termination handler on this stream.
   ///
   /// The handler will be called if the stream receives an RST_STREAM frame.
   set onTerminated(void value(int));
diff --git a/pkgs/http2/pubspec.yaml b/pkgs/http2/pubspec.yaml
index 69dc9b9..3c529f6 100644
--- a/pkgs/http2/pubspec.yaml
+++ b/pkgs/http2/pubspec.yaml
@@ -1,5 +1,5 @@
 name: http2
-version: 0.1.3
+version: 0.1.4
 description: A HTTP/2 implementation in Dart.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/http2
diff --git a/pkgs/http2/test/transport_test.dart b/pkgs/http2/test/transport_test.dart
index 11fde21..670d2c3 100644
--- a/pkgs/http2/test/transport_test.dart
+++ b/pkgs/http2/test/transport_test.dart
@@ -207,7 +207,6 @@
     transportTest('server-terminates-stream',
         (ClientTransportConnection client,
          ServerTransportConnection server) async {
-
       Future serverFun() async {
         await for (ServerTransportStream stream in server.incomingStreams) {
           stream.terminate();
@@ -267,7 +266,6 @@
     transportTest('server-terminates-stream-after-half-close',
         (ClientTransportConnection client,
          ServerTransportConnection server) async {
-
           var readyForError = new Completer();
 
           Future serverFun() async {
@@ -299,6 +297,68 @@
           await Future.wait([serverFun(), clientFun()]);
         });
 
+    // Checks that onActiveStateChanged fires once per idle <-> active
+    // transition on each endpoint, rather than once per stream.
+    transportTest('idle-handler',
+        (ClientTransportConnection client,
+         ServerTransportConnection server) async {
+      Future serverFun() async {
+        int becameActive = 0;
+        int becameIdle = 0;
+        // The client produces three separate bursts of activity below.
+        server.onActiveStateChanged = expectAsync1((isActive) {
+          isActive ? becameActive++ : becameIdle++;
+        }, count: 6);
+        await for (final incoming in server.incomingStreams) {
+          // Answer with empty headers; close our side once the request ends.
+          incoming.sendHeaders([]);
+          incoming.incomingMessages.toList().then(
+                  (_) => incoming.outgoingMessages.close());
+        }
+        await server.finish();
+        expect(becameActive, 3);
+        expect(becameIdle, 3);
+      }
+
+      Future clientFun() async {
+        int becameActive = 0;
+        int becameIdle = 0;
+        client.onActiveStateChanged = expectAsync1((isActive) {
+          isActive ? becameActive++ : becameIdle++;
+        }, count: 6);
+
+        // Burst 1: five concurrent requests count as a single activation.
+        final burst = new List<ClientTransportStream>.generate(
+            5, (_) => client.makeRequest([]));
+        await Future.wait(burst.map((s) => s.outgoingMessages.close()));
+        await Future.wait(burst.map((s) => s.incomingMessages.toList()));
+        // Stream cleanup runs asynchronously after a stream closes, so yield
+        // once to let the idle notification fire before checking counters.
+        await new Future.value();
+        expect(becameActive, 1);
+        expect(becameIdle, 1);
+
+        // Burst 2: a single request.
+        var request = client.makeRequest([]);
+        await request.outgoingMessages.close();
+        await request.incomingMessages.toList();
+        await new Future.value();
+
+        // Burst 3: another single request.
+        request = client.makeRequest([]);
+        await request.outgoingMessages.close();
+        await request.incomingMessages.toList();
+        await new Future.value();
+
+        await client.finish();
+        expect(becameActive, 3);
+        expect(becameIdle, 3);
+      }
+
+      // Drive both endpoints concurrently until each has finished.
+      await Future.wait([clientFun(), serverFun()]);
+    });
+
     group('flow-control', () {
       const int kChunkSize = 1024;
       const int kNumberOfMessages = 1000;