// Source blob: 70f7e7fe354c8c7ce03b67114111363863163c7e
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.graph.phase;
import 'dart:async';
import '../asset/asset_id.dart';
import '../asset/asset_node.dart';
import '../asset/asset_node_set.dart';
import '../errors.dart';
import '../log.dart';
import '../transformer/transformer.dart';
import '../transformer/transformer_group.dart';
import '../utils.dart';
import '../utils/multiset.dart';
import 'asset_cascade.dart';
import 'group_runner.dart';
import 'node_status.dart';
import 'node_streams.dart';
import 'phase_forwarder.dart';
import 'phase_output.dart';
import 'transformer_classifier.dart';
/// One phase in the ordered series of transformations in an [AssetCascade].
/// Each phase can access outputs from previous phases and can in turn pass
/// outputs to later phases. Phases are processed strictly serially. All
/// transforms in a phase will be complete before moving on to the next phase.
/// Within a single phase, all transforms will be run in parallel.
/// Building can be interrupted between phases. For example, a source is added
/// which starts the background process. Sometime during, say, phase 2 (which
/// is running asynchronously) that source is modified. When the process queue
/// goes to advance to phase 3, it will see that modification and start the
/// waterfall from the beginning again.
class Phase {
/// The [AssetCascade] to which this phase belongs.
final AssetCascade cascade;

/// Describes where [this] sits in the transformer graph.
final String _location;

/// The position of [this] within its parent cascade or group.
final int _index;

/// The [GroupRunner] for each [TransformerGroup] in this phase.
final _groups = <TransformerGroup, GroupRunner>{};

/// The assets fed into this phase.
///
/// The first phase receives the source assets; every later phase receives
/// the outputs of the phase before it.
final _inputs = new AssetNodeSet();

/// The [TransformerClassifier] for each [Transformer] in this phase.
final _classifiers = <Transformer, TransformerClassifier>{};

/// The forwarders for this phase, keyed by asset id.
final _forwarders = <AssetId, PhaseForwarder>{};

/// The outputs of this phase, keyed by asset id.
final _outputs = <AssetId, PhaseOutput>{};

/// Every [AssetNode.origin] belonging to an input asset of this phase.
///
/// An asset may be consumed by a group rather than by an individual
/// transformer, in which case it must not be forwarded through the phase as
/// a whole. To tell a forwarded output apart from other outputs that share
/// its id, its origin is looked up here: if the origin is present, the asset
/// passed through [_classifiers] or [_groups] unmodified.
final _inputOrigins = new Multiset<AssetNode>();

/// The streams this phase exposes.
final _streams = new NodeStreams();

/// The status-change stream exposed by [_streams].
Stream<NodeStatus> get onStatusChange {
  return _streams.onStatusChange;
}

/// The asset stream exposed by [_streams].
Stream<AssetNode> get onAsset {
  return _streams.onAsset;
}

/// The log stream exposed by [_streams].
Stream<LogEntry> get onLog {
  return _streams.onLog;
}
/// How far along [this] is in processing its assets.
NodeStatus get status {
// Before any transformers are added, the phase should be dirty if and only
// if any input is dirty.
if (_classifiers.isEmpty && _groups.isEmpty) {
return _inputs.any((input) => input.state.isDirty) ?
NodeStatus.RUNNING : NodeStatus.IDLE;
// NOTE(review): the closing brace of the `if` above is missing from this
// copy of the file.
// NOTE(review): the arguments on the next two lines are truncated (nothing
// precedes `=>`). Presumably they map over the classifiers and the groups
// respectively; restore from the upstream file before editing.
var classifierStatus = NodeStatus.dirtiest( => classifier.status));
var groupStatus = NodeStatus.dirtiest( => group.status));
// A missing [previous] (i.e. the first phase) contributes an idle status.
// NOTE(review): this statement is cut off. No terminator is visible, the
// two statuses computed above are never combined with it, and the getter's
// closing brace is absent.
return (previous == null ? NodeStatus.IDLE : previous.status)
/// The phase immediately before this one in the cascade.
///
/// This is null for the first phase.
final Phase previous;

/// Our subscription to the [onStatusChange] stream of [previous].
StreamSubscription _previousStatusSubscription;

/// Our subscription to the [onAsset] stream of [previous].
StreamSubscription<AssetNode> _previousOnAssetSubscription;

/// Completers for outstanding [getOutput] requests, keyed by asset id.
///
/// When an asset node is requested before it is available, a completer is
/// stored here to wait for the asset to be generated. If the asset is never
/// generated, the completer should complete to `null`.
final _pendingOutputRequests = <AssetId, Completer<AssetNode>>{};
/// Returns all currently-available output assets for this phase.
Set<AssetNode> get availableOutputs {
// Unwrap each [PhaseOutput] to its asset node, then keep only the nodes whose
// state reports them as available.
// NOTE(review): the chain below is truncated in this copy. The declared
// return type is a Set, but no conversion, statement terminator, or closing
// brace is visible; restore from the upstream file.
return _outputs.values
.map((output) => output.output)
.where((node) => node.state.isAvailable)
// TODO(nweiz): Rather than passing the cascade and the phase everywhere,
// create an interface that just exposes [getInput]. Emit errors via
// [AssetNode]s.
/// Creates the first phase of [cascade], described by [location].
///
/// Delegates to [Phase._] with an index of 0 and no previous phase.
Phase(AssetCascade cascade, String location)
: this._(cascade, location, 0);
/// Creates a phase at [_index] within [cascade], optionally chained after
/// [previous].
Phase._(this.cascade, this._location, this._index, [this.previous]) {
if (previous != null) {
// Every asset emitted by the previous phase becomes an input of this one.
_previousOnAssetSubscription = previous.onAsset.listen(addInput);
// Whenever the previous phase's status changes, re-evaluate our own status
// and pass it to [_streams].
_previousStatusSubscription = previous.onStatusChange
.listen((_) => _streams.changeStatus(status));
// NOTE(review): the closing brace of the `if` above is missing from this
// copy, so it is unclear whether the listener below is registered
// unconditionally.
onStatusChange.listen((status) {
if (status == NodeStatus.RUNNING) return;
// All the previous phases have finished declaring or producing their
// outputs. If anyone's still waiting for outputs, cut off the wait; we
// won't be generating them, at least until a source asset changes.
// NOTE(review): the loop body and all closing braces are missing below.
// The comment above suggests that each pending completer is finished
// here; confirm against the upstream file.
for (var completer in _pendingOutputRequests.values) {
/// Adds a new asset as an input for this phase.
/// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
/// will automatically begin determining which transforms can consume it as a
/// primary input. The transforms themselves won't be applied until [process]
/// is called, however.
/// This should only be used for brand-new assets or assets that have been
/// removed and re-created. The phase will automatically handle updated assets
/// using the [AssetNode.onStateChange] stream.
void addInput(AssetNode node) {
// Each group is one channel along which an asset may be forwarded, as is
// each transformer.
var forwarder = new PhaseForwarder(
node, _classifiers.length, _groups.length);
// NOTE(review): the map key is missing on the next line (`_forwarders[]`).
// The map is declared as keyed by [AssetId], so presumably the key is the
// node's id; restore from the upstream file.
_forwarders[] = forwarder;
// NOTE(review): from here on only the opening lines of each construct
// survive. The bodies of the `if`, the state-change listener and both loops,
// and every closing brace, are missing from this copy.
if (forwarder.output != null) {
node.onStateChange.listen((state) {
if (state.isRemoved) {
for (var classifier in _classifiers.values) {
for (var group in _groups.values) {
// TODO(nweiz): If the output is available when this is called, it's
// theoretically possible for it to become unavailable between the call and
// the return. If it does so, it won't trigger the rebuilding process. To
// avoid this, we should have this and the methods it calls take explicit
// callbacks, as in [AssetNode.whenAvailable].
/// Gets the asset node for an output [id].
/// If [id] is for a generated or transformed asset, this will wait until it
/// has been created and return it. This means that the returned asset will
/// always be [AssetState.AVAILABLE].
/// If the output cannot be found, returns null.
Future<AssetNode> getOutput(AssetId id) {
return syncFuture(() {
// Assets that belong to another package are looked up through the cascade's
// graph rather than in this phase's outputs.
if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
if (_outputs.containsKey(id)) {
var output = _outputs[id].output;
// If the requested output is available, we can just return it.
if (output.state.isAvailable) return output;
// If the requested output exists but isn't yet available, wait to see
// if it becomes available. If it's removed before becoming available,
// try again, since it could be generated again.
return output.whenAvailable((_) {
return output;
}).catchError((error) {
// Only [AssetNotFoundException] triggers a retry; any other error is
// rethrown to the caller.
if (error is! AssetNotFoundException) throw error;
return getOutput(id);
// NOTE(review): the closing `});` of the error handler and the closing brace
// of the `containsKey` branch are missing from this copy.
// If this phase and the previous phases are fully declared or done, the
// requested output won't be generated and we can safely return null.
if (status != NodeStatus.RUNNING) return null;
// Otherwise, store a completer for the asset node. If it's generated in
// the future, we'll complete this completer.
// `putIfAbsent` means repeated requests for the same id share a single
// completer, and therefore a single future.
var completer = _pendingOutputRequests.putIfAbsent(id,
() => new Completer.sync());
return completer.future;
// NOTE(review): the closing `});` of the `syncFuture` callback and the
// method's closing brace are missing from this copy.
/// Set this phase's transformers to [transformers].
void updateTransformers(Iterable transformers) {
// [transformers] may mix plain [Transformer]s and [TransformerGroup]s. Each
// kind is filtered out separately and diffed against what is currently
// registered in [_classifiers] or [_groups].
// NOTE(review): the next statement has no terminator. `difference` is called
// on the result below, so a continuation line (presumably a conversion to a
// Set) appears to have been lost from this copy.
var newTransformers = transformers.where((op) => op is Transformer)
var oldTransformers = _classifiers.keys.toSet();
// NOTE(review): the body of the removal loop is missing from this copy.
for (var removed in oldTransformers.difference(newTransformers)) {
for (var transformer in newTransformers.difference(oldTransformers)) {
var classifier = new TransformerClassifier(
this, transformer, "$_location.$_index");
_classifiers[transformer] = classifier;
// Re-evaluate and publish this phase's status whenever the new classifier's
// status changes.
classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
// NOTE(review): the body of the loop over the existing inputs is missing.
for (var input in _inputs) {
// NOTE(review): same truncation as for `newTransformers` above.
var newGroups = transformers.where((op) => op is TransformerGroup)
var oldGroups = _groups.keys.toSet();
// NOTE(review): the body of the removal loop is missing from this copy.
for (var removed in oldGroups.difference(newGroups)) {
for (var added in newGroups.difference(oldGroups)) {
var runner = new GroupRunner(cascade, added, "$_location.$_index");
_groups[added] = runner;
// Re-evaluate and publish this phase's status whenever the new runner's
// status changes.
runner.onStatusChange.listen((_) => _streams.changeStatus(status));
// NOTE(review): the body of the loop over the existing inputs is missing.
for (var input in _inputs) {
// Tell every forwarder the current number of classifiers and groups.
for (var forwarder in _forwarders.values) {
forwarder.updateTransformers(_classifiers.length, _groups.length);
// NOTE(review): all closing braces of this method are missing from this copy.
/// Force all [LazyTransformer]s' transforms in this phase to begin producing
/// concrete assets.
void forceAllTransforms() {
// NOTE(review): only the two loop headers survive in this copy. The call made
// on each classifier and each group, and all closing braces, are missing;
// restore from the upstream file.
for (var classifier in _classifiers.values) {
for (var group in _groups.values) {
/// Add a new phase after this one.
/// This may only be called on a phase with no phase following it.
Phase addPhase() {
// The new phase shares this phase's cascade and location, takes the next
// index, and records this phase as its [previous].
var next = new Phase._(cascade, _location, _index + 1, this);
// Iterates over a copy of the outputs (`toList`), presumably because the
// missing loop body modifies [_outputs]; confirm against the upstream file.
for (var output in _outputs.values.toList()) {
// Remove [output]'s listeners because now they should get the asset from
// [next], rather than this phase. Any transforms consuming [output] will
// be re-run and will consume the output from the new final phase.
// NOTE(review): the statement this comment describes, and the closing braces
// of the loop and the method, are missing from this copy.
return next;
/// Mark this phase as removed.
/// This will remove all the phase's outputs.
void remove() {
// NOTE(review): only the loop and `if` headers survive in this copy. What is
// done to each classifier and group, and to the two subscriptions to
// [previous] when they are non-null, is missing, as are all closing braces.
// Note that the classifiers are iterated through a `toList` copy, while the
// groups are iterated directly.
for (var classifier in _classifiers.values.toList()) {
for (var group in _groups.values) {
if (_previousStatusSubscription != null) {
if (_previousOnAssetSubscription != null) {
/// Add [asset] as an output of this phase.
void _handleOutput(AssetNode asset) {
// Branches on whether [asset]'s origin is one of this phase's input origins.
// NOTE(review): the bodies of both branches and all closing braces are
// missing from this copy; restore from the upstream file.
if (_inputOrigins.contains(asset.origin)) {
} else {
/// Add [asset] as an output of this phase without checking if it's a
/// forwarded asset.
void _handleOutputWithoutForwarder(AssetNode asset) {
// NOTE(review): this body is badly truncated in this copy. The argument of
// `containsKey`, the keys inside `_outputs[]`, the argument of `remove`, the
// body of the first branch, the call that `onDone:` belongs to, and all
// closing braces are missing. [_outputs] is declared as keyed by [AssetId],
// so presumably each missing key is the asset's id.
if (_outputs.containsKey( {
} else {
// No output is registered for this asset yet: create a [PhaseOutput] for it.
_outputs[] = new PhaseOutput(this, asset, "$_location.$_index");
onDone: () => _outputs.remove(;
// Report any collision recorded on the output to the cascade.
var exception = _outputs[].collisionException;
if (exception != null) cascade.reportError(exception);
/// Emit [asset] as an output of this phase.
/// This should be called after [_handleOutput], so that collisions are
/// resolved.
void _emit(AssetNode asset) {
// NOTE(review): the entire body and the closing brace of this method are
// missing from this copy; restore from the upstream file.
/// Provide an asset to a pending [getOutput] call.
void _providePendingAsset(AssetNode asset) {
// If anyone's waiting for this asset, provide it to them.
// NOTE(review): the argument of `remove` is missing on the next line.
// [_pendingOutputRequests] is declared as keyed by [AssetId], so presumably
// the key is the asset's id.
var request = _pendingOutputRequests.remove(;
// No pending request for this asset: nothing to do.
if (request == null) return;
// NOTE(review): the body of the branch below (what happens when the asset is
// already available) and its closing brace are missing from this copy.
if (asset.state.isAvailable) {
// A lazy asset may be emitted while still dirty. If so, we wait until it's
// either available or removed before trying again to access it.
asset.whenStateChanges().then((state) {
// NOTE(review): the argument of `getOutput` is missing on the next line.
if (state.isRemoved) return getOutput(;
return asset;
// NOTE(review): the rest of this chain and the method's closing brace are
// missing, so how the result reaches `request` is not visible here.
/// Returns a label for this phase built from its location and index.
String toString() {
  return "phase $_location.$_index";
}