blob: fd5ad614a3ec28194a2dc39aaf0872d986623ba0 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.graph.phase;
import 'dart:async';
import 'package:collection/collection.dart';
import '../asset/asset_id.dart';
import '../asset/asset_node.dart';
import '../asset/asset_node_set.dart';
import '../errors.dart';
import '../log.dart';
import '../transformer/aggregate_transformer.dart';
import '../transformer/transformer.dart';
import '../transformer/transformer_group.dart';
import '../utils/multiset.dart';
import 'asset_cascade.dart';
import 'group_runner.dart';
import 'node_status.dart';
import 'node_streams.dart';
import 'phase_forwarder.dart';
import 'phase_output.dart';
import 'transformer_classifier.dart';
/// One phase in the ordered series of transformations in an [AssetCascade].
///
/// Each phase can access outputs from previous phases and can in turn pass
/// outputs to later phases. Phases are processed strictly serially. All
/// transforms in a phase will be complete before moving on to the next phase.
/// Within a single phase, all transforms will be run in parallel.
///
/// Building can be interrupted between phases. For example, a source is added
/// which starts the background process. Sometime during, say, phase 2 (which
/// is running asynchronously) that source is modified. When the process queue
/// goes to advance to phase 3, it will see that modification and start the
/// waterfall from the beginning again.
class Phase {
/// The cascade that owns this phase.
final AssetCascade cascade;
/// A string describing the location of [this] in the transformer graph.
final String _location;
/// The index of [this] in its parent cascade or group.
final int _index;
/// The groups for this phase.
final _groups = new Map<TransformerGroup, GroupRunner>();
/// The inputs for this phase.
///
/// For the first phase, these will be the source assets. For all other
/// phases, they will be the outputs from the previous phase.
final _inputs = new AssetNodeSet();
/// The transformer classifiers for this phase.
///
/// The keys can be either [Transformer]s or [AggregateTransformer]s.
final _classifiers = new Map<dynamic, TransformerClassifier>();
/// The forwarders for this phase.
final _forwarders = new Map<AssetId, PhaseForwarder>();
/// The outputs for this phase.
final _outputs = new Map<AssetId, PhaseOutput>();
/// The set of all [AssetNode.origin] properties of the input assets for this
/// phase.
///
/// This is used to determine which assets have been passed unmodified through
/// [_classifiers] or [_groups]. It's possible that a given asset was consumed
/// by a group and not an individual transformer, and so shouldn't be
/// forwarded through the phase as a whole.
///
/// In order to detect whether an output has been forwarded through a group or
/// a classifier, we must be able to distinguish it from other outputs with
/// the same id. To do so, we check if its origin is in [_inputOrigins]. If
/// so, it's been forwarded unmodified.
final _inputOrigins = new Multiset<AssetNode>();
/// The streams exposed by this phase.
final _streams = new NodeStreams();
Stream<NodeStatus> get onStatusChange => _streams.onStatusChange;
Stream<AssetNode> get onAsset => _streams.onAsset;
Stream<LogEntry> get onLog => _streams.onLog;
/// How far along [this] is in processing its assets.
NodeStatus get status {
// Before any transformers are added, the phase should be dirty if and only
// if any input is dirty.
if (_classifiers.isEmpty && _groups.isEmpty && previous == null) {
return _inputs.any((input) => input.state.isDirty)
? NodeStatus.RUNNING
: NodeStatus.IDLE;
}
var classifierStatus = NodeStatus
.dirtiest(_classifiers.values.map((classifier) => classifier.status));
var groupStatus =
NodeStatus.dirtiest(_groups.values.map((group) => group.status));
return (previous == null ? NodeStatus.IDLE : previous.status)
.dirtier(classifierStatus)
.dirtier(groupStatus);
}
/// The previous phase in the cascade, or null if this is the first phase.
final Phase previous;
/// The subscription to [previous]'s [onStatusChange] stream.
StreamSubscription _previousStatusSubscription;
/// The subscription to [previous]'s [onAsset] stream.
StreamSubscription<AssetNode> _previousOnAssetSubscription;
final _inputSubscriptions = new Set<StreamSubscription>();
/// A map of asset ids to completers for [getInput] requests.
///
/// If an asset node is requested before it's available, we put a completer in
/// this map to wait for the asset to be generated. If it's not generated, the
/// completer should complete to `null`.
final _pendingOutputRequests = new Map<AssetId, Completer<AssetNode>>();
/// Returns all currently-available output assets for this phase.
Set<AssetNode> get availableOutputs {
return _outputs.values
.map((output) => output.output)
.where((node) => node.state.isAvailable)
.toSet();
}
// TODO(nweiz): Rather than passing the cascade and the phase everywhere,
// create an interface that just exposes [getInput]. Emit errors via
// [AssetNode]s.
Phase(AssetCascade cascade, String location) : this._(cascade, location, 0);
Phase._(this.cascade, this._location, this._index, [this.previous]) {
if (previous != null) {
_previousOnAssetSubscription = previous.onAsset.listen(addInput);
_previousStatusSubscription =
previous.onStatusChange.listen((_) => _streams.changeStatus(status));
}
onStatusChange.listen((status) {
if (status == NodeStatus.RUNNING) return;
// All the previous phases have finished declaring or producing their
// outputs. If anyone's still waiting for outputs, cut off the wait; we
// won't be generating them, at least until a source asset changes.
for (var completer in _pendingOutputRequests.values) {
completer.complete(null);
}
_pendingOutputRequests.clear();
});
}
/// Adds a new asset as an input for this phase.
///
/// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
/// will automatically begin determining which transforms can consume it as a
/// primary input. The transforms themselves won't be applied until [process]
/// is called, however.
///
/// This should only be used for brand-new assets or assets that have been
/// removed and re-created. The phase will automatically handle updated assets
/// using the [AssetNode.onStateChange] stream.
void addInput(AssetNode node) {
// Each group is one channel along which an asset may be forwarded, as is
// each transformer.
var forwarder =
new PhaseForwarder(node, _classifiers.length, _groups.length);
_forwarders[node.id] = forwarder;
forwarder.onAsset.listen(_handleOutputWithoutForwarder);
if (forwarder.output != null) {
_handleOutputWithoutForwarder(forwarder.output);
}
_inputOrigins.add(node.origin);
_inputs.add(node);
_inputSubscriptions.add(node.onStateChange.listen((state) {
if (state.isRemoved) {
_inputOrigins.remove(node.origin);
_forwarders.remove(node.id).remove();
}
_streams.changeStatus(status);
}));
for (var classifier in _classifiers.values) {
classifier.addInput(node);
}
}
// TODO(nweiz): If the output is available when this is called, it's
// theoretically possible for it to become unavailable between the call and
// the return. If it does so, it won't trigger the rebuilding process. To
// avoid this, we should have this and the methods it calls take explicit
// callbacks, as in [AssetNode.whenAvailable].
/// Gets the asset node for an output [id].
///
/// If [id] is for a generated or transformed asset, this will wait until it
/// has been created and return it. This means that the returned asset will
/// always be [AssetState.AVAILABLE].
///
/// If the output cannot be found, returns null.
Future<AssetNode> getOutput(AssetId id) {
return new Future.sync(() {
if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
if (_outputs.containsKey(id)) {
var output = _outputs[id].output;
// If the requested output is available, we can just return it.
if (output.state.isAvailable) return output;
// If the requested output exists but isn't yet available, wait to see
// if it becomes available. If it's removed before becoming available,
// try again, since it could be generated again.
output.force();
return output.whenAvailable((_) {
return output;
}).catchError((error) {
if (error is! AssetNotFoundException) throw error;
return getOutput(id);
});
}
// If this phase and the previous phases are fully declared or done, the
// requested output won't be generated and we can safely return null.
if (status != NodeStatus.RUNNING) return null;
// Otherwise, store a completer for the asset node. If it's generated in
// the future, we'll complete this completer.
var completer =
_pendingOutputRequests.putIfAbsent(id, () => new Completer.sync());
return completer.future;
});
}
/// Set this phase's transformers to [transformers].
void updateTransformers(Iterable transformers) {
var newTransformers = transformers
.where((op) => op is Transformer || op is AggregateTransformer)
.toSet();
var oldTransformers = _classifiers.keys.toSet();
for (var removed in oldTransformers.difference(newTransformers)) {
_classifiers.remove(removed).remove();
}
for (var transformer in newTransformers.difference(oldTransformers)) {
var classifier =
new TransformerClassifier(this, transformer, "$_location.$_index");
_classifiers[transformer] = classifier;
classifier.onAsset.listen(_handleOutput);
_streams.onLogPool.add(classifier.onLog);
classifier.onStatusChange.listen((_) => _streams.changeStatus(status));
for (var input in _inputs) {
classifier.addInput(input);
}
}
var newGroups = DelegatingSet.typed<TransformerGroup>(
transformers.where((op) => op is TransformerGroup).toSet());
var oldGroups = _groups.keys.toSet();
for (var removed in oldGroups.difference(newGroups)) {
_groups.remove(removed).remove();
}
for (var added in newGroups.difference(oldGroups)) {
var runner = new GroupRunner(previous, added, "$_location.$_index");
_groups[added] = runner;
runner.onAsset.listen(_handleOutput);
_streams.onLogPool.add(runner.onLog);
runner.onStatusChange.listen((_) => _streams.changeStatus(status));
}
for (var forwarder in _forwarders.values) {
forwarder.updateTransformers(_classifiers.length, _groups.length);
}
_streams.changeStatus(status);
}
/// Force all [LazyTransformer]s' transforms in this phase to begin producing
/// concrete assets.
void forceAllTransforms() {
for (var classifier in _classifiers.values) {
classifier.forceAllTransforms();
}
for (var group in _groups.values) {
group.forceAllTransforms();
}
}
/// Add a new phase after this one.
///
/// The new phase will have a location annotation describing its place in the
/// package graph. By default, this annotation will describe it as being
/// directly after [this]. If [location] is passed, though, it's described as
/// being the first phase in that location.
Phase addPhase([String location]) {
var index = 0;
if (location == null) {
location = _location;
index = _index + 1;
}
var next = new Phase._(cascade, location, index, this);
for (var output in _outputs.values.toList()) {
// Remove [output]'s listeners because now they should get the asset from
// [next], rather than this phase. Any transforms consuming [output] will
// be re-run and will consume the output from the new final phase.
output.removeListeners();
}
return next;
}
/// Mark this phase as removed.
///
/// This will remove all the phase's outputs.
void remove() {
for (var classifier in _classifiers.values.toList()) {
classifier.remove();
}
for (var group in _groups.values) {
group.remove();
}
_streams.close();
for (var subscription in _inputSubscriptions) {
subscription.cancel();
}
if (_previousStatusSubscription != null) {
_previousStatusSubscription.cancel();
}
if (_previousOnAssetSubscription != null) {
_previousOnAssetSubscription.cancel();
}
}
/// Add [asset] as an output of this phase.
void _handleOutput(AssetNode asset) {
if (_inputOrigins.contains(asset.origin)) {
_forwarders[asset.id].addIntermediateAsset(asset);
} else {
_handleOutputWithoutForwarder(asset);
}
}
/// Add [asset] as an output of this phase without checking if it's a
/// forwarded asset.
void _handleOutputWithoutForwarder(AssetNode asset) {
if (_outputs.containsKey(asset.id)) {
_outputs[asset.id].add(asset);
} else {
_outputs[asset.id] = new PhaseOutput(this, asset, "$_location.$_index");
_outputs[asset.id]
.onAsset
.listen(_emit, onDone: () => _outputs.remove(asset.id));
_emit(_outputs[asset.id].output);
}
var exception = _outputs[asset.id].collisionException;
if (exception != null) cascade.reportError(exception);
}
/// Emit [asset] as an output of this phase.
///
/// This should be called after [_handleOutput], so that collisions are
/// resolved.
void _emit(AssetNode asset) {
_streams.onAssetController.add(asset);
_providePendingAsset(asset);
}
/// Provide an asset to a pending [getOutput] call.
void _providePendingAsset(AssetNode asset) {
// If anyone's waiting for this asset, provide it to them.
var request = _pendingOutputRequests.remove(asset.id);
if (request == null) return;
if (asset.state.isAvailable) {
request.complete(asset);
return;
}
// A lazy asset may be emitted while still dirty. If so, we wait until it's
// either available or removed before trying again to access it.
assert(asset.state.isDirty);
asset.force();
asset
.whenStateChanges()
.then((state) {
if (state.isRemoved) return getOutput(asset.id);
return asset;
})
.then((asset) => request.complete(asset))
.catchError(request.completeError);
}
String toString() => "phase $_location.$_index";
}