| // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import "dart:async"; |
| |
| import "package:async/async.dart"; |
| import "package:test/test.dart"; |
| |
| import "utils.dart"; |
| |
| main() { |
// Checks how the queue manages its subscription to the source stream.
group("source stream", () {
  test("is listened to on first request, paused between requests", () async {
    var source = new StreamController<int>();
    var queue = new StreamQueue<int>(source.stream);
    await flushMicrotasks();
    // Creating the queue must not subscribe by itself.
    expect(source.hasListener, isFalse);

    // The first request subscribes and leaves the source running.
    var pending = queue.next;
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isFalse);

    source.add(1);

    // With no request outstanding the subscription stays but is paused.
    expect(await pending, 1);
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isTrue);

    // A new request resumes the source.
    pending = queue.next;
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isFalse);

    source.add(2);

    expect(await pending, 2);
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isTrue);

    // Cancelling drops the subscription; checked without awaiting the cancel.
    queue.cancel();
    expect(source.hasListener, isFalse);
  });
});
| |
// Checks which requests advance the `eventsDispatched` counter.
group("eventsDispatched", () {
  test("increments after a next future completes", () async {
    var queue = new StreamQueue<int>(createStream());

    expect(queue.eventsDispatched, 0);
    await flushMicrotasks();
    expect(queue.eventsDispatched, 0);

    // Issuing a request is not enough; the counter moves on completion.
    var firstEvent = queue.next;
    expect(queue.eventsDispatched, 0);

    await firstEvent;
    expect(queue.eventsDispatched, 1);

    await queue.next;
    expect(queue.eventsDispatched, 2);
  });

  test("increments multiple times for multi-value requests", () async {
    var queue = new StreamQueue<int>(createStream());
    await queue.take(3);
    expect(queue.eventsDispatched, 3);
  });

  test("increments multiple times for an accepted transaction", () async {
    var queue = new StreamQueue<int>(createStream());
    // Returning true accepts the transaction, so both events count.
    await queue.withTransaction((copy) async {
      await copy.next;
      await copy.next;
      return true;
    });
    expect(queue.eventsDispatched, 2);
  });

  test("doesn't increment for rest requests", () async {
    var queue = new StreamQueue<int>(createStream());
    await queue.rest.toList();
    expect(queue.eventsDispatched, 0);
  });
});
| |
// Exercises `lookAhead`, which reports upcoming events without consuming them.
group("lookAhead operation", () {
  test("as simple list of events", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.lookAhead(4), [1, 2, 3, 4]);
    expect(await queue.next, 1);
    expect(await queue.lookAhead(2), [2, 3]);
    expect(await queue.take(2), [2, 3]);
    expect(await queue.next, 4);
    await queue.cancel();
  });

  test("of 0 events", () async {
    var queue = new StreamQueue<int>(createStream());
    // An empty look-ahead is queued in front of every event ...
    for (var value in [1, 2, 3, 4]) {
      expect(queue.lookAhead(0), completion([]));
      expect(queue.next, completion(value));
    }
    // ... and again once nothing is left in the stream.
    expect(queue.lookAhead(0), completion([]));
    expect(queue.lookAhead(5), completion([]));
    expect(queue.next, throwsStateError);
    await queue.cancel();
  });

  test("with bad arguments throws", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(() => queue.lookAhead(-1), throwsArgumentError);
    expect(await queue.next, 1); // Did not consume event.
    expect(() => queue.lookAhead(-1), throwsArgumentError);
    expect(await queue.next, 2); // Did not consume event.
    await queue.cancel();
  });

  test("of too many arguments", () async {
    var queue = new StreamQueue<int>(createStream());
    // Asking for more than the stream holds yields only what exists.
    expect(await queue.lookAhead(6), [1, 2, 3, 4]);
    await queue.cancel();
  });

  test("too large later", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.lookAhead(6), [3, 4]);
    await queue.cancel();
  });

  test("error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    // The look-ahead reports the error but leaves it for the next request.
    expect(queue.lookAhead(4), throwsA("To err is divine!"));
    expect(queue.take(4), throwsA("To err is divine!"));
    expect(await queue.next, 4);
    await queue.cancel();
  });
});
| |
// Exercises `next`, which consumes one event per request.
group("next operation", () {
  test("simple sequence of requests", () async {
    var queue = new StreamQueue<int>(createStream());
    for (var expected in [1, 2, 3, 4]) {
      expect(await queue.next, expected);
    }
    // A request made after the last event fails.
    expect(queue.next, throwsStateError);
  });

  test("multiple requests at the same time", () async {
    var queue = new StreamQueue<int>(createStream());
    // All four requests are issued before any of them is awaited.
    var requests = [queue.next, queue.next, queue.next, queue.next];
    expect(await Future.wait(requests), [1, 2, 3, 4]);
    await queue.cancel();
  });

  test("sequence of requests with error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(queue.next, throwsA("To err is divine!"));
    // The error does not end the queue; the following event still arrives.
    expect(await queue.next, 4);
    await queue.cancel();
  });
});
| |
// Exercises `skip`, whose future completes with the number of events it could
// not skip because the stream ended first.
group("skip operation", () {
  test("of two elements in the middle of sequence", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.skip(2), 0);
    expect(await queue.next, 4);
    await queue.cancel();
  });

  test("with negative/bad arguments throws", () async {
    var queue = new StreamQueue<int>(createStream());
    // A rejected argument must leave the queue untouched.
    expect(() => queue.skip(-1), throwsArgumentError);
    expect(await queue.next, 1); // Did not consume event.
    expect(() => queue.skip(-1), throwsArgumentError);
    expect(await queue.next, 2); // Did not consume event.
    await queue.cancel();
  });

  test("of 0 elements works", () async {
    var queue = new StreamQueue<int>(createStream());
    // A zero-length skip is queued in front of every event ...
    for (var value in [1, 2, 3, 4]) {
      expect(queue.skip(0), completion(0));
      expect(queue.next, completion(value));
    }
    // ... and again once nothing is left in the stream.
    expect(queue.skip(0), completion(0));
    expect(queue.skip(5), completion(5));
    expect(queue.next, throwsStateError);
    await queue.cancel();
  });

  test("of too many events ends at stream start", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.skip(6), 2);
    await queue.cancel();
  });

  test("of too many events after some events", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.skip(6), 4);
    await queue.cancel();
  });

  test("of too many events ends at stream end", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.next, 3);
    expect(await queue.next, 4);
    expect(await queue.skip(2), 2);
    await queue.cancel();
  });

  test("of events with error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(queue.skip(4), throwsA("To err is divine!"));
    expect(await queue.next, 4);
    await queue.cancel();
  });

  test("of events with error, and skip again after", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(queue.skip(4), throwsA("To err is divine!"));
    expect(queue.skip(2), completion(1));
    await queue.cancel();
  });

  test("multiple skips at same time complete in order.", () async {
    var queue = new StreamQueue<int>(createStream());
    var skip1 = queue.skip(1);
    var skip2 = queue.skip(0);
    var skip3 = queue.skip(4);
    var skip4 = queue.skip(1);
    var completed = 0;
    // Builds a callback that checks both the value a skip completed with and
    // how many skips had completed before it.
    Func1Required<int> sequence(expectedValue, sequenceIndex) {
      return (value) {
        expect(value, expectedValue);
        expect(completed, sequenceIndex);
        completed++;
      };
    }

    await Future.wait([
      skip1.then(sequence(0, 0)),
      skip2.then(sequence(0, 1)),
      skip3.then(sequence(1, 2)),
      skip4.then(sequence(1, 3))
    ]);
    await queue.cancel();
  });
});
| |
// Exercises `take`, which consumes several events in a single request.
group("take operation", () {
  test("as simple take of events", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.take(2), [2, 3]);
    expect(await queue.next, 4);
    await queue.cancel();
  });

  test("of 0 events", () async {
    var queue = new StreamQueue<int>(createStream());
    // An empty take is queued in front of every event ...
    for (var value in [1, 2, 3, 4]) {
      expect(queue.take(0), completion([]));
      expect(queue.next, completion(value));
    }
    // ... and again once nothing is left in the stream.
    expect(queue.take(0), completion([]));
    expect(queue.take(5), completion([]));
    expect(queue.next, throwsStateError);
    await queue.cancel();
  });

  test("with bad arguments throws", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(() => queue.take(-1), throwsArgumentError);
    expect(await queue.next, 1); // Did not consume event.
    expect(() => queue.take(-1), throwsArgumentError);
    expect(await queue.next, 2); // Did not consume event.
    await queue.cancel();
  });

  test("of too many arguments", () async {
    var queue = new StreamQueue<int>(createStream());
    // Asking for more than the stream holds yields only what exists.
    expect(await queue.take(6), [1, 2, 3, 4]);
    await queue.cancel();
  });

  test("too large later", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.take(6), [3, 4]);
    await queue.cancel();
  });

  test("error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(queue.take(4), throwsA("To err is divine!"));
    expect(await queue.next, 4);
    await queue.cancel();
  });
});
| |
// Exercises `rest`, which hands the remaining events over as a stream.
group("rest operation", () {
  test("after single next", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.rest.toList(), [2, 3, 4]);
  });

  test("at start", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.rest.toList(), [1, 2, 3, 4]);
  });

  test("at end", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.next, 3);
    expect(await queue.next, 4);
    expect(await queue.rest.toList(), isEmpty);
  });

  test("after end", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(await queue.next, 3);
    expect(await queue.next, 4);
    expect(queue.next, throwsStateError);
    expect(await queue.rest.toList(), isEmpty);
  });

  test("after receiving done requested before", () async {
    var queue = new StreamQueue<int>(createStream());
    var first = queue.next;
    var second = queue.next;
    var third = queue.next;
    var remaining = queue.rest;
    // Flush repeatedly before anything is awaited or listened to.
    for (var flush = 0; flush < 10; flush++) {
      await flushMicrotasks();
    }
    expect(await first, 1);
    expect(await second, 2);
    expect(await third, 3);
    expect(await remaining.toList(), [4]);
  });

  test("with an error event error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(await queue.next, 1);
    var remaining = queue.rest;
    // Read the rest stream through a second queue to observe the error.
    var restQueue = new StreamQueue(remaining);
    expect(await restQueue.next, 2);
    expect(restQueue.next, throwsA("To err is divine!"));
    expect(await restQueue.next, 4);
  });

  test("closes the events, prevents other operations", () async {
    var queue = new StreamQueue<int>(createStream());
    var remaining = queue.rest;
    expect(() => queue.next, throwsStateError);
    expect(() => queue.skip(1), throwsStateError);
    expect(() => queue.take(1), throwsStateError);
    expect(() => queue.rest, throwsStateError);
    expect(() => queue.cancel(), throwsStateError);
    expect(remaining.toList(), completion([1, 2, 3, 4]));
  });

  test("forwards to underlying stream", () async {
    var cancelCompleter = new Completer();
    var source =
        new StreamController<int>(onCancel: () => cancelCompleter.future);
    var queue = new StreamQueue<int>(source.stream);
    expect(source.hasListener, isFalse);
    var pending = queue.next;
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isFalse);

    source.add(1);
    expect(await pending, 1);
    expect(source.isPaused, isTrue);

    // Listening to the rest stream resumes the source.
    var remaining = queue.rest;
    var subscription = remaining.listen(null);
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isFalse);

    dynamic lastEvent;
    subscription.onData((value) => lastEvent = value);

    source.add(2);

    await flushMicrotasks();
    expect(lastEvent, 2);
    expect(source.hasListener, isTrue);
    expect(source.isPaused, isFalse);

    // Pausing the rest subscription pauses the source as well.
    subscription.pause();
    expect(source.isPaused, isTrue);

    source.add(3);

    // The event is held back while paused and delivered after resuming.
    await flushMicrotasks();
    expect(lastEvent, 2);
    subscription.resume();

    await flushMicrotasks();
    expect(lastEvent, 3);

    // Cancelling yields the value of the source's own onCancel future.
    var cancelFuture = subscription.cancel();
    expect(source.hasListener, isFalse);
    cancelCompleter.complete(42);
    expect(cancelFuture, completion(42));
  });
});
| |
// Exercises `peek`, which reports the next event without consuming it.
group("peek operation", () {
  test("peeks one event", () async {
    var queue = new StreamQueue<int>(createStream());
    expect(await queue.peek, 1);
    expect(await queue.next, 1);
    expect(await queue.peek, 2);
    expect(await queue.take(2), [2, 3]);
    expect(await queue.peek, 4);
    expect(await queue.next, 4);
    // Throws at end.
    expect(queue.peek, throwsA(anything));
    await queue.cancel();
  });

  test("multiple requests at the same time", () async {
    var queue = new StreamQueue<int>(createStream());
    // Only the single `next` in the middle moves the queue forward.
    var requests = [queue.peek, queue.peek, queue.next, queue.peek, queue.peek];
    expect(await Future.wait(requests), [1, 1, 1, 2, 2]);
    await queue.cancel();
  });

  test("sequence of requests with error", () async {
    var queue = new StreamQueue<int>(createErrorStream());
    expect(await queue.next, 1);
    expect(await queue.next, 2);
    expect(queue.peek, throwsA("To err is divine!"));
    // Error stays in queue.
    expect(queue.peek, throwsA("To err is divine!"));
    expect(queue.next, throwsA("To err is divine!"));
    expect(await queue.next, 4);
    await queue.cancel();
  });
});
| |
// Exercises `cancel`, both in its default form and with `immediate: true`.
group("cancel operation", () {
  test("closes the events, prevents any other operation", () async {
    var events = new StreamQueue<int>(createStream());
    await events.cancel();
    expect(() => events.lookAhead(1), throwsStateError);
    expect(() => events.next, throwsStateError);
    expect(() => events.peek, throwsStateError);
    expect(() => events.skip(1), throwsStateError);
    expect(() => events.take(1), throwsStateError);
    expect(() => events.rest, throwsStateError);
    expect(() => events.cancel(), throwsStateError);
  });

  test("cancels underlying subscription when called before any event",
      () async {
    var cancelFuture = new Future.value(42);
    var controller = new StreamController<int>(onCancel: () => cancelFuture);
    var events = new StreamQueue<int>(controller.stream);
    // The queue's cancel future carries the source's onCancel result.
    expect(await events.cancel(), 42);
  });

  test("cancels underlying subscription, returns result", () async {
    var cancelFuture = new Future.value(42);
    var controller = new StreamController<int>(onCancel: () => cancelFuture);
    var events = new StreamQueue<int>(controller.stream);
    controller.add(1);
    expect(await events.next, 1);
    expect(await events.cancel(), 42);
  });

  group("with immediate: true", () {
    test("closes the events, prevents any other operation", () async {
      var events = new StreamQueue<int>(createStream());
      await events.cancel(immediate: true);
      // Covers the same operations as the non-immediate test, including
      // lookAhead and peek.
      expect(() => events.lookAhead(1), throwsStateError);
      expect(() => events.next, throwsStateError);
      expect(() => events.peek, throwsStateError);
      expect(() => events.skip(1), throwsStateError);
      expect(() => events.take(1), throwsStateError);
      expect(() => events.rest, throwsStateError);
      expect(() => events.cancel(), throwsStateError);
    });

    test("cancels the underlying subscription immediately", () async {
      var controller = new StreamController<int>();
      controller.add(1);

      var events = new StreamQueue<int>(controller.stream);
      expect(await events.next, 1);
      expect(controller.hasListener, isTrue);

      // The cancel future is deliberately not awaited: the listener has to be
      // gone as soon as the call returns.
      events.cancel(immediate: true);
      expect(controller.hasListener, isFalse);
    });

    test("cancels the underlying subscription when called before any event",
        () async {
      var cancelFuture = new Future.value(42);
      var controller =
          new StreamController<int>(onCancel: () => cancelFuture);

      var events = new StreamQueue<int>(controller.stream);
      expect(await events.cancel(immediate: true), 42);
    });

    test("closes pending requests", () async {
      var events = new StreamQueue<int>(createStream());
      expect(await events.next, 1);
      // Both requests are still pending when the queue is cancelled below.
      expect(events.next, throwsStateError);
      expect(events.hasNext, completion(isFalse));

      await events.cancel(immediate: true);
    });

    test("returns the result of closing the underlying subscription",
        () async {
      var controller =
          new StreamController<int>(onCancel: () => new Future.value(42));
      var events = new StreamQueue<int>(controller.stream);
      expect(await events.cancel(immediate: true), 42);
    });

    test("listens and then cancels a stream that hasn't been listened to yet",
        () async {
      var wasListened = false;
      var controller =
          new StreamController<int>(onListen: () => wasListened = true);
      var events = new StreamQueue<int>(controller.stream);
      expect(wasListened, isFalse);
      expect(controller.hasListener, isFalse);

      // An immediate cancel subscribes first so the source sees a cancel.
      await events.cancel(immediate: true);
      expect(wasListened, isTrue);
      expect(controller.hasListener, isFalse);
    });
  });
});
| |
// Exercises `hasNext` on its own and combined with the other operations.
group("hasNext operation", () {
  test("true at start", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.hasNext, isTrue);
  });

  test("true after start", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, isTrue);
  });

  // Previously named "true at end", although it asserts that hasNext is false
  // once all four events have been consumed.
  test("false at end", () async {
    var events = new StreamQueue<int>(createStream());
    for (int i = 1; i <= 4; i++) {
      expect(await events.next, i);
    }
    expect(await events.hasNext, isFalse);
  });

  test("true when enqueued", () async {
    var events = new StreamQueue<int>(createStream());
    var values = <int>[];
    for (int i = 1; i <= 3; i++) {
      events.next.then(values.add);
    }
    expect(values, isEmpty);
    // hasNext is queued behind the three requests, so they complete first.
    expect(await events.hasNext, isTrue);
    expect(values, [1, 2, 3]);
  });

  test("false when enqueued", () async {
    var events = new StreamQueue<int>(createStream());
    var values = <int>[];
    for (int i = 1; i <= 4; i++) {
      events.next.then(values.add);
    }
    expect(values, isEmpty);
    // The four queued requests use up every event before hasNext answers.
    expect(await events.hasNext, isFalse);
    expect(values, [1, 2, 3, 4]);
  });

  test("true when data event", () async {
    var controller = new StreamController<int>();
    var events = new StreamQueue<int>(controller.stream);

    bool hasNext;
    events.hasNext.then((result) {
      hasNext = result;
    });
    // Still unanswered until the source emits and microtasks are flushed.
    await flushMicrotasks();
    expect(hasNext, isNull);
    controller.add(42);
    expect(hasNext, isNull);
    await flushMicrotasks();
    expect(hasNext, isTrue);
  });

  test("true when error event", () async {
    var controller = new StreamController<int>();
    var events = new StreamQueue<int>(controller.stream);

    bool hasNext;
    events.hasNext.then((result) {
      hasNext = result;
    });
    await flushMicrotasks();
    expect(hasNext, isNull);
    controller.addError("BAD");
    expect(hasNext, isNull);
    await flushMicrotasks();
    // An error counts as a next event and is still there for `next`.
    expect(hasNext, isTrue);
    expect(events.next, throwsA("BAD"));
  });

  test("- hasNext after hasNext", () async {
    var events = new StreamQueue<int>(createStream());
    // Asking twice in a row gives the same answer and consumes nothing.
    expect(await events.hasNext, true);
    expect(await events.hasNext, true);
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.hasNext, true);
    expect(await events.next, 2);
    expect(await events.hasNext, true);
    expect(await events.hasNext, true);
    expect(await events.next, 3);
    expect(await events.hasNext, true);
    expect(await events.hasNext, true);
    expect(await events.next, 4);
    expect(await events.hasNext, false);
    expect(await events.hasNext, false);
  });

  test("- next after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.next, 2);
    expect(await events.next, 3);
  });

  test("- next after true, enqueued", () async {
    var events = new StreamQueue<int>(createStream());
    var responses = <Object>[];
    events.next.then(responses.add);
    events.hasNext.then(responses.add);
    events.next.then(responses.add);
    // Keep flushing until all three queued requests have responded.
    do {
      await flushMicrotasks();
    } while (responses.length < 3);
    expect(responses, [1, true, 2]);
  });

  test("- skip 0 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.skip(0), 0);
    expect(await events.next, 2);
  });

  test("- skip 1 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.skip(1), 0);
    expect(await events.next, 3);
  });

  test("- skip 2 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.skip(2), 0);
    expect(await events.next, 4);
  });

  test("- take 0 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.take(0), isEmpty);
    expect(await events.next, 2);
  });

  test("- take 1 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.take(1), [2]);
    expect(await events.next, 3);
  });

  test("- take 2 after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    expect(await events.take(2), [2, 3]);
    expect(await events.next, 4);
  });

  test("- rest after true", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.hasNext, true);
    var stream = events.rest;
    expect(await stream.toList(), [2, 3, 4]);
  });

  test("- rest after true, at last", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.next, 2);
    expect(await events.next, 3);
    expect(await events.hasNext, true);
    var stream = events.rest;
    expect(await stream.toList(), [4]);
  });

  test("- rest after false", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.next, 2);
    expect(await events.next, 3);
    expect(await events.next, 4);
    expect(await events.hasNext, false);
    var stream = events.rest;
    expect(await stream.toList(), isEmpty);
  });

  test("- cancel after true on data", () async {
    var events = new StreamQueue<int>(createStream());
    expect(await events.next, 1);
    expect(await events.next, 2);
    expect(await events.hasNext, true);
    expect(await events.cancel(), null);
  });

  test("- cancel after true on error", () async {
    var events = new StreamQueue<int>(createErrorStream());
    expect(await events.next, 1);
    expect(await events.next, 2);
    expect(await events.hasNext, true);
    expect(await events.cancel(), null);
  });
});
| |
// Exercises transactions created with `startTransaction`. Each test starts
// from a parent queue that has consumed one event, an open transaction on it,
// and two copies obtained from that transaction.
group("startTransaction operation produces a transaction that", () {
  StreamQueue<int> parent;
  StreamQueueTransaction<int> transaction;
  StreamQueue<int> copy1;
  StreamQueue<int> copy2;
  setUp(() async {
    parent = new StreamQueue(createStream());
    expect(await parent.next, 1);
    transaction = parent.startTransaction();
    copy1 = transaction.newQueue();
    copy2 = transaction.newQueue();
  });

  group("emits queues that", () {
    test("independently emit events", () async {
      expect(await copy1.next, 2);
      expect(await copy2.next, 2);
      expect(await copy2.next, 3);
      expect(await copy1.next, 3);
      expect(await copy1.next, 4);
      expect(await copy2.next, 4);
      expect(await copy1.hasNext, isFalse);
      expect(await copy2.hasNext, isFalse);
    });

    test("queue requests for events", () async {
      expect(copy1.next, completion(2));
      expect(copy2.next, completion(2));
      expect(copy2.next, completion(3));
      expect(copy1.next, completion(3));
      expect(copy1.next, completion(4));
      expect(copy2.next, completion(4));
      expect(copy1.hasNext, completion(isFalse));
      expect(copy2.hasNext, completion(isFalse));
    });

    test("independently emit errors", () async {
      // Rebuild the fixture on top of a stream that contains an error.
      parent = new StreamQueue(createErrorStream());
      expect(await parent.next, 1);
      transaction = parent.startTransaction();
      copy1 = transaction.newQueue();
      copy2 = transaction.newQueue();

      expect(copy1.next, completion(2));
      expect(copy2.next, completion(2));
      expect(copy2.next, throwsA("To err is divine!"));
      expect(copy1.next, throwsA("To err is divine!"));
      expect(copy1.next, completion(4));
      expect(copy2.next, completion(4));
      expect(copy1.hasNext, completion(isFalse));
      expect(copy2.hasNext, completion(isFalse));
    });
  });

  group("when rejected", () {
    test("further original requests use the previous state", () async {
      expect(await copy1.next, 2);
      expect(await copy2.next, 2);
      expect(await copy2.next, 3);

      await flushMicrotasks();
      transaction.reject();

      // The parent replays everything the copies had read.
      expect(await parent.next, 2);
      expect(await parent.next, 3);
      expect(await parent.next, 4);
      expect(await parent.hasNext, isFalse);
    });

    test("pending original requests use the previous state", () async {
      expect(await copy1.next, 2);
      expect(await copy2.next, 2);
      expect(await copy2.next, 3);
      // These parent requests are issued before the rejection below.
      expect(parent.next, completion(2));
      expect(parent.next, completion(3));
      expect(parent.next, completion(4));
      expect(parent.hasNext, completion(isFalse));

      await flushMicrotasks();
      transaction.reject();
    });

    test("further child requests act as though the stream was closed",
        () async {
      expect(await copy1.next, 2);
      transaction.reject();

      expect(await copy1.hasNext, isFalse);
      expect(copy1.next, throwsStateError);
    });

    test("pending child requests act as though the stream was closed",
        () async {
      expect(await copy1.next, 2);
      expect(copy1.hasNext, completion(isFalse));
      expect(copy1.next, throwsStateError);
      transaction.reject();
    });

    // Regression test.
    test("pending child rest requests emit no more events", () async {
      var source = new StreamController();
      var parent = new StreamQueue(source.stream);
      var transaction = parent.startTransaction();
      var copy = transaction.newQueue();

      // The listener must see exactly the three events added before the
      // rejection, and must never be told the stream is done.
      copy.rest.listen(expectAsync1((_) {}, count: 3),
          onDone: expectAsync0(() {}, count: 0));

      source.add(1);
      source.add(2);
      source.add(3);
      await flushMicrotasks();

      transaction.reject();
      await flushMicrotasks();

      // These must not reach the `copy.rest` listener registered above.
      source.add(4);
      source.add(5);
    });

    test("child requests' cancel() may still be called explicitly", () async {
      transaction.reject();
      await copy1.cancel();
    });

    test("calls to commit() or reject() fail", () async {
      transaction.reject();
      expect(transaction.reject, throwsStateError);
      expect(() => transaction.commit(copy1), throwsStateError);
    });

    test("before the transaction emits any events, does nothing", () async {
      var source = new StreamController();
      var parent = new StreamQueue(source.stream);

      // Queue a request before the transaction, but don't let it complete
      // until we're done with the transaction.
      expect(parent.next, completion(1));
      parent.startTransaction().reject();
      expect(parent.next, completion(2));

      await flushMicrotasks();
      source.add(1);
      await flushMicrotasks();
      source.add(2);
      await flushMicrotasks();
      source.close();
    });
  });

  group("when committed", () {
    test("further original requests use the committed state", () async {
      expect(await copy1.next, 2);
      await flushMicrotasks();
      transaction.commit(copy1);
      // The parent carries on from where the committed copy stopped.
      expect(await parent.next, 3);
    });

    test("pending original requests use the committed state", () async {
      expect(await copy1.next, 2);
      expect(parent.next, completion(3));
      await flushMicrotasks();
      transaction.commit(copy1);
    });

    test("further child requests act as though the stream was closed",
        () async {
      expect(await copy2.next, 2);
      transaction.commit(copy2);

      expect(await copy1.hasNext, isFalse);
      expect(copy1.next, throwsStateError);
    });

    test("pending child requests act as though the stream was closed",
        () async {
      expect(await copy2.next, 2);
      expect(copy1.hasNext, completion(isFalse));
      expect(copy1.next, throwsStateError);
      transaction.commit(copy2);
    });

    test("further requests act as though the stream was closed", () async {
      expect(await copy1.next, 2);
      transaction.commit(copy1);

      expect(await copy1.hasNext, isFalse);
      expect(copy1.next, throwsStateError);
    });

    test("cancel() may still be called explicitly", () async {
      expect(await copy1.next, 2);
      transaction.commit(copy1);
      await copy1.cancel();
    });

    test("throws if there are pending requests", () async {
      expect(await copy1.next, 2);
      // The hasNext request is still outstanding when commit is attempted.
      expect(copy1.hasNext, completion(isTrue));
      expect(() => transaction.commit(copy1), throwsStateError);
    });

    test("calls to commit() or reject() fail", () async {
      transaction.commit(copy1);
      expect(transaction.reject, throwsStateError);
      expect(() => transaction.commit(copy1), throwsStateError);
    });

    test("before the transaction emits any events, does nothing", () async {
      var source = new StreamController();
      var parent = new StreamQueue(source.stream);

      // Queue a request before the transaction, but don't let it complete
      // until we're done with the transaction.
      expect(parent.next, completion(1));
      var transaction = parent.startTransaction();
      transaction.commit(transaction.newQueue());
      expect(parent.next, completion(2));

      await flushMicrotasks();
      source.add(1);
      await flushMicrotasks();
      source.add(2);
      await flushMicrotasks();
      source.close();
    });
  });
});
| |
| // Tests for `withTransaction`. Every test starts from a queue over |
| // `createStream()` whose first event (1) has already been consumed in setUp. |
| group("withTransaction operation", () { |
|   StreamQueue<int> events; |
|   setUp(() async { |
|     events = new StreamQueue(createStream()); |
|     var first = await events.next; |
|     expect(first, equals(1)); |
|   }); |
| |
|   test("passes a copy of the parent queue", () async { |
|     await events.withTransaction(expectAsync1((child) async { |
|       // The child sees every event the parent has not consumed yet. |
|       for (var expected in [2, 3, 4]) { |
|         expect(await child.next, equals(expected)); |
|       } |
|       var more = await child.hasNext; |
|       expect(more, isFalse); |
|       return true; |
|     })); |
|   }); |
| |
|   test( |
|       "the parent queue continues from the child position if it returns " |
|       "true", () async { |
|     await events.withTransaction(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|       return true; |
|     })); |
| |
|     var following = await events.next; |
|     expect(following, equals(3)); |
|   }); |
| |
|   test( |
|       "the parent queue continues from its original position if it returns " |
|       "false", () async { |
|     await events.withTransaction(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|       return false; |
|     })); |
| |
|     var following = await events.next; |
|     expect(following, equals(2)); |
|   }); |
| |
|   test("the parent queue continues from the child position if it throws", () { |
|     var result = events.withTransaction(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|       throw "oh no"; |
|     })); |
|     expect(result, throwsA("oh no")); |
| |
|     expect(events.next, completion(equals(3))); |
|   }); |
| |
|   test("returns whether the transaction succeeded", () { |
|     var accepted = events.withTransaction((_) async { |
|       return true; |
|     }); |
|     expect(accepted, completion(isTrue)); |
|     var declined = events.withTransaction((_) async { |
|       return false; |
|     }); |
|     expect(declined, completion(isFalse)); |
|   }); |
| }); |
| |
| // Tests for `cancelable`. Every test starts from a queue over |
| // `createStream()` whose first event (1) has already been consumed in setUp. |
| group("cancelable operation", () { |
|   StreamQueue<int> events; |
|   setUp(() async { |
|     events = new StreamQueue(createStream()); |
|     var first = await events.next; |
|     expect(first, equals(1)); |
|   }); |
| |
|   test("passes a copy of the parent queue", () async { |
|     var operation = events.cancelable(expectAsync1((child) async { |
|       // The child sees every event the parent has not consumed yet. |
|       for (var expected in [2, 3, 4]) { |
|         expect(await child.next, equals(expected)); |
|       } |
|       var more = await child.hasNext; |
|       expect(more, isFalse); |
|     })); |
|     await operation.value; |
|   }); |
| |
|   test("the parent queue continues from the child position by default", |
|       () async { |
|     var operation = events.cancelable(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|     })); |
|     await operation.value; |
| |
|     var following = await events.next; |
|     expect(following, equals(3)); |
|   }); |
| |
|   test( |
|       "the parent queue continues from the child position if an error is " |
|       "thrown", () async { |
|     var operation = events.cancelable(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|       throw "oh no"; |
|     })); |
|     expect(operation.value, throwsA("oh no")); |
| |
|     expect(events.next, completion(equals(3))); |
|   }); |
| |
|   test("the parent queue continues from the original position if canceled", |
|       () async { |
|     var pending = events.cancelable(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|     })); |
|     // The future returned by cancel() is not awaited here. |
|     pending.cancel(); |
| |
|     var following = await events.next; |
|     expect(following, equals(2)); |
|   }); |
| |
|   test("forwards the value from the callback", () async { |
|     var operation = events.cancelable(expectAsync1((child) async { |
|       var consumed = await child.next; |
|       expect(consumed, equals(2)); |
|       return "value"; |
|     })); |
|     var forwarded = await operation.value; |
|     expect(forwarded, equals("value")); |
|   }); |
| }); |
| |
| // Issues `next`, `skip` and `take` requests back to back in every ordered |
| // pairing, then reads whatever is left through `rest`. |
| test("all combinations sequential skip/next/take operations", () async { |
|   // Nine ordered pairs of operations use 20 events each (10 per operation); |
|   // the closing `rest` call accounts for the last 20. |
|   var eventCount = 20 * (3 * 3 + 1); |
|   var queue = new StreamQueue<int>(createLongStream(eventCount)); |
| |
|   // Issues ten `next` requests and expects them to complete with |
|   // [startIndex .. startIndex + 9]. |
|   expectNext(startIndex) { |
|     for (var offset = 0; offset < 10; offset++) { |
|       var expected = startIndex + offset; |
|       expect(queue.next, completion(expected)); |
|     } |
|   } |
| |
|   // Skips ten events and expects `skip` to complete with 0. |
|   expectSkip(startIndex) { |
|     var skipped = queue.skip(10); |
|     expect(skipped, completion(equals(0))); |
|   } |
| |
|   // Issues a single `take(10)` and expects it to complete with |
|   // [startIndex .. startIndex + 9]. |
|   expectTake(startIndex) { |
|     var expected = new List.generate(10, (offset) => startIndex + offset); |
|     expect(queue.take(10), completion(expected)); |
|   } |
| |
|   var operations = [expectNext, expectSkip, expectTake]; |
| |
|   // Index of the first event the next operation should observe. |
|   var counter = 0; |
|   // Run every ordered pair of operations, one directly after the other. |
|   for (var first in operations) { |
|     for (var second in operations) { |
|       first(counter); |
|       second(counter + 10); |
|       counter += 20; |
|     } |
|   } |
|   // The remaining 20 events are expected from `rest`. |
|   var remaining = queue.rest.toList(); |
|   expect(remaining, completion(new List.generate(20, (i) => counter + i))); |
| }); |
| } |
| |
| /// Signature of a function that takes one required argument of type [T] and |
| /// returns a [T]. |
| typedef T Func1Required<T>(T value); |
| |
| /// Returns a stream that emits 1, 2, 3 and 4 and then closes. |
| /// |
| /// `flushMicrotasks()` is awaited between consecutive events, but not before |
| /// the first event or after the last one. |
| Stream<int> createStream() async* { |
|   for (var value = 1; value <= 4; value++) { |
|     if (value > 1) await flushMicrotasks(); |
|     yield value; |
|   } |
| } |
| |
| /// Returns a stream that emits 1, 2, the error "To err is divine!" and 4, |
| /// and then closes. |
| /// |
| /// The events are added to a [StreamController] by an async helper that is |
| /// started before the stream is returned; `flushMicrotasks()` is awaited |
| /// after each event. |
| Stream<int> createErrorStream() { |
|   var source = new StreamController<int>(); |
| |
|   // Feeds the controller; the returned future is deliberately not awaited. |
|   Future emitAll() async { |
|     source.add(1); |
|     await flushMicrotasks(); |
|     source.add(2); |
|     await flushMicrotasks(); |
|     source.addError("To err is divine!"); |
|     await flushMicrotasks(); |
|     source.add(4); |
|     await flushMicrotasks(); |
|     source.close(); |
|   } |
| |
|   emitAll(); |
|   return source.stream; |
| } |
| |
| /// Returns a stream that emits the integers from 0 up to, but not including, |
| /// [eventCount], in increasing order, and then closes. |
| Stream<int> createLongStream(int eventCount) async* { |
|   var next = 0; |
|   while (next < eventCount) { |
|     yield next; |
|     next++; |
|   } |
| } |