Fix bug in `ProcessWrapper` (#36)
The `done` getter listened to the stdio streams, which opened up
the possibility of multiple listeners on a single-subscription
stream.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0af09f4..917a936 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
+#### 3.0.8
+
+* Fixed bug in `ProcessWrapper`
+
#### 3.0.7
* Renamed `Process` to `ProcessWrapper`
diff --git a/lib/src/interface/process_wrapper.dart b/lib/src/interface/process_wrapper.dart
index ff109ba..2162637 100644
--- a/lib/src/interface/process_wrapper.dart
+++ b/lib/src/interface/process_wrapper.dart
@@ -2,18 +2,47 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
import 'dart:io' as io;
/// A wrapper around an [io.Process] class that adds some convenience methods.
class ProcessWrapper implements io.Process {
/// Constructs a [ProcessWrapper] object that delegates to the specified
/// underlying object.
- const ProcessWrapper(this.delegate);
+ ProcessWrapper(this._delegate)
+ : _stdout = new StreamController<List<int>>(),
+ _stderr = new StreamController<List<int>>(),
+ _stdoutDone = new Completer<void>(),
+ _stderrDone = new Completer<void>() {
+ _monitorStdioStream(_delegate.stdout, _stdout, _stdoutDone);
+ _monitorStdioStream(_delegate.stderr, _stderr, _stderrDone);
+ }
- final io.Process delegate;
+ final io.Process _delegate;
+ final StreamController<List<int>> _stdout;
+ final StreamController<List<int>> _stderr;
+ final Completer<void> _stdoutDone;
+ final Completer<void> _stderrDone;
+
+ /// Listens to the specified [stream], repeating events on it via
+ /// [controller], and completing [completer] once the stream is done.
+ void _monitorStdioStream(
+ Stream<List<int>> stream,
+ StreamController<List<int>> controller,
+ Completer<void> completer,
+ ) {
+ stream.listen(
+ controller.add,
+ onError: controller.addError,
+ onDone: () {
+ controller.close;
+ completer.complete();
+ },
+ );
+ }
@override
- Future<int> get exitCode => delegate.exitCode;
+ Future<int> get exitCode => _delegate.exitCode;
/// A [Future] that completes when the process has exited and its standard
/// output and error streams have closed.
@@ -26,9 +55,9 @@
Future<int> get done async {
int result;
await Future.wait<void>(<Future<void>>[
- delegate.stdout.length,
- delegate.stderr.length,
- delegate.exitCode.then((int value) {
+ _stdoutDone.future,
+ _stderrDone.future,
+ _delegate.exitCode.then((int value) {
result = value;
}),
]);
@@ -38,18 +67,18 @@
@override
bool kill([io.ProcessSignal signal = io.ProcessSignal.sigterm]) {
- return delegate.kill(signal);
+ return _delegate.kill(signal);
}
@override
- int get pid => delegate.pid;
+ int get pid => _delegate.pid;
@override
- Stream<List<int>> get stderr => delegate.stderr;
+ io.IOSink get stdin => _delegate.stdin;
@override
- io.IOSink get stdin => delegate.stdin;
+ Stream<List<int>> get stdout => _stdout.stream;
@override
- Stream<List<int>> get stdout => delegate.stdout;
+ Stream<List<int>> get stderr => _stderr.stream;
}
diff --git a/pubspec.yaml b/pubspec.yaml
index ba8a66f..45b360d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: process
-version: 3.0.7
+version: 3.0.8
authors:
- Todd Volkert <tvolkert@google.com>
- Michael Goderbauer <goderbauer@google.com>
diff --git a/test/src/interface/process_wrapper_test.dart b/test/src/interface/process_wrapper_test.dart
index b3f24ad..8799e66 100644
--- a/test/src/interface/process_wrapper_test.dart
+++ b/test/src/interface/process_wrapper_test.dart
@@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
+import 'dart:convert';
import 'dart:io' as io;
import 'package:process/process.dart';
@@ -10,14 +11,21 @@
void main() {
group('done', () {
- test('completes only when all done', () async {
- TestProcess delegate = new TestProcess();
- ProcessWrapper process = new ProcessWrapper(delegate);
- bool done = false;
+ TestProcess delegate;
+ ProcessWrapper process;
+ bool done;
+
+ setUp(() {
+ delegate = TestProcess();
+ process = ProcessWrapper(delegate);
+ done = false;
// ignore: unawaited_futures
process.done.then((int result) {
done = true;
});
+ });
+
+ test('completes only when all done', () async {
expect(done, isFalse);
delegate.exitCodeCompleter.complete(0);
await Future<void>.value();
@@ -30,14 +38,26 @@
expect(done, isTrue);
expect(await process.exitCode, 0);
});
+
+ test('works in conjunction with subscribers to stdio streams', () async {
+ process.stdout
+ .transform<String>(utf8.decoder)
+ .transform<String>(const LineSplitter())
+ .listen(print);
+ delegate.exitCodeCompleter.complete(0);
+ await delegate.stdoutController.close();
+ await delegate.stderrController.close();
+ await Future<void>.value();
+ expect(done, isTrue);
+ });
});
}
class TestProcess implements io.Process {
TestProcess([this.pid = 123])
- : exitCodeCompleter = new Completer<int>(),
- stdoutController = new StreamController<List<int>>(),
- stderrController = new StreamController<List<int>>();
+ : exitCodeCompleter = Completer<int>(),
+ stdoutController = StreamController<List<int>>(),
+ stderrController = StreamController<List<int>>();
@override
final int pid;