Add StreamQueue.eventsDispatched. (#19)

This is useful for advanced transaction operations, such as "one of the
following consumers accepts the queue". We want to reliably accept the
same consumer every time, and the best way to do that is to accept the
one that consumed the most events.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 43fcf54..c49a4bd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@
 * Add `StreamQueue.cancelable()`, which allows users to easily make a
   `CancelableOperation` that can be canceled without affecting the queue.
 
+* Add `StreamQueue.eventsDispatched` which counts the number of events that have
+  been dispatched by a given queue.
+
 * Add a `subscriptionTransformer()` function to create `StreamTransformer`s that
   modify the behavior of subscriptions to a stream.
 
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index f770e16..7b73eac 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -89,6 +89,15 @@
   /// Closing operations are [cancel] and [rest].
   bool _isClosed = false;
 
+  /// The number of events dispatched by this queue.
+  ///
+  /// This counts error events. It doesn't count done events, or events
+  /// dispatched to a stream returned by [rest].
+  int get eventsDispatched => _eventsReceived - _eventQueue.length;
+
+  /// The number of events received by this queue.
+  var _eventsReceived = 0;
+
   /// Queue of events not used by a request yet.
   final QueueList<Result> _eventQueue = new QueueList();
 
@@ -420,6 +429,7 @@
   /// Called when the event source adds a new data or error event.
   /// Always calls [_updateRequests] after adding.
   void _addResult(Result result) {
+    _eventsReceived++;
     _eventQueue.add(result);
     _updateRequests();
   }
@@ -540,7 +550,7 @@
   final StreamSplitter<T> _splitter;
 
   /// Queues created using [newQueue].
-  final _queues = new Set<_TransactionStreamQueue>();
+  final _queues = new Set<StreamQueue>();
 
   /// Whether [commit] has been called.
   var _committed = false;
@@ -557,7 +567,7 @@
   /// [StreamQueue.startTransaction] was called. Its position can be committed
   /// to the parent queue using [commit].
   StreamQueue<T> newQueue() {
-    var queue = new _TransactionStreamQueue(_splitter.split());
+    var queue = new StreamQueue(_splitter.split());
     _queues.add(queue);
     return queue;
   }
@@ -582,9 +592,7 @@
 
     // Remove all events from the parent queue that were consumed by the
     // child queue.
-    var eventsConsumed = (queue as _TransactionStreamQueue)._eventsReceived -
-        queue._eventQueue.length;
-    for (var j = 0; j < eventsConsumed; j++) {
+    for (var j = 0; j < queue.eventsDispatched; j++) {
       _parent._eventQueue.removeFirst();
     }
 
@@ -628,25 +636,6 @@
   }
 }
 
-/// A [StreamQueue] that belongs to a [StreamQueueTransaction].
-class _TransactionStreamQueue<T> extends _StreamQueue<T> {
-  /// The total number of events received by this queue, including events that
-  /// haven't yet been consumed by requests.
-  ///
-  /// This is used to fast-forward the parent queue if this transaction is
-  /// accepted.
-  var _eventsReceived = 0;
-
-  _TransactionStreamQueue(Stream<T> sourceStream) : super(sourceStream);
-
-  /// Modifies [StreamQueue._addResult] to count the total number of events that
-  /// have been passed to this transaction.
-  void _addResult(Result result) {
-    _eventsReceived++;
-    super._addResult(result);
-  }
-}
-
 /// Request object that receives events when they arrive, until fulfilled.
 ///
 /// Each request that cannot be fulfilled immediately is represented by
diff --git a/pubspec.yaml b/pubspec.yaml
index 20ec733..ec62ff2 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.12.0-dev
+version: 1.12.0
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 02a7733..0aebb17 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -42,6 +42,47 @@
     });
   });
 
+  group("eventsDispatched", () {
+    test("increments after a next future completes", () async {
+      var events = new StreamQueue<int>(createStream());
+
+      expect(events.eventsDispatched, equals(0));
+      await flushMicrotasks();
+      expect(events.eventsDispatched, equals(0));
+
+      var next = events.next;
+      expect(events.eventsDispatched, equals(0));
+
+      await next;
+      expect(events.eventsDispatched, equals(1));
+
+      await events.next;
+      expect(events.eventsDispatched, equals(2));
+    });
+
+    test("increments multiple times for multi-value requests", () async {
+      var events = new StreamQueue<int>(createStream());
+      await events.take(3);
+      expect(events.eventsDispatched, equals(3));
+    });
+
+    test("increments multiple times for an accepted transaction", () async {
+      var events = new StreamQueue<int>(createStream());
+      await events.withTransaction((queue) async {
+        await queue.next;
+        await queue.next;
+        return true;
+      });
+      expect(events.eventsDispatched, equals(2));
+    });
+
+    test("doesn't increment for rest requests", () async {
+      var events = new StreamQueue<int>(createStream());
+      await events.rest.toList();
+      expect(events.eventsDispatched, equals(0));
+    });
+  });
+
   group("next operation", () {
     test("simple sequence of requests", () async {
       var events = new StreamQueue<int>(createStream());