Add cancel method to `BazelWorkerConnection` and use it (#16)

Also update pubspec/changelog for release
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77cd606..b8b0031 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.1.8
+
+* Add `Future cancel()` method to `DriverConnection`, which in the case of a
+  `StdDriverConnection` closes the input stream.
+  * The `terminateWorkers` method on `BazelWorkerDriver` now calls `cancel` on
+    all worker connections to ensure the vm can exit correctly.
+
 ## 0.1.7
 
 * Update the `BazelWorkerDriver` class to handle worker crashes, and retry work
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 93caf49..3044bbe 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -19,6 +19,9 @@
   /// The input stream.
   final StreamQueue<List<int>> _inputQueue;
 
+  // Whether or not the input queue has already been cancelled.
+  bool _inputQueueCancelled = false;
+
   /// The current buffer.
   final Queue<int> _buffer = new Queue<int>();
 
@@ -38,7 +41,7 @@
       }
 
       // If there is nothing left in the queue then cancel the subscription.
-      if (message == null) _inputQueue.cancel();
+      if (message == null) _cancel();
 
       return message;
     } catch (e) {
@@ -49,6 +52,14 @@
     }
   }
 
+  Future _cancel() {
+    if (!_inputQueueCancelled) {
+      _inputQueueCancelled = true;
+      return _inputQueue.cancel();
+    }
+    return new Future.value(null);
+  }
+
   /// Stop listening to the stream for further updates.
-  Future cancel() => _inputQueue.cancel();
+  Future cancel() => _cancel();
 }
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 78bec3b..80d82a9 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -59,11 +59,11 @@
 
   /// Calls `kill` on all worker processes.
   Future terminateWorkers() async {
-    for (var worker in _readyWorkers) {
-      worker.kill();
+    for (var worker in _readyWorkers.toList()) {
+      _killWorker(worker);
     }
     await Future.wait(_spawningWorkers.map((worker) async {
-      (await worker).kill();
+      _killWorker(await worker);
     }));
   }
 
@@ -108,7 +108,6 @@
         worker.exitCode.then((exitCode) {
           _idleWorkers.remove(worker);
           _readyWorkers.remove(worker);
-          _spawningWorkers.remove(worker);
           _runWorkQueue();
         });
       });
@@ -178,8 +177,7 @@
       // Note that whenever we spawn a worker we listen for its exit code
       // and clean it up so we don't need to do that here.
       var worker = _idleWorkers.removeLast();
-      _readyWorkers.remove(worker);
-      worker.kill();
+      _killWorker(worker);
     }
   }
 
@@ -194,6 +192,13 @@
     _runWorkQueue();
     return true;
   }
+
+  void _killWorker(Process worker) {
+    _workerConnections[worker].cancel();
+    _readyWorkers.remove(worker);
+    _idleWorkers.remove(worker);
+    worker.kill();
+  }
 }
 
 /// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index b170586..29a00a1 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -20,6 +20,8 @@
   Future<WorkResponse> readResponse();
 
   void writeRequest(WorkRequest request);
+
+  Future cancel();
 }
 
 /// Default implementation of [DriverConnection] that works with [Stdin]
@@ -72,4 +74,7 @@
   void writeRequest(WorkRequest request) {
     _outputStream.add(protoToDelimitedBuffer(request));
   }
+
+  @override
+  Future cancel() => _messageGrouper.cancel();
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 397ee9b..d2007e5 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: bazel_worker
-version: 0.1.7
+version: 0.1.8
 description: Tools for creating a bazel persistent worker.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/bazel_worker