blob: fd2f3fd4a5941afd4b5da4dc6e9523e921e0af60 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library services.scheduler;
import 'dart:async';
import 'dart:collection';
class TaskScheduler {
final Queue<_Task<dynamic>> _taskQueue = Queue<_Task<dynamic>>();
bool _isActive = false;
int get queueCount => _taskQueue.length;
Future<T> _performTask<T>(Task<T> task) {
if (task.timeoutDuration != null) {
return task.perform().timeout(task.timeoutDuration);
} else {
return task.perform();
}
}
Future<T> schedule<T>(Task<T> task) {
if (!_isActive) {
_isActive = true;
return _performTask(task).whenComplete(_next);
}
final taskResult = Completer<T>();
_taskQueue.add(_Task<T>(task, taskResult));
return taskResult.future;
}
void _next() {
assert(_isActive);
if (_taskQueue.isEmpty) {
_isActive = false;
return;
}
final first = _taskQueue.removeFirst();
first.taskResult.complete(_performTask(first.task).whenComplete(_next));
}
}
// Internal unit of scheduling.
class _Task<T> {
final Task<T> task;
final Completer<T> taskResult;
_Task(this.task, this.taskResult);
}
// Public working data structure.
abstract class Task<T> {
Future<T> perform();
Duration timeoutDuration;
}
class ClosureTask<T> extends Task<T> {
final Future<T> Function() _closure;
ClosureTask(this._closure, {Duration timeoutDuration}) {
this.timeoutDuration = timeoutDuration;
}
@override
Future<T> perform() {
return _closure();
}
}