// blob: e632d4ef3a2379d5c9067e499222cda4a200ff19 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'package:async_helper/async_minitest.dart';
/// Exercises [Stream.timeout] on single-subscription, broadcast and mapped
/// streams, with and without an `onTimeout` callback, and checks that events,
/// errors, closing and pausing each keep the timeout from firing.
main() {
  // Short duration used when a timeout is expected to fire.
  const ms5 = Duration(milliseconds: 5);
  // Long duration used when a timeout is expected NOT to fire.
  const twoSecs = Duration(seconds: 2);

  // A silent single-subscription stream yields a TimeoutException error with
  // an empty stack trace and no data events.
  test("stream timeout", () {
    final c = StreamController();
    final tos = c.stream.timeout(ms5);
    expect(tos.isBroadcast, false);
    tos.handleError(expectAsync((e, s) {
      expect(e, isInstanceOf<TimeoutException>());
      expect(s, StackTrace.empty);
    })).listen((v) {
      fail("Unexpected event");
    });
  });

  // The onTimeout callback may push a value, an error and a close through
  // the sink; all three must reach the listener.
  test("stream timeout add events", () {
    final c = StreamController();
    final tos = c.stream.timeout(ms5, onTimeout: (sink) {
      sink.add(42);
      sink.addError("ERROR");
      sink.close();
    });
    expect(tos.isBroadcast, false);
    tos.listen(expectAsync((v) {
      expect(v, 42);
    }), onError: expectAsync((e, s) {
      expect(e, "ERROR");
    }), onDone: expectAsync(() {}));
  });

  // Two events followed by an immediate close: the listener sees both values
  // and done, and never an error.
  test("stream no timeout", () {
    final c = StreamController();
    final tos = c.stream.timeout(twoSecs);
    var ctr = 0;
    tos.listen((v) {
      expect(v, 42);
      ctr++;
    }, onError: (e, s) {
      fail("No error expected");
    }, onDone: expectAsync(() {
      expect(ctr, 2);
    }));
    expect(tos.isBroadcast, false);
    c
      ..add(42)
      ..add(42)
      ..close(); // Faster than a timeout!
  });

  // Two events and then silence: the timeout error arrives after both
  // values have been delivered.
  test("stream timeout after events", () {
    final c = StreamController();
    final tos = c.stream.timeout(twoSecs);
    expect(tos.isBroadcast, false);
    var ctr = 0;
    tos.listen((v) {
      expect(v, 42);
      ctr++;
    }, onError: expectAsync((e, s) {
      expect(ctr, 2);
      expect(e, isInstanceOf<TimeoutException>());
    }));
    c
      ..add(42)
      ..add(42); // No close, timeout after two events.
  });

  // Same as "stream timeout", but the source is a broadcast controller and
  // the timed-out stream must report itself as broadcast.
  test("broadcast stream timeout", () {
    final c = StreamController.broadcast();
    final tos = c.stream.timeout(ms5);
    expect(tos.isBroadcast, true);
    tos.handleError(expectAsync((e, s) {
      expect(e, isInstanceOf<TimeoutException>());
      expect(s, StackTrace.empty);
    })).listen((v) {
      fail("Unexpected event");
    });
  });

  // Timeout applied on top of asBroadcastStream().
  // NOTE(review): the controller here is already a broadcast controller; if
  // the intent was to cover asBroadcastStream() over a single-subscription
  // stream, this should use a plain StreamController - confirm.
  test("asBroadcast stream timeout", () {
    final c = StreamController.broadcast();
    final tos = c.stream.asBroadcastStream().timeout(ms5);
    expect(tos.isBroadcast, true);
    tos.handleError(expectAsync((e, s) {
      expect(e, isInstanceOf<TimeoutException>());
      expect(s, StackTrace.empty);
    })).listen((v) {
      fail("Unexpected event");
    });
  });

  // Timeout applied on top of a map() transformation of a
  // single-subscription stream.
  test("mapped stream timeout", () {
    final c = StreamController();
    final tos = c.stream.map((x) => 2 * x).timeout(ms5);
    expect(tos.isBroadcast, false);
    tos.handleError(expectAsync((e, s) {
      expect(e, isInstanceOf<TimeoutException>());
      expect(s, StackTrace.empty);
    })).listen((v) {
      fail("Unexpected event");
    });
  });

  // A steady flow of data events (one every 5 ms) must keep a 2 s timeout
  // from ever firing.
  test("events prevent timeout", () {
    // Measures the time since the periodic timer last produced an event.
    final sw = Stopwatch();
    final c = StreamController();
    final tos = c.stream.timeout(twoSecs, onTimeout: (_) {
      final elapsed = sw.elapsedMilliseconds;
      if (elapsed > 250) {
        // This should not happen, but it does occasionally.
        // Starving the periodic timer has made the test useless.
        print("Periodic timer of 5 ms delayed $elapsed ms.");
        return;
      }
      fail("Timeout not prevented by events");
      // Presumably unreachable when fail() throws; kept from the original.
      throw "ERROR";
    });
    // Start the periodic timer before we start listening to the stream.
    // This should reduce the flakiness of the test.
    var ctr = 200; // send this many events at 5ms intervals. Then close.
    Timer.periodic(ms5, (timer) {
      sw.reset();
      c.add(42);
      if (--ctr == 0) {
        timer.cancel();
        c.close();
      }
    });
    sw.start();
    tos.listen((v) {
      expect(v, 42);
    }, onDone: expectAsync(() {}));
  });

  // A steady flow of error events (one every 5 ms) must keep a 2 s timeout
  // from ever firing.
  test("errors prevent timeout", () {
    // Measures the time since the periodic timer last produced an error.
    final sw = Stopwatch();
    final c = StreamController();
    final tos = c.stream.timeout(twoSecs, onTimeout: (_) {
      final elapsed = sw.elapsedMilliseconds;
      if (elapsed > 250) {
        // This should not happen, but it does occasionally.
        // Starving the periodic timer has made the test useless.
        print("Periodic timer of 5 ms delayed $elapsed ms.");
        return;
      }
      fail("Timeout not prevented by errors");
    });
    // Start the periodic timer before we start listening to the stream.
    // This should reduce the flakiness of the test.
    var ctr = 200; // send this many error events at 5ms intervals. Then close.
    Timer.periodic(ms5, (timer) {
      sw.reset();
      c.addError("ERROR");
      if (--ctr == 0) {
        timer.cancel();
        c.close();
      }
    });
    sw.start();
    tos.listen((_) {}, onError: (e, s) {
      expect(e, "ERROR");
    }, onDone: expectAsync(() {}));
  });

  // Closing the source right after listening must deliver done without the
  // timeout callback running.
  test("closing prevents timeout", () {
    final c = StreamController();
    final tos = c.stream.timeout(twoSecs, onTimeout: (_) {
      fail("Timeout not prevented by close");
    });
    tos.listen((_) {}, onDone: expectAsync(() {}));
    c.close();
  });

  // The subscription is paused for 2 s, far longer than the 5 ms timeout;
  // the timeout callback must not run while paused. The source is closed
  // just before resuming so that only done is delivered afterwards.
  test("pausing prevents timeout", () {
    final c = StreamController();
    final tos = c.stream.timeout(ms5, onTimeout: (_) {
      fail("Timeout not prevented by pause");
    });
    final subscription = tos.listen((_) {}, onDone: expectAsync(() {}));
    subscription.pause();
    Timer(twoSecs, () {
      c.close();
      subscription.resume();
    });
  });
}