blob: ed590f3a9f9617a587e52cb58998c1f0a9ad728b [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.test.stream_pool_test;
import 'dart:async';
import 'package:barback/src/utils.dart';
import 'package:barback/src/utils/stream_pool.dart';
import 'package:scheduled_test/scheduled_test.dart';
import 'utils.dart';
main() {
initConfig();
group("buffered", () {
test("buffers events from multiple inputs", () {
var pool = new StreamPool<String>();
var controller1 = new StreamController<String>();
pool.add(controller1.stream);
controller1.add("first");
var controller2 = new StreamController<String>();
pool.add(controller2.stream);
controller2.add("second");
// Call [toList] asynchronously to be sure that the events have been
// buffered beforehand and aren't just being received unbuffered.
expect(newFuture(() => pool.stream.toList()),
completion(equals(["first", "second"])));
pumpEventQueue().then((_) => pool.close());
});
test("buffers errors from multiple inputs", () {
var pool = new StreamPool<String>();
var controller1 = new StreamController<String>();
pool.add(controller1.stream);
controller1.add("first");
var controller2 = new StreamController<String>();
pool.add(controller2.stream);
controller2.add("second");
controller1.addError("third");
controller2.addError("fourth");
controller1.add("fifth");
expect(newFuture(() {
return pool.stream.transform(new StreamTransformer.fromHandlers(
handleData: (data, sink) => sink.add(["data", data]),
handleError: (error, stackTrace, sink) {
sink.add(["error", error]);
})).toList();
}), completion(equals([
["data", "first"],
["data", "second"],
["error", "third"],
["error", "fourth"],
["data", "fifth"]
])));
pumpEventQueue().then((_) => pool.close());
});
test("buffers inputs from a broadcast stream", () {
var pool = new StreamPool<String>();
var controller = new StreamController<String>.broadcast();
pool.add(controller.stream);
controller.add("first");
controller.add("second");
// Call [toList] asynchronously to be sure that the events have been
// buffered beforehand and aren't just being received unbuffered.
expect(newFuture(() => pool.stream.toList()),
completion(equals(["first", "second"])));
pumpEventQueue().then((_) => pool.close());
});
});
group("broadcast", () {
test("doesn't buffer inputs", () {
var pool = new StreamPool<String>.broadcast();
var controller1 = new StreamController<String>.broadcast();
pool.add(controller1.stream);
controller1.add("first");
var controller2 = new StreamController<String>.broadcast();
pool.add(controller2.stream);
controller2.add("second");
// Call [toList] asynchronously to be sure that the events have been
// buffered beforehand and aren't just being received unbuffered.
expect(newFuture(() => pool.stream.toList()), completion(isEmpty));
pumpEventQueue().then((_) => pool.close());
});
test("doesn't buffer errors", () {
var pool = new StreamPool<String>.broadcast();
var controller1 = new StreamController<String>.broadcast();
pool.add(controller1.stream);
controller1.addError("first");
var controller2 = new StreamController<String>.broadcast();
pool.add(controller2.stream);
controller2.addError("second");
expect(newFuture(() {
return pool.stream.transform(new StreamTransformer.fromHandlers(
handleData: (data, sink) => sink.add(data),
handleError: (error, stackTrace, sink) { sink.add(error); }))
.toList();
}), completion(isEmpty));
pumpEventQueue().then((_) => pool.close());
});
test("doesn't buffer inputs from a buffered stream", () {
var pool = new StreamPool<String>.broadcast();
var controller = new StreamController<String>();
pool.add(controller.stream);
controller.add("first");
controller.add("second");
expect(pumpEventQueue().then((_) => pool.stream.toList()),
completion(isEmpty));
pumpEventQueue().then((_) => pool.close());
});
});
for (var type in ["buffered", "broadcast"]) {
group(type, () {
var pool;
var bufferedController;
var bufferedStream;
var bufferedSyncController;
var broadcastController;
var broadcastStream;
var broadcastSyncController;
setUp(() {
if (type == "buffered") {
pool = new StreamPool<String>();
} else {
pool = new StreamPool<String>.broadcast();
}
bufferedController = new StreamController<String>();
pool.add(bufferedController.stream);
bufferedSyncController = new StreamController<String>(sync: true);
pool.add(bufferedSyncController.stream);
broadcastController = new StreamController<String>.broadcast();
pool.add(broadcastController.stream);
broadcastSyncController =
new StreamController<String>.broadcast(sync: true);
pool.add(broadcastSyncController.stream);
});
test("emits events to a listener", () {
expect(pool.stream.toList(), completion(equals(["first", "second"])));
bufferedController.add("first");
broadcastController.add("second");
pumpEventQueue().then((_) => pool.close());
});
test("emits sync events synchronously", () {
var events = [];
pool.stream.listen(events.add);
bufferedSyncController.add("first");
expect(events, equals(["first"]));
broadcastSyncController.add("second");
expect(events, equals(["first", "second"]));
});
test("emits async events asynchronously", () {
var events = [];
pool.stream.listen(events.add);
bufferedController.add("first");
broadcastController.add("second");
expect(events, isEmpty);
expect(pumpEventQueue().then((_) => events),
completion(equals(["first", "second"])));
});
test("doesn't emit events from removed streams", () {
expect(pool.stream.toList(), completion(equals(["first", "third"])));
bufferedController.add("first");
expect(pumpEventQueue().then((_) {
pool.remove(bufferedController.stream);
bufferedController.add("second");
}).then((_) {
broadcastController.add("third");
return pumpEventQueue();
}).then((_) {
pool.remove(broadcastController.stream);
broadcastController.add("fourth");
pool.close();
}), completes);
});
});
}
}