Add StreamQueue.withTransaction().
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 051f8b2..caedd8b 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -228,9 +228,11 @@
///
/// Until the transaction finishes, this queue won't emit any events.
///
+ /// See also [withTransaction] and [cancelable].
+ ///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
- /// Future consumeEmptyLines(StreamQueue<String> lines) {
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// var transaction = lines.startTransaction();
/// var queue = transaction.newQueue();
@@ -238,7 +240,7 @@
/// transaction.reject();
/// return;
/// } else {
- /// transaction.accept(queue);
+ /// transaction.commit(queue);
/// }
/// }
/// }
@@ -251,6 +253,48 @@
return request.transaction;
}
+ /// Passes a copy of this queue to [callback], and updates this queue to match
+ /// the copy's position if [callback] returns `true`.
+ ///
+ /// This queue won't emit any events until [callback] returns. If it returns
+ /// `false`, this queue continues as though [withTransaction] hadn't been
+ /// called. If it throws an error, this updates this queue to match the copy's
+ /// position and throws the error from the returned `Future`.
+ ///
+ /// Returns the same value as [callback].
+ ///
+ /// See also [startTransaction] and [cancelable].
+ ///
+ /// ```dart
+ /// /// Consumes all empty lines from the beginning of [lines].
+ /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+ /// while (await lines.hasNext) {
+ /// // Consume a line if it's empty, otherwise return.
+ /// if (!await lines.withTransaction(
+ /// (queue) async => (await queue.next).isEmpty)) {
+ /// return;
+ /// }
+ /// }
+ /// }
+ /// ```
+ Future<bool> withTransaction(Future<bool> callback(StreamQueue<T> queue)) {
+ var transaction = startTransaction();
+
+ /// Avoid async/await to ensure that [startTransaction] is called
+ /// synchronously and so ends up in the right place in the request queue.
+ var queue = transaction.newQueue();
+ return callback(queue).then((result) {
+ if (result) {
+ transaction.commit(queue);
+ } else {
+ transaction.reject();
+ }
+ }, onError: (error) {
+ transaction.commit(queue);
+ throw error;
+ });
+ }
+
/// Cancels the underlying event source.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index d5ab74b..7ab82be 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -800,6 +800,53 @@
});
});
+ group("withTransaction operation", () {
+ StreamQueue<int> events;
+ setUp(() async {
+ events = new StreamQueue(createStream());
+ expect(await events.next, 1);
+ });
+
+ test("passes a copy of the parent queue", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ expect(await queue.next, 3);
+ expect(await queue.next, 4);
+ expect(await queue.hasNext, isFalse);
+ return true;
+ }));
+ });
+
+ test("the parent queue continues from the child position if it returns "
+ "true", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return true;
+ }));
+
+ expect(await events.next, 3);
+ });
+
+ test("the parent queue continues from its original position if it returns "
+ "false", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return false;
+ }));
+
+ expect(await events.next, 2);
+ });
+
+ test("the parent queue continues from the child position if it throws", () {
+ expect(events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ throw "oh no";
+ })), throwsA("oh no"));
+
+ expect(events.next, completion(3));
+ });
+ });
+
test("all combinations sequential skip/next/take operations", () async {
// Takes all combinations of two of next, skip and take, then ends with
// doing rest. Each of the first rounds do 10 events of each type,