blob: 8b9cd9202c591e3eb4314813a058ec5d892eff78 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io' as io
show
IOSink,
Process,
ProcessResult,
ProcessSignal,
ProcessStartMode,
SYSTEM_ENCODING;
import 'package:file/file.dart';
import 'package:intl/intl.dart';
import 'package:path/path.dart' as path;
import '../interface/process_manager.dart';
import 'can_run_manifest_entry.dart';
import 'common.dart';
import 'constants.dart';
import 'manifest.dart';
import 'replay_process_manager.dart';
import 'run_manifest_entry.dart';
/// Records process invocation activity and serializes it to disk.
///
/// A `RecordingProcessManager` wraps another [ProcessManager]. Each call is
/// forwarded to that underlying manager, and the invocation (including the
/// stdout and stderr of the associated process) is captured under
/// [destination].
///
/// This makes "record / replay" tests possible: capture the process activity
/// of a real program run, serialize it to disk, and later fake the same
/// activity in tests by replaying the serialized recording.
///
/// See also:
///
///  * [ReplayProcessManager].
class RecordingProcessManager implements ProcessManager {
  /// Executable names that [_getBasename] passes over when a later non-flag
  /// command element is available to use as the identifier instead.
  static const List<String> _kSkippableExecutables = const <String>[
    'env',
    'xcrun',
  ];

  /// The manager that actually performs every operation.
  final ProcessManager delegate;

  /// The directory into which recordings and the manifest are written.
  final Directory destination;

  /// Invocation metadata, in call order. Serialized as [kManifestName].
  final Manifest _manifest = new Manifest();

  /// Exit-code futures of processes started via [start], keyed by process ID.
  ///
  /// An entry is removed once its process has exited.
  final Map<int, Future<int>> _runningProcesses = <int, Future<int>>{};

  /// Creates a manager that forwards to [delegate] and records into
  /// [destination].
  ///
  /// Throws a [StateError] if [destination] does not exist, or if it exists
  /// but is not empty.
  ///
  /// The contents of [destination] should be treated as opaque. They are meant
  /// to be consumed only by [ReplayProcessManager] and are subject to change
  /// between versions of `package:process`.
  RecordingProcessManager(this.delegate, this.destination) {
    bool usable = destination.existsSync() && destination.listSync().isEmpty;
    if (!usable) {
      throw new StateError('Cannot record to ${destination.path}');
    }
  }

  /// The file system in which this manager creates recording files.
  FileSystem get fs => destination.fileSystem;

  /// Returns the file called [basename] directly inside [destination].
  File _fileNamed(String basename) =>
      fs.file('${destination.path}/$basename');

  /// Starts [command] through [delegate] and returns a process wrapper whose
  /// stdout and stderr are recorded as they are produced.
  ///
  /// The invocation is added to the manifest immediately; its exit code is
  /// filled in once the process exits.
  @override
  Future<io.Process> start(
    List<dynamic> command, {
    String workingDirectory,
    Map<String, String> environment,
    bool includeParentEnvironment: true,
    bool runInShell: false,
    io.ProcessStartMode mode: io.ProcessStartMode.NORMAL,
  }) async {
    // The real process has to exist first: its pid is part of the basename.
    io.Process launched = await delegate.start(
      command,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      mode: mode,
    );

    List<String> sanitizedCommand = sanitize(command);
    String basename = _getBasename(launched.pid, sanitizedCommand);

    RunManifestEntry manifestEntry = new RunManifestEntry(
      pid: launched.pid,
      basename: basename,
      command: sanitizedCommand,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      mode: mode,
    );
    _manifest.add(manifestEntry);

    _RecordingProcess recorder = new _RecordingProcess(
      manager: this,
      basename: basename,
      delegate: launched,
    );
    await recorder.startRecording();

    // Track the process until it exits, then stamp the manifest entry with
    // the exit code and stop tracking it.
    _runningProcesses[launched.pid] = recorder.exitCode.then((int code) {
      _runningProcesses.remove(launched.pid);
      manifestEntry.exitCode = code;
    });

    return recorder;
  }

