Merge pull request #29 from davidmorgan/support-isolates

Support running workers in isolates.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae4cb6a..d2279aa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.14
+
+* Allow workers to run in isolates. To support running in an isolate, a worker
+  must change its `main` method to accept a `SendPort` and then pass it when
+  creating the `AsyncWorkerConnection`. See `async_worker` in `e2e_test`.
+
 ## 0.1.13
 
 * Support protobuf 0.10.0.
diff --git a/e2e_test/bin/async_worker.dart b/e2e_test/bin/async_worker.dart
index e2d0536..4573473 100644
--- a/e2e_test/bin/async_worker.dart
+++ b/e2e_test/bin/async_worker.dart
@@ -3,9 +3,12 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
+import 'dart:isolate';
 
 import 'package:e2e_test/async_worker.dart';
 
-Future main() async {
-  await new ExampleAsyncWorker().run();
+/// This worker can run in one of two ways: normally, using stdin/stdout, or
+/// in an isolate, communicating over a [SendPort].
+Future main(List<String> args, [SendPort sendPort]) async {
+  await new ExampleAsyncWorker(sendPort).run();
 }
diff --git a/e2e_test/bin/async_worker_in_isolate.dart b/e2e_test/bin/async_worker_in_isolate.dart
new file mode 100644
index 0000000..7b245f6
--- /dev/null
+++ b/e2e_test/bin/async_worker_in_isolate.dart
@@ -0,0 +1,24 @@
+// Copyright (c) 2018, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:e2e_test/forwards_to_isolate_async_worker.dart';
+
+/// Wraps the worker provided by `async_worker.dart`, launching it in an
+/// isolate. Requests are forwarded to the isolate and responses are returned
+/// directly from the isolate.
+///
+/// Anyone actually using the facility to wrap a worker in an isolate will want
+/// to use this code to do additional work, for example, post-processing one of
+/// the output files.
+Future main(List<String> args, SendPort message) async {
+  var receivePort = new ReceivePort();
+  await Isolate.spawnUri(
+      new Uri.file('async_worker.dart'), [], receivePort.sendPort);
+
+  var worker = await ForwardsToIsolateAsyncWorker.create(receivePort);
+  await worker.run();
+}
diff --git a/e2e_test/lib/async_worker.dart b/e2e_test/lib/async_worker.dart
index b8239d3..ee080a9 100644
--- a/e2e_test/lib/async_worker.dart
+++ b/e2e_test/lib/async_worker.dart
@@ -3,12 +3,17 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
+import 'dart:isolate';
 
 import 'package:bazel_worker/bazel_worker.dart';
 
 /// Example worker that just returns in its response all the arguments passed
 /// separated by newlines.
 class ExampleAsyncWorker extends AsyncWorkerLoop {
+  /// Set [sendPort] to run in an isolate.
+  ExampleAsyncWorker([SendPort sendPort])
+      : super(connection: new AsyncWorkerConnection(sendPort: sendPort));
+
   Future<WorkResponse> performRequest(WorkRequest request) async {
     return new WorkResponse()
       ..exitCode = 0
diff --git a/e2e_test/lib/forwards_to_isolate_async_worker.dart b/e2e_test/lib/forwards_to_isolate_async_worker.dart
new file mode 100644
index 0000000..47cf101
--- /dev/null
+++ b/e2e_test/lib/forwards_to_isolate_async_worker.dart
@@ -0,0 +1,27 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/driver.dart';
+
+/// Example worker that just forwards requests to an isolate.
+class ForwardsToIsolateAsyncWorker extends AsyncWorkerLoop {
+  final IsolateDriverConnection _isolateDriverConnection;
+
+  static Future<ForwardsToIsolateAsyncWorker> create(
+      ReceivePort receivePort) async {
+    return new ForwardsToIsolateAsyncWorker(
+        await IsolateDriverConnection.create(receivePort));
+  }
+
+  ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
+
+  Future<WorkResponse> performRequest(WorkRequest request) {
+    _isolateDriverConnection.writeRequest(request);
+    return _isolateDriverConnection.readResponse();
+  }
+}
diff --git a/e2e_test/test/e2e_test.dart b/e2e_test/test/e2e_test.dart
index d63419b..1757139 100644
--- a/e2e_test/test/e2e_test.dart
+++ b/e2e_test/test/e2e_test.dart
@@ -17,6 +17,10 @@
       () => Process.start(dart, [p.join('bin', 'sync_worker.dart')]));
   runE2eTestForWorker('async worker',
       () => Process.start(dart, [p.join('bin', 'async_worker.dart')]));
+  runE2eTestForWorker(
+      'async worker in isolate',
+      () =>
+          Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]));
 }
 
 void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 6c26901..6dd9826 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:convert';
 import 'dart:io';
