blob: 413ed1d66269c47eaa3ba5f77947b46a4a0d7a41 [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
/// Adds [repeatLatest] to every [Stream].
extension StreamRepeatLatestExtension<T> on Stream<T> {
  /// Returns a multi-listener stream that forwards the events of this stream
  /// and replays the most recent data event to each new listener.
  ///
  /// This stream is listened to immediately when [repeatLatest] is called,
  /// and that subscription is never cancelled by this method.
  ///
  /// Each listener on the returned stream:
  /// * receives only a done event if this stream has already completed,
  /// * otherwise first receives the latest data event seen so far, unless
  ///   there is none or it is `null`, and
  /// * then receives every later data, error and done event of this stream.
  ///
  /// Error events are forwarded to the current listeners but are not
  /// remembered for later listeners.
  Stream<T> repeatLatest() {
    var done = false;
    T latest = null;
    var currentListeners = <MultiStreamController<T>>{};
    // Every loop below iterates over a snapshot of `currentListeners`:
    // delivering an event synchronously can run listener code that adds or
    // removes listeners while the loop is still running.
    this.listen((event) {
      latest = event;
      for (var listener in [...currentListeners]) listener.addSync(event);
    }, onError: (Object error, StackTrace stack) {
      for (var listener in [...currentListeners]) {
        listener.addErrorSync(error, stack);
      }
    }, onDone: () {
      done = true;
      latest = null;
      // Closing a listener may synchronously invoke its `onCancel` callback,
      // which removes that listener from `currentListeners`. Iterating the
      // set itself here could therefore fail with a concurrent modification
      // error and leave the remaining listeners unclosed.
      for (var listener in [...currentListeners]) listener.closeSync();
      currentListeners.clear();
    });
    return Stream.multi((controller) {
      if (done) {
        // The source has completed; there is nothing left to replay.
        controller.close();
        return;
      }
      currentListeners.add(controller);
      if (latest != null) controller.add(latest);
      controller.onCancel = () {
        currentListeners.remove(controller);
      };
    });
  }
}
/// Entry point: invokes every test in this file.
///
/// The synchronous-looking test is called directly, the `Future`-returning
/// tests are handed to `asyncTest`, and the whole sequence is bracketed by an
/// `asyncStart`/`asyncEnd` pair.
void main() {
  asyncStart();
  testStreamsIndependent();
  for (var test in [testStreamNonOverlap, testRepeatLatest]) {
    asyncTest(test);
  }
  asyncEnd();
}
/// Checks that separate listeners on a [Stream.multi] stream can be given
/// different events.
///
/// Each new subscription gets the next id, records that id, and is then sent
/// `id + 1` events of the form `[id, index]` followed by a close. Listening
/// twice must therefore produce two distinct, complete event sequences.
void testStreamsIndependent() {
  var events = <String>[];
  var subscriptionCount = 0;
  var stream = Stream<List<int>>.multi((controller) {
    var id = ++subscriptionCount;
    events.add("$id");
    for (var n = 0; n <= id; n++) {
      controller.add([id, n]);
    }
    controller.close();
  });

  // Records an `[id, index]` event as "id-index".
  void record(List<int> pair) {
    events.add("${pair.first}-${pair.last}");
  }

  // Only membership is checked below, not the relative order of entries.
  const expected = ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"];

  asyncStart();
  var firstPass = stream.forEach(record);
  var secondPass = stream.forEach(record);
  Future.wait([firstPass, secondPass]).whenComplete(() {
    Expect.equals(7, events.length);
    for (var entry in expected) {
      Expect.isTrue(events.contains(entry));
    }
    asyncEnd();
  });
}
/// Test that stream can be listened to again after having no listener.
///
/// Listens to the same [Stream.multi] stream three times, one listener at a
/// time. In each round two data events are sent through the controller, and
/// then the round is ended: the second round cancels the subscription, the
/// other rounds close the controller, which adds a done marker (`null`) to
/// the log.
Future<void> testStreamNonOverlap() async {
  // Signals that the listener (or the cancel callback) has seen something.
  // Replaced with a fresh completer before every step that is awaited.
  var completer = Completer<Object>();

  // Completes the current [completer] unless something already did.
  void signal(Object value) {
    if (!completer.isCompleted) completer.complete(value);
  }

  // Controller of the currently active subscription, `null` when there is
  // no active subscription.
  MultiStreamController<int> controller;
  var stream = Stream<int>.multi((c) {
    controller = c;
    c.onCancel = () {
      controller = null;
      signal(null);
    };
  });
  for (var i in [1, 2, 3]) {
    // The completer left over from the previous round has already been
    // completed, so awaiting it would not actually wait for this round's
    // first event. Start every round with a fresh one.
    completer = Completer();
    var log = <Object>[];
    var subscription = stream.listen((v) {
      log.add(v);
      signal(v);
    }, onError: (e, s) {
      log.add(e);
      signal(e);
    }, onDone: () {
      log.add(null);
      signal(null);
    });
    Expect.isNotNull(controller);
    controller.add(1);
    await completer.future;
    Expect.listEquals([1], log);
    completer = Completer();
    controller.add(2);
    await completer.future;
    Expect.listEquals([1, 2], log);
    completer = Completer();
    if (i == 2) {
      // Cancelling produces no done event, only the `onCancel` callback.
      subscription.cancel();
    } else {
      controller.close();
    }
    await completer.future;
    Expect.listEquals([1, 2, if (i != 2) null], log);
  }
}
/// Exercises the `repeatLatest` extension defined in this file.
///
/// The first listener subscribes before any event has been added and gets
/// only the live event. Each later listener is expected to receive the
/// previously added event first, followed by the newly added one.
Future<void> testRepeatLatest() async {
  final source = StreamController<int>();
  final repeating = source.stream.repeatLatest();

  // Nothing has been added yet, so only the live event is expected.
  final firstEvent = repeating.first;
  source.add(1);
  Expect.equals(1, await firstEvent);

  // Expect the replayed 1 followed by the live 2.
  final secondBatch = repeating.take(2).toList();
  source.add(2);
  Expect.listEquals([1, 2], await secondBatch);

  // Expect the replayed 2 followed by the live 3.
  final thirdBatch = repeating.take(2).toList();
  source.add(3);
  Expect.listEquals([2, 3], await thirdBatch);
}