  /// Runs [command] to completion through [delegate], then records the
  /// invocation and its captured stdout and stderr.
  @override
  Future<io.ProcessResult> run(
    List<dynamic> command, {
    String workingDirectory,
    Map<String, String> environment,
    bool includeParentEnvironment: true,
    bool runInShell: false,
    Encoding stdoutEncoding: io.SYSTEM_ENCODING,
    Encoding stderrEncoding: io.SYSTEM_ENCODING,
  }) async {
    io.ProcessResult outcome = await delegate.run(
      command,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      stdoutEncoding: stdoutEncoding,
      stderrEncoding: stderrEncoding,
    );

    List<String> sanitizedCommand = sanitize(command);
    String basename = _getBasename(outcome.pid, sanitizedCommand);

    RunManifestEntry manifestEntry = new RunManifestEntry(
      pid: outcome.pid,
      basename: basename,
      command: sanitizedCommand,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      stdoutEncoding: stdoutEncoding,
      stderrEncoding: stderrEncoding,
      exitCode: outcome.exitCode,
    );
    _manifest.add(manifestEntry);

    await _recordData(outcome.stdout, stdoutEncoding, '$basename.stdout');
    await _recordData(outcome.stderr, stderrEncoding, '$basename.stderr');

    return outcome;
  }

  /// Writes [data] to the file named [basename] in [destination].
  ///
  /// When [encoding] is null, [data] is added to the sink as raw bytes;
  /// otherwise it is written through the sink using [encoding]. The sink is
  /// closed even if writing fails.
  Future<Null> _recordData(
      dynamic data, Encoding encoding, String basename) async {
    IOSink sink = _fileNamed(basename).openWrite(encoding: encoding);
    try {
      if (encoding != null) {
        sink.write(data);
      } else {
        sink.add(data);
      }
      await sink.flush();
    } finally {
      await sink.close();
    }
  }

  /// Synchronous counterpart of [run]: runs [command] through [delegate],
  /// then records the invocation and its captured stdout and stderr.
  @override
  io.ProcessResult runSync(
    List<dynamic> command, {
    String workingDirectory,
    Map<String, String> environment,
    bool includeParentEnvironment: true,
    bool runInShell: false,
    Encoding stdoutEncoding: io.SYSTEM_ENCODING,
    Encoding stderrEncoding: io.SYSTEM_ENCODING,
  }) {
    io.ProcessResult outcome = delegate.runSync(
      command,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      stdoutEncoding: stdoutEncoding,
      stderrEncoding: stderrEncoding,
    );

    List<String> sanitizedCommand = sanitize(command);
    String basename = _getBasename(outcome.pid, sanitizedCommand);

    RunManifestEntry manifestEntry = new RunManifestEntry(
      pid: outcome.pid,
      basename: basename,
      command: sanitizedCommand,
      workingDirectory: workingDirectory,
      environment: environment,
      includeParentEnvironment: includeParentEnvironment,
      runInShell: runInShell,
      stdoutEncoding: stdoutEncoding,
      stderrEncoding: stderrEncoding,
      exitCode: outcome.exitCode,
    );
    _manifest.add(manifestEntry);

    _recordDataSync(outcome.stdout, stdoutEncoding, '$basename.stdout');
    _recordDataSync(outcome.stderr, stderrEncoding, '$basename.stderr');

    return outcome;
  }

  /// Synchronously writes [data] to the file named [basename] in
  /// [destination], as bytes when [encoding] is null and as a string
  /// otherwise.
  void _recordDataSync(dynamic data, Encoding encoding, String basename) {
    File target = _fileNamed(basename);
    if (encoding != null) {
      target.writeAsStringSync(data, encoding: encoding, flush: true);
    } else {
      target.writeAsBytesSync(data, flush: true);
    }
  }

  /// Asks [delegate] whether [executable] can be run and records both the
  /// question and the answer in the manifest.
  @override
  bool canRun(dynamic executable, {String workingDirectory}) {
    bool runnable =
        delegate.canRun(executable, workingDirectory: workingDirectory);
    CanRunManifestEntry manifestEntry = new CanRunManifestEntry(
        executable: executable.toString(), result: runnable);
    _manifest.add(manifestEntry);
    return runnable;
  }

  /// Forwards the kill request to [delegate]. Nothing is recorded.
  @override
  bool killPid(int pid, [io.ProcessSignal signal = io.ProcessSignal.SIGTERM]) =>
      delegate.killPid(pid, signal);

