blob: b2beb44ede123131ab2d31ec38bc68edd983178c [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T? latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners])
listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
var latestValue = latest;
if (latestValue != null) controller.add(latestValue);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
void main() {
asyncStart();
testStreamsIndependent();
asyncTest(testStreamNonOverlap);
asyncTest(testRepeatLatest);
asyncTest(testIncorrectUse);
asyncEnd();
}
/// Test that the streams can provide different events.
void testStreamsIndependent() {
var log = <String>[];
var index = 0;
var multi = Stream<List<int>>.multi((c) {
var id = ++index;
log.add("$id");
for (var i = 0; i < id + 1; i++) {
c.add([id, i]);
}
c.close();
});
void logList(List<int> l) {
log.add("${l.first}-${l.last}");
}
asyncStart();
Future.wait([multi.forEach(logList), multi.forEach(logList)])
.whenComplete(() {
Expect.equals(7, log.length);
for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) {
Expect.isTrue(log.contains(element));
}
asyncEnd();
});
}
/// Test that stream can be listened to again after having no listener.
Future<void> testStreamNonOverlap() async {
var completer = Completer<Object?>();
MultiStreamController<int>? controller;
var stream = Stream<int>.multi((c) {
controller = c;
c.onCancel = () {
controller = null;
if (!completer.isCompleted) completer.complete(null);
};
});
for (var i in [1, 2, 3]) {
var log = <Object?>[];
var subscription = stream.listen((v) {
log.add(v);
if (!completer.isCompleted) completer.complete(v);
}, onError: (e, s) {
log.add(e);
if (!completer.isCompleted) completer.complete(e);
}, onDone: () {
log.add(null);
if (!completer.isCompleted) completer.complete(null);
});
Expect.isNotNull(controller);
controller!.add(1);
await completer.future;
Expect.listEquals([1], log);
completer = Completer();
controller!.add(2);
await completer.future;
Expect.listEquals([1, 2], log);
completer = Completer();
if (i == 2) {
subscription.cancel();
} else {
controller!.close();
}
await completer.future;
Expect.listEquals([1, 2, if (i != 2) null], log);
}
}
/// Test that the [Stream.repeatLatest] example code works as described.
Future<void> testRepeatLatest() async {
var c = StreamController<int>();
var repStream = c.stream.repeatLatest();
var f1 = repStream.first;
c.add(1);
var v1 = await f1;
Expect.equals(1, v1);
var f2 = repStream.take(2).toList();
c.add(2);
var l2 = await f2;
Expect.listEquals([1, 2], l2);
var f3 = repStream.take(2).toList();
c.add(3);
var l3 = await f3;
Expect.listEquals([2, 3], l3);
}
// Test that errors are thrown when required,
// and use after cancel is ignored.
Future<void> testIncorrectUse() async {
{
var lock = Completer();
var lock2 = Completer();
var stream = Stream.multi((c) async {
c.add(2);
await lock.future;
Expect.isTrue(!c.hasListener);
c.add(2);
c.addError("Error");
c.close();
// No adding after close.
Expect.throws<StateError>(() => c.add(3));
Expect.throws<StateError>(() => c.addSync(3));
Expect.throws<StateError>(() => c.addError("E"));
Expect.throws<StateError>(() => c.addErrorSync("E"));
Expect.throws<StateError>(() => c.addStream(Stream.empty()));
lock2.complete();
});
await for (var v in stream) {
Expect.equals(2, v);
break; // Cancels subscription.
}
lock.complete();
await lock2.future;
}
{
var lock = Completer();
var lock2 = Completer();
var stream = Stream.multi((c) async {
c.add(2);
await lock.future;
Expect.isTrue(!c.hasListener);
c.addSync(2);
c.addErrorSync("Error");
c.closeSync();
// No adding after close.
Expect.throws<StateError>(() => c.add(3));
Expect.throws<StateError>(() => c.addSync(3));
Expect.throws<StateError>(() => c.addError("E"));
Expect.throws<StateError>(() => c.addErrorSync("E"));
Expect.throws<StateError>(() => c.addStream(Stream.empty()));
lock2.complete();
});
await for (var v in stream) {
Expect.equals(2, v);
break; // Cancels subscription.
}
lock.complete();
await lock2.future;
}
{
var stream = Stream.multi((c) async {
var c2 = StreamController();
c.addStream(c2.stream);
// Now adding stream, cannot add events at the same time (for now!).
Expect.throws<StateError>(() => c.add(1));
Expect.throws<StateError>(() => c.addSync(1));
Expect.throws<StateError>(() => c.addError("Error"));
Expect.throws<StateError>(() => c.addErrorSync("Error"));
Expect.throws<StateError>(() => c.close());
Expect.throws<StateError>(() => c.closeSync());
await c2.close();
c.add(42);
c.close();
});
await for (var v in stream) {
Expect.equals(42, v);
}
}
}