+import 'dart:isolate';
 
 import '../async_message_grouper.dart';
 import '../worker_protocol.pb.dart';
@@ -78,3 +79,35 @@
   @override
   Future cancel() => _messageGrouper.cancel();
 }
+
+/// [DriverConnection] that works with an isolate via a [SendPort].
+class IsolateDriverConnection implements DriverConnection {
+  final StreamIterator _receivePortIterator;
+  final SendPort _sendPort;
+
+  IsolateDriverConnection._(this._receivePortIterator, this._sendPort);
+
+  /// Creates a driver connection for a worker in an isolate. Provide the
+  /// [receivePort] attached to the [SendPort] that the isolate was created
+  /// with.
+  static Future<IsolateDriverConnection> create(ReceivePort receivePort) async {
+    var receivePortIterator = new StreamIterator(receivePort);
+    await receivePortIterator.moveNext();
+    var sendPort = receivePortIterator.current as SendPort;
+    return new IsolateDriverConnection._(receivePortIterator, sendPort);
+  }
+
+  @override
+  Future<WorkResponse> readResponse() async {
+    await _receivePortIterator.moveNext();
+    return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
+  }
+
+  @override
+  void writeRequest(WorkRequest request) {
+    _sendPort.send(request.writeToBuffer());
+  }
+
+  @override
+  Future cancel() async {}
+}
diff --git a/lib/src/worker/worker_connection.dart b/lib/src/worker/worker_connection.dart
index 7fdd4e7..c55f52d 100644
--- a/lib/src/worker/worker_connection.dart
+++ b/lib/src/worker/worker_connection.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 import 'dart:io';
+import 'dart:isolate';
+import 'dart:typed_data';
 
 import '../async_message_grouper.dart';
 import '../sync_message_grouper.dart';
@@ -24,6 +26,18 @@
 }
 
 abstract class AsyncWorkerConnection implements WorkerConnection {
+  /// Creates a [StdAsyncWorkerConnection] with the specified [inputStream]
+  /// and [outputStream], unless [sendPort] is specified, in which case
+  /// creates a [SendPortAsyncWorkerConnection].
+  factory AsyncWorkerConnection(
+          {Stream<List<int>> inputStream,
+          StreamSink<List<int>> outputStream,
+          SendPort sendPort}) =>
+      sendPort == null
+          ? new StdAsyncWorkerConnection(
+              inputStream: inputStream, outputStream: outputStream)
+          : new SendPortAsyncWorkerConnection(sendPort);
+
   @override
   Future<WorkRequest> readRequest();
 }
@@ -57,6 +71,33 @@
   }
 }
 
+/// Implementation of [AsyncWorkerConnection] for running in an isolate.
+class SendPortAsyncWorkerConnection implements AsyncWorkerConnection {
+  final ReceivePort receivePort;
+  final StreamIterator<Uint8List> receivePortIterator;
+  final SendPort sendPort;
+
+  factory SendPortAsyncWorkerConnection(SendPort sendPort) {
+    var receivePort = new ReceivePort();
+    sendPort.send(receivePort.sendPort);
+    return SendPortAsyncWorkerConnection._(receivePort, sendPort);
+  }
+
+  SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
+      : receivePortIterator = new StreamIterator(receivePort.cast());
+
+  @override
+  Future<WorkRequest> readRequest() async {
+    if (!await receivePortIterator.moveNext()) return null;
+    return new WorkRequest.fromBuffer(receivePortIterator.current);
+  }
+
+  @override
+  void writeResponse(WorkResponse response) {
+    sendPort.send(response.writeToBuffer());
+  }
+}
+
 /// Default implementation of [SyncWorkerConnection] that works with [Stdin] and
 /// [Stdout].
 class StdSyncWorkerConnection implements SyncWorkerConnection {
diff --git a/pubspec.yaml b/pubspec.yaml
index c6c931a..d4f9cb1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: bazel_worker
-version: 0.1.13
+version: 0.1.14
 
 description: Tools for creating a bazel persistent worker.
 author: Dart Team <misc@dartlang.org>