Fix bug in `ProcessWrapper` (#36)

The `done` getter listened to the stdio streams, which opened up
the possibility of multiple listeners on a single-subscription
stream.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0af09f4..917a936 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
 
+#### 3.0.8
+
+* Fixed bug in `ProcessWrapper`
+
 #### 3.0.7
 
 * Renamed `Process` to `ProcessWrapper`
diff --git a/lib/src/interface/process_wrapper.dart b/lib/src/interface/process_wrapper.dart
index ff109ba..2162637 100644
--- a/lib/src/interface/process_wrapper.dart
+++ b/lib/src/interface/process_wrapper.dart
@@ -2,18 +2,47 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
 import 'dart:io' as io;
 
 /// A wrapper around an [io.Process] class that adds some convenience methods.
 class ProcessWrapper implements io.Process {
   /// Constructs a [ProcessWrapper] object that delegates to the specified
   /// underlying object.
-  const ProcessWrapper(this.delegate);
+  ProcessWrapper(this._delegate)
+      : _stdout = new StreamController<List<int>>(),
+        _stderr = new StreamController<List<int>>(),
+        _stdoutDone = new Completer<void>(),
+        _stderrDone = new Completer<void>() {
+    _monitorStdioStream(_delegate.stdout, _stdout, _stdoutDone);
+    _monitorStdioStream(_delegate.stderr, _stderr, _stderrDone);
+  }
 
-  final io.Process delegate;
+  final io.Process _delegate;
+  final StreamController<List<int>> _stdout;
+  final StreamController<List<int>> _stderr;
+  final Completer<void> _stdoutDone;
+  final Completer<void> _stderrDone;
+
+  /// Listens to the specified [stream], repeating events on it via
+  /// [controller], and completing [completer] once the stream is done.
+  void _monitorStdioStream(
+    Stream<List<int>> stream,
+    StreamController<List<int>> controller,
+    Completer<void> completer,
+  ) {
+    stream.listen(
+      controller.add,
+      onError: controller.addError,
+      onDone: () {
+        controller.close;
+        completer.complete();
+      },
+    );
+  }
 
   @override
-  Future<int> get exitCode => delegate.exitCode;
+  Future<int> get exitCode => _delegate.exitCode;
 
   /// A [Future] that completes when the process has exited and its standard
   /// output and error streams have closed.
@@ -26,9 +55,9 @@
   Future<int> get done async {
     int result;
     await Future.wait<void>(<Future<void>>[
-      delegate.stdout.length,
-      delegate.stderr.length,
-      delegate.exitCode.then((int value) {
+      _stdoutDone.future,
+      _stderrDone.future,
+      _delegate.exitCode.then((int value) {
         result = value;
       }),
     ]);
@@ -38,18 +67,18 @@
 
   @override
   bool kill([io.ProcessSignal signal = io.ProcessSignal.sigterm]) {
-    return delegate.kill(signal);
+    return _delegate.kill(signal);
   }
 
   @override
-  int get pid => delegate.pid;
+  int get pid => _delegate.pid;
 
   @override
-  Stream<List<int>> get stderr => delegate.stderr;
+  io.IOSink get stdin => _delegate.stdin;
 
   @override
-  io.IOSink get stdin => delegate.stdin;
+  Stream<List<int>> get stdout => _stdout.stream;
 
   @override
-  Stream<List<int>> get stdout => delegate.stdout;
+  Stream<List<int>> get stderr => _stderr.stream;
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index ba8a66f..45b360d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: process
-version: 3.0.7
+version: 3.0.8
 authors:
 - Todd Volkert <tvolkert@google.com>
 - Michael Goderbauer <goderbauer@google.com>
diff --git a/test/src/interface/process_wrapper_test.dart b/test/src/interface/process_wrapper_test.dart
index b3f24ad..8799e66 100644
--- a/test/src/interface/process_wrapper_test.dart
+++ b/test/src/interface/process_wrapper_test.dart
@@ -3,6 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
+import 'dart:convert';
 import 'dart:io' as io;
 
 import 'package:process/process.dart';
@@ -10,14 +11,21 @@
 
 void main() {
   group('done', () {
-    test('completes only when all done', () async {
-      TestProcess delegate = new TestProcess();
-      ProcessWrapper process = new ProcessWrapper(delegate);
-      bool done = false;
+    TestProcess delegate;
+    ProcessWrapper process;
+    bool done;
+
+    setUp(() {
+      delegate = TestProcess();
+      process = ProcessWrapper(delegate);
+      done = false;
       // ignore: unawaited_futures
       process.done.then((int result) {
         done = true;
       });
+    });
+
+    test('completes only when all done', () async {
       expect(done, isFalse);
       delegate.exitCodeCompleter.complete(0);
       await Future<void>.value();
@@ -30,14 +38,26 @@
       expect(done, isTrue);
       expect(await process.exitCode, 0);
     });
+
+    test('works in conjunction with subscribers to stdio streams', () async {
+      process.stdout
+          .transform<String>(utf8.decoder)
+          .transform<String>(const LineSplitter())
+          .listen(print);
+      delegate.exitCodeCompleter.complete(0);
+      await delegate.stdoutController.close();
+      await delegate.stderrController.close();
+      await Future<void>.value();
+      expect(done, isTrue);
+    });
   });
 }
 
 class TestProcess implements io.Process {
   TestProcess([this.pid = 123])
-      : exitCodeCompleter = new Completer<int>(),
-        stdoutController = new StreamController<List<int>>(),
-        stderrController = new StreamController<List<int>>();
+      : exitCodeCompleter = Completer<int>(),
+        stdoutController = StreamController<List<int>>(),
+        stderrController = StreamController<List<int>>();
 
   @override
   final int pid;