added shared interfaces, and a few other tweaks
diff --git a/lib/bazel_worker.dart b/lib/bazel_worker.dart
index e85cd94..8d42fb6 100644
--- a/lib/bazel_worker.dart
+++ b/lib/bazel_worker.dart
@@ -4,5 +4,8 @@
 
 export 'src/async_worker_loop.dart';
 export 'src/constants.dart';
+export 'src/message_grouper.dart';
 export 'src/sync_worker_loop.dart';
+export 'src/worker_connection.dart';
+export 'src/worker_loop.dart';
 export 'src/worker_protocol.pb.dart';
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 9c452fd..a765d55 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -7,11 +7,12 @@
 
 import 'package:async/async.dart';
 
+import 'message_grouper.dart';
 import 'message_grouper_state.dart';
 
 /// Collects stream data into messages by interpreting it as
 /// base-128 encoded lengths interleaved with raw data.
-class AsyncMessageGrouper {
+class AsyncMessageGrouper implements MessageGrouper {
   /// Current state for reading in messages;
   final _state = new MessageGrouperState();
 
diff --git a/lib/src/async_worker_loop.dart b/lib/src/async_worker_loop.dart
index 7e774f9..4583ba2 100644
--- a/lib/src/async_worker_loop.dart
+++ b/lib/src/async_worker_loop.dart
@@ -8,10 +8,12 @@
 import 'constants.dart';
 import 'async_message_grouper.dart';
 import 'utils.dart';
+import 'worker_connection.dart';
+import 'worker_loop.dart';
 import 'worker_protocol.pb.dart';
 
 /// Connection between a worker and input / output.
-abstract class AsyncWorkerConnection {
+abstract class AsyncWorkerConnection implements WorkerConnection {
   /// Read a new [WorkRequest]. Returns [null] when there are no more requests.
   Future<WorkRequest> readRequest();
 
@@ -22,7 +24,7 @@
 /// Persistent Bazel worker loop.
 ///
 /// Extend this class and implement the `performRequest` method.
-abstract class AsyncWorkerLoop {
+abstract class AsyncWorkerLoop implements WorkerLoop {
   final AsyncWorkerConnection connection;
 
   AsyncWorkerLoop({AsyncWorkerConnection connection})
diff --git a/lib/src/message_grouper.dart b/lib/src/message_grouper.dart
new file mode 100644
index 0000000..635983e
--- /dev/null
+++ b/lib/src/message_grouper.dart
@@ -0,0 +1,13 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// Interface for a [MessageGrouper], which groups bytes in  delimited proto
+/// format into the bytes for each message.
+///
+/// This interface should not generally be implemented directly, instead use
+/// the [SyncMessageGrouper] or [AsyncMessageGrouper] implementations.
+abstract class MessageGrouper {
+  /// Returns either a [List<int>] or a [Future<List<int>>].
+  dynamic get next;
+}
diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart
index a83e4a5..50d5dca 100644
--- a/lib/src/sync_message_grouper.dart
+++ b/lib/src/sync_message_grouper.dart
@@ -4,10 +4,11 @@
 
 import 'dart:io';
 
+import 'message_grouper.dart';
 import 'message_grouper_state.dart';
 
 /// Groups bytes in delimited proto format into the bytes for each message.
-class SyncMessageGrouper {
+class SyncMessageGrouper implements MessageGrouper {
   final _state = new MessageGrouperState();
   final Stdin _stdin;
 
diff --git a/lib/src/sync_worker_loop.dart b/lib/src/sync_worker_loop.dart
index 5638c95..3b2a05b 100644
--- a/lib/src/sync_worker_loop.dart
+++ b/lib/src/sync_worker_loop.dart
@@ -7,10 +7,12 @@
 import 'constants.dart';
 import 'sync_message_grouper.dart';
 import 'utils.dart';
+import 'worker_connection.dart';
+import 'worker_loop.dart';
 import 'worker_protocol.pb.dart';
 
 /// Connection between a worker and input / output.
-abstract class SyncWorkerConnection {
+abstract class SyncWorkerConnection implements WorkerConnection {
   /// Read a new [WorkRequest]. Returns [null] when there are no more requests.
   WorkRequest readRequest();
 
@@ -21,7 +23,7 @@
 /// Persistent Bazel worker loop.
 ///
 /// Extend this class and implement the `performRequest` method.
-abstract class SyncWorkerLoop {
+abstract class SyncWorkerLoop implements WorkerLoop {
   final SyncWorkerConnection connection;
 
   SyncWorkerLoop({SyncWorkerConnection connection})
diff --git a/lib/src/worker_connection.dart b/lib/src/worker_connection.dart
new file mode 100644
index 0000000..c39bc3f
--- /dev/null
+++ b/lib/src/worker_connection.dart
@@ -0,0 +1,17 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'worker_protocol.pb.dart';
+
+/// Interface for a [WorkerConnection].
+///
+/// Use [SyncWorkerConnection] or [AsyncWorkerConnection] implementations.
+abstract class WorkerConnection {
+  /// Read a [WorkRequest]. Returns either [Future<WorkRequest>] or
+  /// [WorkRequest].
+  dynamic readRequest();
+
+  /// Writes a [WorkResponse].
+  void writeResponse(WorkResponse response);
+}
diff --git a/lib/src/worker_loop.dart b/lib/src/worker_loop.dart
new file mode 100644
index 0000000..8eb7008
--- /dev/null
+++ b/lib/src/worker_loop.dart
@@ -0,0 +1,18 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'worker_protocol.pb.dart';
+
+/// Interface for a [WorkerLoop].
+///
+/// This interface should not generally be implemented directly, instead use
+/// the [SyncWorkerLoop] or [AsyncWorkerLoop] implementations.
+abstract class WorkerLoop {
+  /// Perform a single [WorkRequest], and return either a [WorkResponse] or
+  /// a [Future<WorkResponse>].
+  dynamic performRequest(WorkRequest request);
+
+  /// Run the worker loop. Should return either a [Future] or [null].
+  dynamic run();
+}
diff --git a/lib/testing.dart b/lib/testing.dart
index af65ede..eed01ba 100644
--- a/lib/testing.dart
+++ b/lib/testing.dart
@@ -10,33 +10,54 @@
 
 export 'src/utils.dart' show protoToDelimitedBuffer;
 
-/// A [Stdin] mock object.
-///
-/// Note: When using this with an [AsyncWorkerLoop] you must call [close] in
-/// order for the loop to exit properly.
-class TestStdinStream implements Stdin {
+/// Interface for a mock [Stdin] object that allows you to add bytes manually.
+abstract class TestStdin implements Stdin {
+  void addInputBytes(List<int> bytes);
+
+  void close();
+}
+
+/// A [Stdin] mock object which only implements `readByteSync`.
+class TestStdinSync implements TestStdin {
   /// Pending bytes to be delivered synchronously.
   final Queue<int> pendingBytes = new Queue<int>();
 
+  /// Adds all the [bytes] to this stream.
+  void addInputBytes(List<int> bytes) {
+    pendingBytes.addAll(bytes);
+  }
+
+  /// Add a -1 to signal EOF.
+  void close() {
+    pendingBytes.add(-1);
+  }
+
+  @override
+  int readByteSync() {
+    return pendingBytes.removeFirst();
+  }
+
+  @override
+  void noSuchMethod(Invocation invocation) {
+    throw new StateError('Unexpected invocation ${invocation.memberName}.');
+  }
+}
+
+/// A mock [Stdin] object which only implements `listen`.
+///
+/// Note: You must call [close] in order for the loop to exit properly.
+class TestStdinAsync implements TestStdin {
   /// Controls the stream for async delivery of bytes.
   final StreamController _controller = new StreamController();
 
   /// Adds all the [bytes] to this stream.
   void addInputBytes(List<int> bytes) {
-    pendingBytes.addAll(bytes);
     _controller.add(bytes);
   }
 
   /// Closes this stream. This is necessary for the [AsyncWorkerLoop] to exit.
-  Future close() => _controller.close();
-
-  @override
-  int readByteSync() {
-    if (pendingBytes.isEmpty) {
-      return -1;
-    } else {
-      return pendingBytes.removeFirst();
-    }
+  void close() {
+    _controller.close();
   }
 
   @override
@@ -69,8 +90,19 @@
   }
 }
 
+/// Interface for a [TestWorkerConnection] which records its responses
+abstract class TestWorkerConnection implements WorkerConnection {
+  List<WorkResponse> get responses;
+}
+
+/// Interface for a [TestWorkerLoop] which allows you to enqueue responses.
+abstract class TestWorkerLoop implements WorkerLoop {
+  void enqueueResponse(WorkResponse response);
+}
+
 /// A [StdSyncWorkerConnection] which records its responses.
-class TestSyncWorkerConnection extends StdSyncWorkerConnection {
+class TestSyncWorkerConnection extends StdSyncWorkerConnection
+    implements TestWorkerConnection {
   final List<WorkResponse> responses = <WorkResponse>[];
 
   TestSyncWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
@@ -84,7 +116,7 @@
 }
 
 /// A [SyncWorkerLoop] for testing.
-class TestSyncWorkerLoop extends SyncWorkerLoop {
+class TestSyncWorkerLoop extends SyncWorkerLoop implements TestWorkerLoop {
   final List<WorkRequest> requests = <WorkRequest>[];
   final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
 
@@ -106,7 +138,8 @@
 }
 
 /// A [StdAsyncWorkerConnection] which records its responses.
-class TestAsyncWorkerConnection extends StdAsyncWorkerConnection {
+class TestAsyncWorkerConnection extends StdAsyncWorkerConnection
+    implements TestWorkerConnection {
   final List<WorkResponse> responses = <WorkResponse>[];
 
   TestAsyncWorkerConnection(
@@ -121,7 +154,7 @@
 }
 
 /// A [AsyncWorkerLoop] for testing.
-class TestAsyncWorkerLoop extends AsyncWorkerLoop {
+class TestAsyncWorkerLoop extends AsyncWorkerLoop implements TestWorkerLoop {
   final List<WorkRequest> requests = <WorkRequest>[];
   final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
 
diff --git a/test/message_grouper_test.dart b/test/message_grouper_test.dart
index 54e08b5..8a975fb 100644
--- a/test/message_grouper_test.dart
+++ b/test/message_grouper_test.dart
@@ -9,26 +9,29 @@
 
 import 'package:bazel_worker/src/async_message_grouper.dart';
 import 'package:bazel_worker/src/sync_message_grouper.dart';
+import 'package:bazel_worker/bazel_worker.dart';
 import 'package:bazel_worker/testing.dart';
 
 void main() {
   group('AsyncMessageGrouper', () {
-    runTests((Stdin stdinStream) => new AsyncMessageGrouper(stdinStream));
+    runTests(() => new TestStdinAsync(),
+        (Stdin stdinStream) => new AsyncMessageGrouper(stdinStream));
   });
 
   group('SyncMessageGrouper', () {
-    runTests((Stdin stdinStream) => new SyncMessageGrouper(stdinStream));
+    runTests(() => new TestStdinSync(),
+        (Stdin stdinStream) => new SyncMessageGrouper(stdinStream));
   });
 }
 
-void runTests(messageGrouperFactory(Stdin stdinStream)) {
-  // AsyncMessageGrouper or SyncMessageGrouper
-  var messageGrouper;
+void runTests(TestStdin stdinFactory(),
+    MessageGrouper messageGrouperFactory(Stdin stdinStream)) {
+  MessageGrouper messageGrouper;
 
-  TestStdinStream stdinStream;
+  TestStdin stdinStream;
 
   setUp(() {
-    stdinStream = new TestStdinStream();
+    stdinStream = stdinFactory();
     messageGrouper = messageGrouperFactory(stdinStream);
   });
 
diff --git a/test/all_test.dart b/test/test_all.dart
similarity index 100%
rename from test/all_test.dart
rename to test/test_all.dart
diff --git a/test/worker_loop_test.dart b/test/worker_loop_test.dart
index d9265ce..1c27d5d 100644
--- a/test/worker_loop_test.dart
+++ b/test/worker_loop_test.dart
@@ -2,7 +2,6 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
-import 'dart:async';
 import 'dart:io';
 
 import 'package:test/test.dart';
@@ -13,6 +12,7 @@
 void main() {
   group('SyncWorkerLoop', () {
     runTests(
+        () => new TestStdinSync(),
         (Stdin stdinStream, Stdout stdoutStream) =>
             new TestSyncWorkerConnection(stdinStream, stdoutStream),
         (TestSyncWorkerConnection connection) =>
@@ -21,24 +21,25 @@
 
   group('AsyncWorkerLoop', () {
     runTests(
-        (Stream<List<int>> stdinStream, StreamSink<List<int>> stdoutStream) =>
+        () => new TestStdinAsync(),
+        (Stdin stdinStream, Stdout stdoutStream) =>
             new TestAsyncWorkerConnection(stdinStream, stdoutStream),
         (TestAsyncWorkerConnection connection) =>
             new TestAsyncWorkerLoop(connection));
   });
 }
 
-void runTests(
-    workerConnectionFactory(stdin, stdout), workerLoopFactory(connection)) {
-  TestStdinStream stdinStream;
+void runTests/*<T extends TestWorkerConnection>*/(
+    TestStdin stdinFactory(),
+    /*=T*/ workerConnectionFactory(Stdin stdin, Stdout stdout),
+    TestWorkerLoop workerLoopFactory(/*=T*/ connection)) {
+  TestStdin stdinStream;
   TestStdoutStream stdoutStream;
-  // TestSyncWorkerConnection or TestAsyncWorkerConnection
-  var connection;
-  // TestSyncWorkerLoop or TestAsyncWorkerLoop
-  var workerLoop;
+  var /*=T*/ connection;
+  TestWorkerLoop workerLoop;
 
   setUp(() {
-    stdinStream = new TestStdinStream();
+    stdinStream = stdinFactory();
     stdoutStream = new TestStdoutStream();
     connection = workerConnectionFactory(stdinStream, stdoutStream);
     workerLoop = workerLoopFactory(connection);
diff --git a/tool/travis.sh b/tool/travis.sh
index fdf2b4a..6b5e340 100644
--- a/tool/travis.sh
+++ b/tool/travis.sh
@@ -10,7 +10,7 @@
 # Verify that the libraries are error free.
 dartanalyzer --fatal-warnings \
   lib/bazel_worker.dart \
-  test/all_test.dart
+  test/test_all.dart
 
 # Run the tests.
 pub run test