  /// Returns a human-readable identifier for the specified executable.
  ///
  /// The result has the form `<index>.<identifier>.<pid>`, where `<index>` is
  /// the current manifest length formatted with the pattern `000`, and
  /// `<identifier>` is the path basename of a non-flag element of
  /// [sanitizedCommand] (or `executable` if there is none).
  String _getBasename(int pid, List<String> sanitizedCommand) {
    String index = new NumberFormat('000').format(_manifest.length);

    // Elements starting with '-' are flags and never name the executable.
    Iterable<String> candidates =
        sanitizedCommand.where((String element) => !element.startsWith('-'));

    String identifier = 'executable';
    for (String candidate in candidates) {
      identifier = path.basename(candidate);
      bool skippable = _kSkippableExecutables.contains(identifier);
      if (!skippable) {
        // Found a real executable name; stop looking.
        break;
      }
      // Otherwise keep going: a later element may be more descriptive. If
      // none follows, the skippable name itself is used.
    }

    return '$index.$identifier.$pid';
  }

  /// Flushes pending data to [destination].
  ///
  /// This manager may buffer invocation metadata in memory as it sees fit.
  /// Calling `flush` forces it to write any pending data to disk. The returned
  /// future completes once all pending data has been written.
  ///
  /// Failure to call this method before the current process exits will likely
  /// cause invocation data to be lost.
  ///
  /// If [finishRunningProcesses] is true, the returned future only completes
  /// after the manager has waited for running processes to exit. Processes
  /// that don't exit on their own within [timeout] are marked as daemon
  /// processes in the serialized metadata and are asked to terminate through
  /// [delegate]. Processes that *still* haven't exited after a further
  /// [timeout] are marked as not responding in the serialized metadata.
  ///
  /// If [finishRunningProcesses] is false (the default), [timeout] is ignored.
  Future<Null> flush({
    bool finishRunningProcesses: false,
    Duration timeout: const Duration(milliseconds: 20),
  }) async {
    if (finishRunningProcesses) {
      await _waitForRunningProcessesToExit(timeout);
    }
    await _writeManifestToDisk();
  }

  /// Waits for all running processes to exit, giving them two chances.
  ///
  /// Any process still running after the first [timeout] is marked as a
  /// [RunManifestEntry.daemon] and killed via [delegate]. Any process still
  /// running after a second [timeout] is marked as
  /// [RunManifestEntry.notResponding].
  Future<Null> _waitForRunningProcessesToExit(Duration timeout) async {
    void markDaemonAndTerminate(RunManifestEntry entry) {
      entry.daemon = true;
      delegate.killPid(entry.pid);
    }

    void markNotResponding(RunManifestEntry entry) {
      entry.notResponding = true;
    }

    await _waitForRunningProcessesToExitWithTimeout(
      timeout: timeout,
      onTimeout: markDaemonAndTerminate,
    );

    // The stragglers have now been explicitly asked to shut down, so give
    // them one more window in which to exit.
    await _waitForRunningProcessesToExitWithTimeout(
      timeout: timeout,
      onTimeout: markNotResponding,
    );
  }

  /// Waits up to [timeout] for every tracked process to exit.
  ///
  /// If the wait times out, [onTimeout] is invoked with the manifest entry of
  /// each process that is still being tracked at that moment.
  Future<Null> _waitForRunningProcessesToExitWithTimeout({
    Duration timeout,
    void onTimeout(RunManifestEntry entry),
  }) async {
    // Copy the futures: the map shrinks as processes exit.
    List<Future<int>> pendingExits = _runningProcesses.values.toList();
    Future<List<int>> allExited = Future.wait(pendingExits);

    await allExited.timeout(timeout, onTimeout: () {
      for (int pid in _runningProcesses.keys) {
        onTimeout(_manifest.getRunEntry(pid));
      }
    });
  }

  /// Writes our process invocation manifest to disk in the destination folder.
  Future<Null> _writeManifestToDisk() async {
    File manifestFile = _fileNamed(kManifestName);
    await manifestFile.writeAsString(_manifest.toJson(), flush: true);
  }
}
/// A [Process] implementation that records `stdout` and `stderr` stream events
/// to disk before forwarding them on to the underlying stream listener.
///
/// [startRecording] must be called before [stdout] or [stderr] are read; it
/// is what subscribes to the delegate's streams and opens the recording
/// files.
class _RecordingProcess implements io.Process {
  /// The real process whose output is being recorded.
  final io.Process delegate;

