Add StreamQueue.withTransaction().
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 051f8b2..caedd8b 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -228,9 +228,11 @@
   ///
   /// Until the transaction finishes, this queue won't emit any events.
   ///
+  /// See also [withTransaction] and [cancelable].
+  ///
   /// ```dart
   /// /// Consumes all empty lines from the beginning of [lines].
-  /// Future consumeEmptyLines(StreamQueue<String> lines) {
+  /// Future consumeEmptyLines(StreamQueue<String> lines) async {
   ///   while (await lines.hasNext) {
   ///     var transaction = lines.startTransaction();
   ///     var queue = transaction.newQueue();
@@ -238,7 +240,7 @@
   ///       transaction.reject();
   ///       return;
   ///     } else {
-  ///       transaction.accept(queue);
+  ///       transaction.commit(queue);
   ///     }
   ///   }
   /// }
@@ -251,6 +253,48 @@
     return request.transaction;
   }
 
+  /// Passes a copy of this queue to [callback], and updates this queue to match
+  /// the copy's position if [callback] returns `true`.
+  ///
+  /// This queue won't emit any events until [callback] returns. If it returns
+  /// `false`, this queue continues as though [withTransaction] hadn't been
+  /// called. If it throws an error, this updates this queue to match the copy's
+  /// position and throws the error from the returned `Future`.
+  ///
+  /// Returns the same value as [callback].
+  ///
+  /// See also [startTransaction] and [cancelable].
+  ///
+  /// ```dart
+  /// /// Consumes all empty lines from the beginning of [lines].
+  /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+  ///   while (await lines.hasNext) {
+  ///     // Consume a line if it's empty, otherwise return.
+  ///     if (!await lines.withTransaction(
+  ///         (queue) async => (await queue.next).isEmpty)) {
+  ///       return;
+  ///     }
+  ///   }
+  /// }
+  /// ```
+  Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) {
+    var transaction = startTransaction();
+
+    /// Avoid async/await to ensure that [startTransaction] is called
+    /// synchronously and so ends up in the right place in the request queue.
+    var queue = transaction.newQueue();
+    return callback(queue).then((result) {
+      if (result) {
+        transaction.commit(queue);
+      } else {
+        transaction.reject();
+      }
+    }, onError: (error) {
+      transaction.commit(queue);
+      throw error;
+    });
+  }
+
   /// Cancels the underlying event source.
   ///
   /// If [immediate] is `false` (the default), the cancel operation waits until
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index d5ab74b..7ab82be 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -800,6 +800,53 @@
     });
   });
 
+  group("withTransaction operation", () {
+    StreamQueue<int> events;
+    setUp(() async {
+      events = new StreamQueue(createStream());
+      expect(await events.next, 1);
+    });
+
+    test("passes a copy of the parent queue", () async {
+      await events.withTransaction(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        expect(await queue.next, 3);
+        expect(await queue.next, 4);
+        expect(await queue.hasNext, isFalse);
+        return true;
+      }));
+    });
+
+    test("the parent queue continues from the child position if it returns "
+        "true", () async {
+      await events.withTransaction(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        return true;
+      }));
+
+      expect(await events.next, 3);
+    });
+
+    test("the parent queue continues from its original position if it returns "
+        "false", () async {
+      await events.withTransaction(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        return false;
+      }));
+
+      expect(await events.next, 2);
+    });
+
+    test("the parent queue continues from the child position if it throws", () {
+      expect(events.withTransaction(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        throw "oh no";
+      })), throwsA("oh no"));
+
+      expect(events.next, completion(3));
+    });
+  });
+
   test("all combinations sequential skip/next/take operations", () async {
     // Takes all combinations of two of next, skip and take, then ends with
     // doing rest. Each of the first rounds do 10 events of each type,