blob: 4b2830979e5ba63c3983b31207efce9d11352a9c [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library scheduled_test.scheduled_process;
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'scheduled_test.dart';
import 'src/utils.dart';
import 'src/value_future.dart';
/// A class representing a [Process] that is scheduled to run in the course of
/// the test. This class allows actions on the process to be scheduled
/// synchronously. All operations on this class are scheduled.
///
/// Before running the test, either [shouldExit] or [kill] must be called on
/// this to ensure that the process terminates when expected.
///
/// If the test fails, this will automatically print out any stdout and stderr
/// from the process to aid debugging.
class ScheduledProcess {
/// A description of the process. Used for error reporting.
String get description => _description;
String _description;
/// Whether a description was passed explicitly by the user.
bool _explicitDescription;
/// The encoding used for the process's input and output streams.
final Encoding _encoding;
/// The process that's scheduled to run.
ValueFuture<Process> _process;
/// A fork of [_stdout] that records the standard output of the process. Used
/// for debugging information.
Stream<String> _stdoutLog;
/// A line-by-line view of the standard output stream of the process.
StreamIterator<String> _stdout;
/// A canceller that controls both [_stdout] and [_stdoutLog].
StreamCanceller _stdoutCanceller;
/// A fork of [_stderr] that records the standard error of the process. Used
/// for debugging information.
Stream<String> _stderrLog;
/// A line-by-line view of the standard error stream of the process.
StreamIterator<String> _stderr;
/// A canceller that controls both [_stderr] and [_stderrLog].
StreamCanceller _stderrCanceller;
/// The exit code of the process that's scheduled to run. This will naturally
/// only complete once the process has terminated.
ValueFuture<int> _exitCode;
/// Whether the user has scheduled the end of this process by calling either
/// [shouldExit] or [kill].
var _endScheduled = false;
/// The task that runs immediately before this process is scheduled to end. If
/// the process ends during this task, we treat that as expected.
Task _taskBeforeEnd;
/// Whether the process is expected to terminate at this point.
var _endExpected = false;
/// Schedules a process to start. [executable], [arguments],
/// [workingDirectory], and [environment] have the same meaning as for
/// [Process.start]. [description] is a string description of this process; it
/// defaults to the command-line invocation. [encoding] is the [Encoding] that
/// will be used for the process's input and output.
///
/// [executable], [arguments], [workingDirectory], and [environment] may be
/// either a [Future] or a concrete value. If any are [Future]s, the process
/// won't start until the [Future]s have completed. In addition, [arguments]
/// may be a [List] containing a mix of strings and [Future]s.
ScheduledProcess.start(executable, arguments,
{workingDirectory, environment, String description,
Encoding encoding: UTF8})
: _encoding = encoding,
_explicitDescription = description != null,
_description = description {
assert(currentSchedule.state == ScheduleState.SET_UP);
_updateDescription(executable, arguments);
_scheduleStartProcess(executable, arguments, workingDirectory, environment);
_scheduleExceptionCleanup();
var stdoutWithCanceller = _lineStreamWithCanceller(
_process.then((p) => p.stdout));
_stdoutCanceller = stdoutWithCanceller.last;
_stdoutLog = stdoutWithCanceller.first;
var stderrWithCanceller = _lineStreamWithCanceller(
_process.then((p) => p.stderr));
_stderrCanceller = stderrWithCanceller.last;
_stderrLog = stderrWithCanceller.first;
_stdout = new StreamIterator<String>(stdoutStream());
_stderr = new StreamIterator<String>(stderrStream());
}
/// Updates [_description] to reflect [executable] and [arguments], which are
/// the same values as in [start].
void _updateDescription(executable, arguments) {
if (_explicitDescription) return;
if (executable is Future) {
_description = "future process";
} else if (arguments is Future || arguments.any((e) => e is Future)) {
_description = executable;
} else {
_description = "$executable ${arguments.map((a) => '"$a"').join(' ')}";
}
}
/// Schedules the process to start and sets [_process].
void _scheduleStartProcess(executable,
arguments,
workingDirectory,
environment) {
var exitCodeCompleter = new Completer();
_exitCode = new ValueFuture(exitCodeCompleter.future);
_process = new ValueFuture(schedule(() {
if (!_endScheduled) {
throw new StateError("Scheduled process '$description' must "
"have shouldExit() or kill() called before the test is run.");
}
_handleExit(exitCodeCompleter);
return Future.wait([
new Future.sync(() => executable),
awaitObject(arguments),
new Future.sync(() => workingDirectory),
new Future.sync(() => environment)
]).then((results) {
executable = results[0];
arguments = results[1];
workingDirectory = results[2];
environment = results[3];
_updateDescription(executable, arguments);
return Process.start(executable,
arguments,
workingDirectory: workingDirectory,
environment: environment).then((process) {
process.stdin.encoding = UTF8;
return process;
});
});
}, "starting process '$description'"));
}
/// Listens for [_process] to exit and passes the exit code to
/// [exitCodeCompleter]. If the process completes earlier than expected, an
/// exception will be signaled to the schedule.
void _handleExit(Completer exitCodeCompleter) {
// We purposefully avoid using wrapFuture here. If an error occurs while a
// process is running, we want the schedule to move to the onException
// queue where the process will be killed, rather than blocking the tasks
// queue waiting for the process to exit.
_process.then((p) => p.exitCode).then((exitCode) {
if (_endExpected) {
exitCodeCompleter.complete(exitCode);
return;
}
wrapFuture(pumpEventQueue().then((_) {
if (currentSchedule.currentTask != _taskBeforeEnd) return;
// If we're one task before the end was scheduled, wait for that task
// to complete and pump the event queue so that _endExpected will be
// set.
return _taskBeforeEnd.result.then((_) => pumpEventQueue());
}).then((_) {
exitCodeCompleter.complete(exitCode);
if (!_endExpected) {
fail("Process '$description' ended earlier than scheduled "
"with exit code $exitCode.");
}
}), "waiting to reach shouldExit() or kill() for process "
"'$description'");
});
}
/// Converts a stream of bytes to a stream of lines and returns that along
/// with a [StreamCanceller] controlling it.
Pair<Stream<String>, StreamCanceller> _lineStreamWithCanceller(
Future<Stream<List<int>>> streamFuture) {
return streamWithCanceller(futureStream(streamFuture)
.handleError((e) => currentSchedule.signalError(e))
.map((chunk) {
// Whenever the process produces any sort of output, reset the schedule's
// timer.
currentSchedule.heartbeat();
return chunk;
})
.transform(_encoding.decoder)
.transform(new LineSplitter()));
}
/// Schedule an exception handler that will clean up the process and provide
/// debug information if an error occurs.
void _scheduleExceptionCleanup() {
currentSchedule.onException.schedule(() {
_stdoutCanceller();
_stderrCanceller();
if (!_process.hasValue) return;
var killedPrematurely = false;
if (!_exitCode.hasValue) {
killedPrematurely = true;
_endExpected = true;
_process.value.kill();
// Ensure that the onException queue waits for the process to actually
// exit after being killed.
wrapFuture(_process.value.exitCode, "waiting for process "
"'$description' to die");
}
return Future.wait([
_stdoutLog.toList(),
_stderrLog.toList()
]).then((results) {
var stdout = results[0].join("\n");
var stderr = results[1].join("\n");
var exitDescription = killedPrematurely
? "Process was killed prematurely."
: "Process exited with exit code ${_exitCode.value}.";
currentSchedule.addDebugInfo(
"Results of running '$description':\n"
"$exitDescription\n"
"Standard output:\n"
"${prefixLines(stdout)}\n"
"Standard error:\n"
"${prefixLines(stderr)}");
});
}, "cleaning up process '$description'");
}
/// Reads the next line of stdout from the process.
Future<String> nextLine() => schedule(() => streamIteratorFirst(_stdout),
"reading the next stdout line from process '$description'");
/// Reads the next line of stderr from the process.
Future<String> nextErrLine() => schedule(() => streamIteratorFirst(_stderr),
"reading the next stderr line from process '$description'");
/// Reads the remaining stdout from the process. This should only be called
/// after kill() or shouldExit().
Future<String> remainingStdout() {
if (!_endScheduled) {
throw new StateError("remainingStdout() should only be called after "
"kill() or shouldExit().");
}
return schedule(() => concatRest(_stdout),
"reading the remaining stdout from process '$description'");
}
/// Reads the remaining stderr from the process. This should only be called
/// after kill() or shouldExit().
Future<String> remainingStderr() {
if (!_endScheduled) {
throw new StateError("remainingStderr() should only be called after "
"kill() or shouldExit().");
}
return schedule(() => concatRest(_stderr),
"reading the remaining stderr from process '$description'");
}
/// Returns a stream that will emit anything the process emits via the
/// process's standard output from now on.
///
/// This stream will be independent from any other methods that deal with
/// standard output, including other calls to [stdoutStream].
///
/// This can be overridden by subclasses to return a derived standard output
/// stream. This stream will then be used for [nextLine], [nextErrLine],
/// [remainingStdout], and [remainingStderr].
Stream<String> stdoutStream() {
var pair = tee(_stdoutLog);
_stdoutLog = pair.first;
return pair.last;
}
/// Returns a stream that will emit anything the process emits via the
/// process's standard error from now on.
///
/// This stream will be independent from any other methods that deal with
/// standard error, including other calls to [stderrStream].
Stream<String> stderrStream() {
var pair = tee(_stderrLog);
_stderrLog = pair.first;
return pair.last;
}
/// Writes [line] to the process as stdin.
void writeLine(String line) {
schedule(() {
return _process.then((p) => p.stdin.writeln('$line'));
}, "writing '$line' to stdin for process '$description'");
}
/// Closes the process's stdin stream.
void closeStdin() {
schedule(() => _process.then((p) => p.stdin.close()),
"closing stdin for process '$description'");
}
/// Kills the process, and waits until it's dead.
void kill() {
if (_endScheduled) {
throw new StateError("shouldExit() or kill() already called.");
}
_endScheduled = true;
_taskBeforeEnd = currentSchedule.tasks.contents.last;
schedule(() {
_endExpected = true;
return _process.then((p) => p.kill()).then((_) => _exitCode);
}, "waiting for process '$description' to die");
}
/// Waits for the process to exit, and verifies that the exit code matches
/// [expectedExitCode] (if given).
void shouldExit([int expectedExitCode]) {
if (_endScheduled) {
throw new StateError("shouldExit() or kill() already called.");
}
_endScheduled = true;
_taskBeforeEnd = currentSchedule.tasks.contents.last;
schedule(() {
_endExpected = true;
return _exitCode.then((exitCode) {
if (expectedExitCode != null) {
expect(exitCode, equals(expectedExitCode));
}
});
}, "waiting for process '$description' to exit");
}
}