Add StreamQueue.cancelable().
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index caedd8b..f770e16 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -7,6 +7,7 @@
 
 import 'package:collection/collection.dart';
 
+import "cancelable_operation.dart";
 import "result.dart";
 import "subscription_stream.dart";
 import "stream_completer.dart";
@@ -295,6 +296,40 @@
     });
   }
 
+  /// Passes a copy of this queue to [callback], and updates this queue to match
+  /// the copy's position once [callback] completes.
+  ///
+  /// If the returned [CancelableOperation] is canceled, this queue instead
+  /// continues as though [cancelable] hadn't been called. Otherwise, it emits
+  /// the same value or error as [callback].
+  ///
+  /// See also [startTransaction] and [withTransaction].
+  ///
+  /// ```dart
+  /// final _stdinQueue = new StreamQueue(stdin);
+  ///
+  /// /// Returns an operation that completes when the user sends a line to
+  /// /// standard input.
+  /// ///
+  /// /// If the operation is canceled, stops waiting for user input.
+  /// CancelableOperation<String> nextStdinLine() =>
+  ///     _stdinQueue.cancelable((queue) => queue.next);
+  /// ```
+  CancelableOperation/*<S>*/ cancelable/*<S>*/(
+      Future/*<S>*/ callback(StreamQueue<T> queue)) {
+    var transaction = startTransaction();
+    var completer = new CancelableCompleter/*<S>*/(onCancel: () {
+      transaction.reject();
+    });
+
+    var queue = transaction.newQueue();
+    completer.complete(callback(queue).whenComplete(() {
+      if (!completer.isCanceled) transaction.commit(queue);
+    }));
+
+    return completer.operation;
+  }
+
   /// Cancels the underlying event source.
   ///
   /// If [immediate] is `false` (the default), the cancel operation waits until
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 7ab82be..02a7733 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -847,6 +847,59 @@
     });
   });
 
+  group("cancelable operation", () {
+    StreamQueue<int> events;
+    setUp(() async {
+      events = new StreamQueue(createStream());
+      expect(await events.next, 1);
+    });
+
+    test("passes a copy of the parent queue", () async {
+      await events.cancelable(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        expect(await queue.next, 3);
+        expect(await queue.next, 4);
+        expect(await queue.hasNext, isFalse);
+      })).value;
+    });
+
+    test("the parent queue continues from the child position by default",
+        () async {
+      await events.cancelable(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+      })).value;
+
+      expect(await events.next, 3);
+    });
+
+    test("the parent queue continues from the child position if an error is "
+        "thrown", () async {
+      expect(events.cancelable(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        throw "oh no";
+      })).value, throwsA("oh no"));
+
+      expect(events.next, completion(3));
+    });
+
+    test("the parent queue continues from the original position if canceled",
+        () async {
+      var operation = events.cancelable(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+      }));
+      operation.cancel();
+
+      expect(await events.next, 2);
+    });
+
+    test("forwards the value from the callback", () async {
+      expect(await events.cancelable(expectAsync1((queue) async {
+        expect(await queue.next, 2);
+        return "value";
+      })).value, "value");
+    });
+  });
+
   test("all combinations sequential skip/next/take operations", () async {
     // Takes all combinations of two of next, skip and take, then ends with
     // doing rest. Each of the first rounds do 10 events of each type,