| // Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:collection'; |
| |
| import 'package:pool/pool.dart'; |
| |
| /// Handles rate-limited scheduling of tasks. |
| /// |
| /// Tasks are identified by a jobId of type [J] (should be useful as a Hash-key) |
| /// and run with a supplied async function. |
| /// |
| /// Designed to allow speculatively running tasks that will likely be needed |
| /// later with [withPrescheduling]. |
| /// |
| /// Errors thrown by tasks scheduled with the `preschedule` callback will only |
| /// be triggered when you await the [Future] returned by [schedule]. |
| /// |
| /// The operation will run in the [Zone] that the task was in when enqueued. |
| /// |
| /// If a task if [preschedule]d and later [schedule]d before the operation is |
| /// started, the task will go in front of the queue with the zone of the |
| /// [schedule] operation. |
| /// |
| /// Example: |
| /// |
| /// ```dart |
| /// // A scheduler that, given a uri, gets that page and returns the body |
| /// final scheduler = RateLimitedScheduler(http.read); |
| /// |
| /// scheduler.withPresceduling((preschedule) { |
| /// // Start fetching `pub.dev` and `dart.dev` in the background. |
| /// scheduler.preschedule(Uri.parse('https://pub.dev/')); |
| /// scheduler.preschedule(Uri.parse('https://dart.dev/')); |
| /// // ... do time-consuming task. |
| /// // Now we actually need `pub.dev`. |
| /// final pubDevBody = |
| /// await scheduler.schedule(Uri.parse('https://pub.dev/')); |
| /// // if the `dart.dev` task has not started yet, it will be canceled when |
| /// // leaving `withPresceduling`. |
| /// }); |
| /// ``` |
| class RateLimitedScheduler<J, V> { |
| final Future<V> Function(J) _runJob; |
| |
| /// The results of ongoing and finished jobs. |
| final Map<J, Completer<V>> _cache = <J, Completer<V>>{}; |
| |
| /// Provides sync access to completed results. |
| final Map<J, V> _results = <J, V>{}; |
| |
| /// Tasks that are waiting to be run. |
| final Queue<_Task<J>> _queue = Queue<_Task<J>>(); |
| |
| /// Rate limits the number of concurrent jobs. |
| final Pool _pool; |
| |
| /// Jobs that have started running. |
| final Set<J> _started = {}; |
| |
| RateLimitedScheduler(Future<V> Function(J) runJob, |
| {required int maxConcurrentOperations}) |
| : _runJob = runJob, |
| _pool = Pool(maxConcurrentOperations); |
| |
| /// Pick the next task in [_queue] and run it. |
| /// |
| /// If the task is already in [_started] it will not be run again. |
| Future<void> _processNextTask() async { |
| if (_queue.isEmpty) { |
| return; |
| } |
| final task = _queue.removeFirst(); |
| final completer = _cache[task.jobId]!; |
| |
| if (!_started.add(task.jobId)) { |
| return; |
| } |
| |
| // Use an async function to catch sync exceptions from _runJob. |
| Future<V> runJob() async { |
| return _results[task.jobId] = |
| await task.zone.runUnary(_runJob, task.jobId); |
| } |
| |
| completer.complete(runJob()); |
| // Listen to errors on the completer: |
| // this will make errors thrown by [_run] not |
| // become uncaught. |
| // |
| // They will still show up for other listeners of the future. |
| try { |
| await completer.future; |
| } catch (_) {} |
| } |
| |
| /// Calls [callback] with a function that can pre-schedule jobs. |
| /// |
| /// When [callback] returns, all jobs that where prescheduled by [callback] |
| /// that have not started running will be removed from the work queue |
| /// (if they have been added separately by [schedule] they will still be |
| /// executed). |
| Future<R> withPrescheduling<R>( |
| FutureOr<R> Function(void Function(J) preschedule) callback, |
| ) async { |
| final prescheduled = <_Task>{}; |
| try { |
| return await callback((jobId) { |
| if (_started.contains(jobId)) return; |
| final task = _Task(jobId, Zone.current); |
| _cache.putIfAbsent(jobId, () => Completer()); |
| _queue.addLast(task); |
| prescheduled.add(task); |
| |
| unawaited(_pool.withResource(_processNextTask)); |
| }); |
| } finally { |
| _queue.removeWhere(prescheduled.contains); |
| } |
| } |
| |
| /// Returns a future that completed with the result of running [jobId]. |
| /// |
| /// If [jobId] has already run, the cached result will be returned. |
| /// If [jobId] is not yet running, it will go to the front of the work queue |
| /// to be scheduled next when there are free resources. |
| Future<V> schedule(J jobId) { |
| final completer = _cache.putIfAbsent(jobId, () => Completer()); |
| if (!_started.contains(jobId)) { |
| final task = _Task(jobId, Zone.current); |
| _queue.addFirst(task); |
| scheduleMicrotask(() => _pool.withResource(_processNextTask)); |
| } |
| return completer.future; |
| } |
| |
| /// Returns the result of running [jobId] if that is already done. |
| /// Otherwise returns `null`. |
| V? peek(J jobId) => _results[jobId]; |
| } |
| |
| class _Task<J> { |
| final J jobId; |
| final Zone zone; |
| _Task(this.jobId, this.zone); |
| |
| @override |
| String toString() => jobId.toString(); |
| } |