blob: 8680e6017b0e69a33c52469b1f00d0039b4e0067 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.phase;
import 'dart:async';
import 'dart:collection';
import 'asset_cascade.dart';
import 'asset_id.dart';
import 'asset_node.dart';
import 'asset_set.dart';
import 'barback_logger.dart';
import 'group_runner.dart';
import 'errors.dart';
import 'multiset.dart';
import 'phase_forwarder.dart';
import 'phase_input.dart';
import 'phase_output.dart';
import 'stream_pool.dart';
import 'transformer.dart';
import 'transformer_group.dart';
import 'utils.dart';
/// One phase in the ordered series of transformations in an [AssetCascade].
///
/// Each phase can access outputs from previous phases and can in turn pass
/// outputs to later phases. Phases are processed strictly serially. All
/// transforms in a phase will be complete before moving on to the next phase.
/// Within a single phase, all transforms will be run in parallel.
///
/// Building can be interrupted between phases. For example, a source is added
/// which starts the background process. Sometime during, say, phase 2 (which
/// is running asynchronously) that source is modified. When the process queue
/// goes to advance to phase 3, it will see that modification and start the
/// waterfall from the beginning again.
class Phase {
/// The cascade that owns this phase.
final AssetCascade cascade;
/// The transformers that can access [inputs].
///
/// Their outputs will be available to the next phase.
final Set<Transformer> _transformers;
/// The groups for this phase.
final _groups = new Map<TransformerGroup, GroupRunner>();
/// The inputs for this phase.
///
/// For the first phase, these will be the source assets. For all other
/// phases, they will be the outputs from the previous phase.
final _inputs = new Map<AssetId, PhaseInput>();
/// The forwarders for this phase.
final _forwarders = new Map<AssetId, PhaseForwarder>();
/// The outputs for this phase.
final _outputs = new Map<AssetId, PhaseOutput>();
/// The set of all [AssetNode.origin] properties of the input assets for this
/// phase.
///
/// This is used to determine which assets have been passed unmodified through
/// [_inputs] or [_groups]. Each input asset has a PhaseInput in [_inputs]. If
/// that input isn't consumed by any transformers, it will be forwarded
/// through the PhaseInput. However, it's possible that it was consumed by a
/// group, and so shouldn't be forwarded through the phase as a whole.
///
/// In order to detect whether an output has been forwarded through a group or
/// a PhaseInput, we must be able to distinguish it from other outputs with
/// the same id. To do so, we check if its origin is in [_inputOrigins]. If
/// so, it's been forwarded unmodified.
final _inputOrigins = new Multiset<AssetNode>();
/// A stream that emits an event whenever this phase becomes dirty and needs
/// to be run.
///
/// This may emit events when the phase was already dirty or while processing
/// transforms. Events are emitted synchronously to ensure that the dirty
/// state is thoroughly propagated as soon as any assets are changed.
Stream get onDirty => _onDirtyPool.stream;
final _onDirtyPool = new StreamPool.broadcast();
/// A controller whose stream feeds into [_onDirtyPool].
///
/// This is used whenever an input is added or transforms are changed.
final _onDirtyController = new StreamController.broadcast(sync: true);
/// Whether this phase is dirty and needs to be run.
bool get isDirty => _inputs.values.any((input) => input.isDirty) ||
_groups.values.any((group) => group.isDirty);
/// A stream that emits an event whenever any transforms in this phase log an
/// entry.
Stream<LogEntry> get onLog => _onLogPool.stream;
final _onLogPool = new StreamPool<LogEntry>.broadcast();
/// The phase after this one.
///
/// Outputs from this phase will be passed to it.
Phase get next => _next;
Phase _next;
/// Returns all currently-available output assets for this phase.
Set<AssetNode> get availableOutputs {
return _outputs.values
.map((output) => output.output)
.where((node) => node.state.isAvailable)
.toSet();
}
// TODO(nweiz): Rather than passing the cascade and the phase everywhere,
// create an interface that just exposes [getInput]. Emit errors via
// [AssetNode]s.
Phase(this.cascade, Iterable transformers)
: _transformers = transformers.where((op) => op is Transformer).toSet() {
_onDirtyPool.add(_onDirtyController.stream);
for (var group in transformers.where((op) => op is TransformerGroup)) {
_groups[group] = new GroupRunner(cascade, group);
_onDirtyPool.add(_groups[group].onDirty);
}
}
/// Adds a new asset as an input for this phase.
///
/// [node] doesn't have to be [AssetState.AVAILABLE]. Once it is, the phase
/// will automatically begin determining which transforms can consume it as a
/// primary input. The transforms themselves won't be applied until [process]
/// is called, however.
///
/// This should only be used for brand-new assets or assets that have been
/// removed and re-created. The phase will automatically handle updated assets
/// using the [AssetNode.onStateChange] stream.
void addInput(AssetNode node) {
if (_inputs.containsKey(node.id)) _inputs[node.id].remove();
// Each group is one channel along which an asset may be forwarded. Then
// there's one additional channel for the non-grouped transformers.
var forwarder = new PhaseForwarder(_groups.length + 1);
_forwarders[node.id] = forwarder;
forwarder.onForwarding.listen((asset) {
_addOutput(asset);
var exception = _outputs[asset.id].collisionException;
if (exception != null) cascade.reportError(exception);
});
_inputOrigins.add(node.origin);
var input = new PhaseInput(this, node, _transformers);
_inputs[node.id] = input;
input.input.whenRemoved(() {
_inputOrigins.remove(node.origin);
_inputs.remove(node.id);
_forwarders.remove(node.id).remove();
});
_onDirtyPool.add(input.onDirty);
_onDirtyController.add(null);
_onLogPool.add(input.onLog);
for (var group in _groups.values) {
group.addInput(node);
}
}
/// Gets the asset node for an input [id].
///
/// If an input with that ID cannot be found, returns null.
Future<AssetNode> getInput(AssetId id) {
return newFuture(() {
if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
if (_inputs.containsKey(id)) return _inputs[id].input;
return null;
});
}
/// Gets the asset node for an output [id].
///
/// If an output with that ID cannot be found, returns null.
Future<AssetNode> getOutput(AssetId id) {
return newFuture(() {
if (id.package != cascade.package) return cascade.graph.getAssetNode(id);
if (!_outputs.containsKey(id)) return null;
return _outputs[id].output;
});
}
/// Set this phase's transformers to [transformers].
void updateTransformers(Iterable transformers) {
_onDirtyController.add(null);
var actualTransformers = transformers.where((op) => op is Transformer);
_transformers.clear();
_transformers.addAll(actualTransformers);
for (var input in _inputs.values) {
input.updateTransformers(actualTransformers);
}
var newGroups = transformers.where((op) => op is TransformerGroup)
.toSet();
var oldGroups = _groups.keys.toSet();
for (var removed in oldGroups.difference(newGroups)) {
_groups.remove(removed).remove();
}
for (var added in newGroups.difference(oldGroups)) {
var runner = new GroupRunner(cascade, added);
_groups[added] = runner;
_onDirtyPool.add(runner.onDirty);
for (var input in _inputs.values) {
runner.addInput(input.input);
}
}
for (var forwarder in _forwarders.values) {
forwarder.numChannels = _groups.length + 1;
}
}
/// Add a new phase after this one with [transformers].
///
/// This may only be called on a phase with no phase following it.
Phase addPhase(Iterable transformers) {
assert(_next == null);
_next = new Phase(cascade, transformers);
for (var output in _outputs.values.toList()) {
// Remove [output]'s listeners because now they should get the asset from
// [_next], rather than this phase. Any transforms consuming [output] will
// be re-run and will consume the output from the new final phase.
output.removeListeners();
}
return _next;
}
/// Mark this phase as removed.
///
/// This will remove all the phase's outputs and all following phases.
void remove() {
removeFollowing();
for (var input in _inputs.values.toList()) {
input.remove();
}
for (var group in _groups.values) {
group.remove();
}
_onDirtyPool.close();
_onLogPool.close();
}
/// Remove all phases after this one.
void removeFollowing() {
if (_next == null) return;
_next.remove();
_next = null;
}
/// Processes this phase.
///
/// Returns a future that completes when processing is done. If there is
/// nothing to process, returns `null`.
Future process() {
if (!isDirty) return null;
var outputIds = new Set<AssetId>();
void _handleOutputs(Set<AssetNode> outputs) {
for (var asset in outputs) {
if (_inputOrigins.contains(asset.origin)) {
_forwarders[asset.id].addIntermediateAsset(asset);
continue;
}
outputIds.add(asset.id);
_addOutput(asset);
}
}
var outputFutures = [];
outputFutures.addAll(_inputs.values.map((input) {
if (!input.isDirty) return new Future.value(new Set());
return input.process().then(_handleOutputs);
}));
outputFutures.addAll(_groups.values.map((group) {
if (!group.isDirty) return new Future.value(new Set());
return group.process().then(_handleOutputs);
}));
return Future.wait(outputFutures).then((_) {
// Report collisions in a deterministic order.
outputIds = outputIds.toList();
outputIds.sort((a, b) => a.compareTo(b));
for (var id in outputIds) {
// It's possible the output was removed before other transforms in this
// phase finished.
if (!_outputs.containsKey(id)) continue;
var exception = _outputs[id].collisionException;
if (exception != null) cascade.reportError(exception);
}
});
}
/// Add [asset] as an output of this phase.
void _addOutput(AssetNode asset) {
if (_outputs.containsKey(asset.id)) {
_outputs[asset.id].add(asset);
} else {
_outputs[asset.id] = new PhaseOutput(this, asset);
_outputs[asset.id].onAsset.listen((output) {
if (_next != null) _next.addInput(output);
}, onDone: () => _outputs.remove(asset.id));
if (_next != null) _next.addInput(_outputs[asset.id].output);
}
}
}