// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:test/test.dart';
import 'package:pedantic/pedantic.dart';
import 'package:pub/src/rate_limited_scheduler.dart';
void main() {
  // Returns a fresh map of three uncompleted completers keyed 'a', 'b', 'c'.
  Map<String, Completer> threeCompleters() =>
      {'a': Completer(), 'b': Completer(), 'c': Completer()};

  // Builds the task function most tests hand to [RateLimitedScheduler].
  //
  // When the returned task is invoked with a key it:
  //  1. completes `started[key]`, letting the test observe that processing of
  //     that key has begun (completing it a second time throws, which exposes
  //     a task being run twice),
  //  2. waits until the test completes `gates[key]`, and
  //  3. returns the upper-cased key.
  Future<String> Function(String) gatedTask(
    Map<String, Completer> gates,
    Map<String, Completer> started,
  ) =>
      (String key) async {
        started[key].complete();
        await gates[key].future;
        return key.toUpperCase();
      };

  test('scheduler is rate limited', () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 2);

    await scheduler.withPrescheduling((preschedule) async {
      preschedule('a');
      preschedule('b');
      preschedule('c');
      await Future.wait(
          [isBeingProcessed['a'].future, isBeingProcessed['b'].future]);
      // Two operations are in flight, so 'c' must still be waiting.
      expect(isBeingProcessed['c'].isCompleted, isFalse);
      // Letting 'a' finish frees a slot, which 'c' is expected to take.
      completers['a'].complete();
      await isBeingProcessed['c'].future;
      completers['c'].complete();
      expect(await scheduler.schedule('c'), 'C');
    });
  });

  test('scheduler.preschedule cancels unrun prescheduled task after callback',
      () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 1);

    await scheduler.withPrescheduling((preschedule1) async {
      await scheduler.withPrescheduling((preschedule2) async {
        preschedule1('a');
        preschedule2('b');
        preschedule1('c');
        await isBeingProcessed['a'].future;
        // b, c should not start processing due to rate-limiting.
        expect(isBeingProcessed['b'].isCompleted, isFalse);
        expect(isBeingProcessed['c'].isCompleted, isFalse);
      });
      completers['a'].complete();
      // b is removed from the queue, now c should start processing.
      await isBeingProcessed['c'].future;
      completers['c'].complete();
      expect(await scheduler.schedule('c'), 'C');
      // b is not on the queue anymore.
      expect(isBeingProcessed['b'].isCompleted, isFalse);
    });
  });

  test('scheduler.preschedule does not cancel tasks that are scheduled',
      () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 1);

    // Assigned inside the prescheduling callback, awaited after it returns.
    Future b;
    await scheduler.withPrescheduling((preschedule) async {
      preschedule('a');
      preschedule('b');
      await isBeingProcessed['a'].future;
      // b should not start processing due to rate-limiting.
      expect(isBeingProcessed['b'].isCompleted, isFalse);
      // Explicitly scheduling 'b' before the callback ends is what should
      // keep it alive once the prescheduling scope closes.
      b = scheduler.schedule('b');
    });
    completers['a'].complete();
    expect(await scheduler.schedule('a'), 'A');
    // b was scheduled, so it should get processed now
    await isBeingProcessed['b'].future;
    completers['b'].complete();
    expect(await b, 'B');
  });

  test('scheduler caches results', () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 2);

    // Open the gate up front so the task for 'a' can run straight through.
    completers['a'].complete();
    expect(await scheduler.schedule('a'), 'A');
    // Would fail if isBeingProcessed['a'] was completed twice
    expect(await scheduler.schedule('a'), 'A');
  });

  test('scheduler prioritizes fetched tasks before prefetched', () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 1);

    await scheduler.withPrescheduling((preschedule) async {
      preschedule('a');
      preschedule('b');
      await isBeingProcessed['a'].future;
      // 'c' is requested directly while 'b' is only prescheduled.
      final cResult = scheduler.schedule('c');
      expect(isBeingProcessed['b'].isCompleted, isFalse);
      completers['a'].complete();
      completers['c'].complete();
      await isBeingProcessed['c'].future;
      // 'c' is done before we allow 'b' to finish processing
      expect(await cResult, 'C');
    });
  });

  test('Errors trigger when the scheduled future is listened to', () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();
    final scheduler = RateLimitedScheduler(
        gatedTask(completers, isBeingProcessed),
        maxConcurrentOperations: 2);

    await scheduler.withPrescheduling((preschedule) async {
      preschedule('a');
      preschedule('b');
      preschedule('c');
      await isBeingProcessed['a'].future;
      await isBeingProcessed['b'].future;
      expect(isBeingProcessed['c'].isCompleted, isFalse);
      // The task for 'c' has not started, so nothing is listening to this
      // future yet. Attach a no-op handler first so the error below is not
      // reported as unhandled before the task gets to await it.
      unawaited(completers['c'].future.catchError((_) {}));
      completers['c'].completeError('errorC');
      completers['a'].completeError('errorA');
      await isBeingProcessed['c'].future;
      completers['b'].completeError('errorB');
      // Each error is expected to surface from the future returned by
      // `schedule` for the corresponding key.
      expect(() async => await scheduler.schedule('a'), throwsA('errorA'));
      expect(() async => await scheduler.schedule('b'), throwsA('errorB'));
      expect(() async => await scheduler.schedule('c'), throwsA('errorC'));
    });
  });

  test('tasks run in the zone they where enqueued in', () async {
    final completers = threeCompleters();
    final isBeingProcessed = threeCompleters();

    // Unlike the other tests, this task reports the 'zoneValue' of the zone it
    // is executing in rather than the upper-cased key.
    Future<String> zoneReportingTask(String i) async {
      isBeingProcessed[i].complete();
      await completers[i].future;
      return Zone.current['zoneValue'];
    }

    final scheduler =
        RateLimitedScheduler(zoneReportingTask, maxConcurrentOperations: 2);
    await scheduler.withPrescheduling((preschedule) async {
      // Preschedule each key from a zone carrying a distinct 'zoneValue'.
      runZoned(() {
        preschedule('a');
      }, zoneValues: {'zoneValue': 'A'});
      runZoned(() {
        preschedule('b');
      }, zoneValues: {'zoneValue': 'B'});
      runZoned(() {
        preschedule('c');
      }, zoneValues: {'zoneValue': 'C'});

      await runZoned(() async {
        await isBeingProcessed['a'].future;
        await isBeingProcessed['b'].future;
        // This will put 'c' in front of the queue, but in a zone with zoneValue
        // bound to S.
        final cExpectation =
            expectLater(scheduler.schedule('c'), completion('S'));
        completers['a'].complete();
        completers['b'].complete();
        expect(await scheduler.schedule('a'), 'A');
        expect(await scheduler.schedule('b'), 'B');
        completers['c'].complete();
        await cExpectation;
      }, zoneValues: {'zoneValue': 'S'});
    });
  });
}