  /// File name stem of the recordings; `.stdout` / `.stderr` is appended.
  final String basename;

  /// The manager that owns this process and supplies the destination
  /// directory and file system.
  final RecordingProcessManager manager;

  // ignore: close_sinks
  final StreamController<List<int>> _stdout = new StreamController<List<int>>();

  // ignore: close_sinks
  final StreamController<List<int>> _stderr = new StreamController<List<int>>();

  /// Whether [startRecording] has been called.
  bool _started = false;

  _RecordingProcess({this.manager, this.basename, this.delegate});

  /// Opens the recording files and begins copying the delegate's `stdout` and
  /// `stderr` into them.
  ///
  /// Completes once both recording files are open and both streams have been
  /// subscribed to. It does not wait for the streams to finish. Must be
  /// called at most once.
  Future<Null> startRecording() async {
    assert(!_started);
    _started = true;
    await Future.wait(<Future<Null>>[
      _recordStream(delegate.stdout, _stdout, 'stdout'),
      _recordStream(delegate.stderr, _stderr, 'stderr'),
    ]);
  }

  /// Subscribes to [stream], writing each chunk to `<basename>.<suffix>` in
  /// the manager's destination directory and then forwarding it to
  /// [controller].
  ///
  /// The recording file is closed on the first error or when the stream is
  /// done, whichever comes first. Events that arrive after that are still
  /// forwarded to [controller], but are no longer recorded.
  Future<Null> _recordStream(
    Stream<List<int>> stream,
    StreamController<List<int>> controller,
    String suffix,
  ) async {
    // Named `recordingPath` rather than `path` so that it does not shadow the
    // `path` import prefix used elsewhere in this library.
    String recordingPath = '${manager.destination.path}/$basename.$suffix';
    File file = await manager.fs.file(recordingPath).create();
    RandomAccessFile recording = await file.open(mode: FileMode.WRITE);

    // The subscription is not cancelled on error, so an error may be followed
    // by more data and by a done event. Closing must therefore be idempotent,
    // and nothing may touch the file once it has been closed; otherwise the
    // listener would throw and `controller.close()` would never be reached.
    bool recordingClosed = false;
    void closeRecording() {
      if (recordingClosed) {
        return;
      }
      recordingClosed = true;
      recording.closeSync();
    }

    stream.listen(
      (List<int> data) {
        if (!recordingClosed) {
          // Write synchronously to guarantee that the order of data
          // within our recording is preserved across stream notifications.
          recording.writeFromSync(data);
          // Flush immediately so that if the program crashes, forensic
          // data from the recording won't be lost.
          recording.flushSync();
        }
        controller.add(data);
      },
      onError: (dynamic error, StackTrace stackTrace) {
        closeRecording();
        controller.addError(error, stackTrace);
      },
      onDone: () {
        closeRecording();
        controller.close();
      },
    );
  }

  /// The exit code of the underlying process.
  @override
  Future<int> get exitCode => delegate.exitCode;

  // TODO(tvolkert): Remove this once dart-lang/sdk@e5a16b1 lands in stable SDK.
  @override // ignore: OVERRIDE_ON_NON_OVERRIDING_SETTER
  set exitCode(Future<int> exitCode) =>
      throw new UnsupportedError('set exitCode');

  /// The recorded copy of the delegate's standard output.
  @override
  Stream<List<int>> get stdout {
    assert(_started);
    return _stdout.stream;
  }

  /// The recorded copy of the delegate's standard error.
  @override
  Stream<List<int>> get stderr {
    assert(_started);
    return _stderr.stream;
  }

  @override
  io.IOSink get stdin {
    // We don't currently support recording `stdin`.
    return delegate.stdin;
  }

  @override
  int get pid => delegate.pid;

  /// Forwards the signal to the underlying process.
  @override
  bool kill([io.ProcessSignal signal = io.ProcessSignal.SIGTERM]) =>
      delegate.kill(signal);
}