Fix a StreamQueue bug (#44)

Previously, StreamQueueTransaction assumed that its request was the
oldest request in the queue at the time at which it was committed or
rejected. This assumption wasn't always correct, and this change
avoids making it.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index da57bb6..03fff02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.0.3
+
+* Fix a bug in `StreamQueue.startTransaction()` and related methods when
+  rejecting a transaction that isn't the oldest request in the queue.
+
 ## 2.0.2
 
 * Add support for Dart 2.0 library changes to class `Timer`. 
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 1318114..d9be69d 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -400,7 +400,7 @@
 
   // ------------------------------------------------------------------
   // Methods that may be called from the request implementations to
-  // control the even stream.
+  // control the event stream.
 
   /// Matches events with requests.
   ///
@@ -643,11 +643,13 @@
     for (var queue in _queues) {
       queue._cancel();
     }
-
-    assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
-        this);
-    _parent._requestQueue.removeFirst();
-    _parent._updateRequests();
+    // If this is the active request in the queue, mark it as finished.
+    var currentRequest = _parent._requestQueue.first;
+    if (currentRequest is _TransactionRequest &&
+        currentRequest.transaction == this) {
+      _parent._requestQueue.removeFirst();
+      _parent._updateRequests();
+    }
   }
 
   /// Throws a [StateError] if [accept] or [reject] has already been called.
@@ -975,6 +977,6 @@
       events[_eventsSent++].addTo(_controller);
     }
     if (isDone && !_controller.isClosed) _controller.close();
-    return false;
+    return transaction._committed || _transaction._rejected;
   }
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index bfc4171..3fd3bbb 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.0.2
+version: 2.0.3
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 2e4805b..8bd9d23 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -904,9 +904,27 @@
         expect(transaction.reject, throwsStateError);
         expect(() => transaction.commit(queue1), throwsStateError);
       });
+
+      test("before the transaction emits any events, does nothing", () async {
+        var controller = new StreamController();
+        var events = new StreamQueue(controller.stream);
+
+        // Queue a request before the transaction, but don't let it complete
+        // until we're done with the transaction.
+        expect(events.next, completion(equals(1)));
+        events.startTransaction().reject();
+        expect(events.next, completion(equals(2)));
+
+        await flushMicrotasks();
+        controller.add(1);
+        await flushMicrotasks();
+        controller.add(2);
+        await flushMicrotasks();
+        controller.close();
+      });
     });
 
-    group("when committed,", () {
+    group("when committed", () {
       test("further original requests use the committed state", () async {
         expect(await queue1.next, 2);
         await flushMicrotasks();
@@ -963,6 +981,25 @@
         expect(transaction.reject, throwsStateError);
         expect(() => transaction.commit(queue1), throwsStateError);
       });
+
+      test("before the transaction emits any events, does nothing", () async {
+        var controller = new StreamController();
+        var events = new StreamQueue(controller.stream);
+
+        // Queue a request before the transaction, but don't let it complete
+        // until we're done with the transaction.
+        expect(events.next, completion(equals(1)));
+        var transaction = events.startTransaction();
+        transaction.commit(transaction.newQueue());
+        expect(events.next, completion(equals(2)));
+
+        await flushMicrotasks();
+        controller.add(1);
+        await flushMicrotasks();
+        controller.add(2);
+        await flushMicrotasks();
+        controller.close();
+      });
     });
   });