blob: dc7b4b440421ca1c36ae8bc3a7736be42a9a503f [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.group_runner;
import 'dart:async';
import 'asset_cascade.dart';
import 'asset_node.dart';
import 'barback_logger.dart';
import 'phase.dart';
import 'stream_pool.dart';
import 'transformer_group.dart';
/// A class that process all of the phases in a single transformer group.
///
/// A group takes many inputs, processes them, and emits many outputs.
class GroupRunner {
/// The phases defined by this group.
final _phases = new List<Phase>();
/// A stream that emits an event whenever this group becomes dirty and needs
/// to be run.
///
/// This may emit events when the group was already dirty or while processing
/// transforms. Events are emitted synchronously to ensure that the dirty
/// state is thoroughly propagated as soon as any assets are changed.
Stream get onDirty => _onDirtyPool.stream;
final _onDirtyPool = new StreamPool.broadcast();
/// Whether this group is dirty and needs to be run.
bool get isDirty => _phases.any((phase) => phase.isDirty);
/// A stream that emits an event whenever any transforms in this group log an
/// entry.
Stream<LogEntry> get onLog => _onLogPool.stream;
final _onLogPool = new StreamPool<LogEntry>.broadcast();
// TODO(nweiz): move to a more push-based way of propagating outputs and get
// rid of this. Once that's done, see if we can unify GroupRunner and
// AssetCascade.
/// The set of outputs that has been returned by [process].
///
/// [process] is expected to only return new outputs, so this is used to
/// ensure that it does so.
final _alreadyEmittedOutputs = new Set<AssetNode>();
GroupRunner(AssetCascade cascade, TransformerGroup group) {
var lastPhase = new Phase(cascade, group.phases.first);
_phases.add(lastPhase);
for (var phase in group.phases.skip(1)) {
lastPhase = lastPhase.addPhase(phase);
_phases.add(lastPhase);
}
for (var phase in _phases) {
_onDirtyPool.add(phase.onDirty);
_onLogPool.add(phase.onLog);
}
}
/// Adds a new asset as an input for this group.
void addInput(AssetNode node) {
_phases.first.addInput(node);
}
/// Removes this group and all sub-phases within it.
void remove() {
_phases.first.remove();
}
/// Processes this group.
///
/// Returns a future that completes with any new outputs produced by the
/// group.
Future<Set<AssetNode>> process() {
// Process the first phase that needs to do work.
for (var phase in _phases) {
var future = phase.process();
if (future != null) return future.then((_) => process());
}
// If we get here, all phases are done processing.
var newOutputs = _phases.last.availableOutputs
.difference(_alreadyEmittedOutputs);
for (var output in newOutputs) {
output.whenRemoved(() => _alreadyEmittedOutputs.remove(output));
}
_alreadyEmittedOutputs.addAll(newOutputs);
return new Future.value(newOutputs);
}
}