Merge pull request #29 from davidmorgan/support-isolates
Support running workers in isolates.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae4cb6a..d2279aa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.14
+
+* Allow workers to support running in isolates. To support running in isolates,
+ workers must modify their `main` method to accept a `SendPort` then use it
+ when creating the `AsyncWorkerConnection`. See `async_worker` in `e2e_test`.
+
## 0.1.13
* Support protobuf 0.10.0.
diff --git a/e2e_test/bin/async_worker.dart b/e2e_test/bin/async_worker.dart
index e2d0536..4573473 100644
--- a/e2e_test/bin/async_worker.dart
+++ b/e2e_test/bin/async_worker.dart
@@ -3,9 +3,12 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
+import 'dart:isolate';
import 'package:e2e_test/async_worker.dart';
-Future main() async {
- await new ExampleAsyncWorker().run();
+/// This worker can run in one of two ways: normally, using stdin/stdout, or
+/// in an isolate, communicating over a [SendPort].
+Future main(List<String> args, [SendPort sendPort]) async {
+ await new ExampleAsyncWorker(sendPort).run();
}
diff --git a/e2e_test/bin/async_worker_in_isolate.dart b/e2e_test/bin/async_worker_in_isolate.dart
new file mode 100644
index 0000000..7b245f6
--- /dev/null
+++ b/e2e_test/bin/async_worker_in_isolate.dart
@@ -0,0 +1,24 @@
+// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:e2e_test/forwards_to_isolate_async_worker.dart';
+
+/// Wraps the worker provided by `async_worker.dart`, launching it in an
+/// isolate. Requests are forwarded to the isolate and responses are returned
+/// directly from the isolate.
+///
+/// Anyone actually using the facility to wrap a worker in an isolate will want
+/// to use this code to do additional work, for example post processing one of
+/// the output files.
+Future main(List<String> args, SendPort message) async {
+ var receivePort = new ReceivePort();
+ await Isolate.spawnUri(
+ new Uri.file('async_worker.dart'), [], receivePort.sendPort);
+
+ var worker = await ForwardsToIsolateAsyncWorker.create(receivePort);
+ await worker.run();
+}
diff --git a/e2e_test/lib/async_worker.dart b/e2e_test/lib/async_worker.dart
index b8239d3..ee080a9 100644
--- a/e2e_test/lib/async_worker.dart
+++ b/e2e_test/lib/async_worker.dart
@@ -3,12 +3,17 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
+import 'dart:isolate';
import 'package:bazel_worker/bazel_worker.dart';
/// Example worker that just returns in its response all the arguments passed
/// separated by newlines.
class ExampleAsyncWorker extends AsyncWorkerLoop {
+ /// Set [sendPort] to run in an isolate.
+ ExampleAsyncWorker([SendPort sendPort])
+ : super(connection: new AsyncWorkerConnection(sendPort: sendPort));
+
Future<WorkResponse> performRequest(WorkRequest request) async {
return new WorkResponse()
..exitCode = 0
diff --git a/e2e_test/lib/forwards_to_isolate_async_worker.dart b/e2e_test/lib/forwards_to_isolate_async_worker.dart
new file mode 100644
index 0000000..47cf101
--- /dev/null
+++ b/e2e_test/lib/forwards_to_isolate_async_worker.dart
@@ -0,0 +1,27 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/driver.dart';
+
+/// Example worker that just forwards requests to an isolate.
+class ForwardsToIsolateAsyncWorker extends AsyncWorkerLoop {
+ final IsolateDriverConnection _isolateDriverConnection;
+
+ static Future<ForwardsToIsolateAsyncWorker> create(
+ ReceivePort receivePort) async {
+ return new ForwardsToIsolateAsyncWorker(
+ await IsolateDriverConnection.create(receivePort));
+ }
+
+ ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
+
+ Future<WorkResponse> performRequest(WorkRequest request) {
+ _isolateDriverConnection.writeRequest(request);
+ return _isolateDriverConnection.readResponse();
+ }
+}
diff --git a/e2e_test/test/e2e_test.dart b/e2e_test/test/e2e_test.dart
index d63419b..1757139 100644
--- a/e2e_test/test/e2e_test.dart
+++ b/e2e_test/test/e2e_test.dart
@@ -17,6 +17,10 @@
() => Process.start(dart, [p.join('bin', 'sync_worker.dart')]));
runE2eTestForWorker('async worker',
() => Process.start(dart, [p.join('bin', 'async_worker.dart')]));
+ runE2eTestForWorker(
+ 'async worker in isolate',
+ () =>
+ Process.start(dart, [p.join('bin', 'async_worker_in_isolate.dart')]));
}
void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 6c26901..6dd9826 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -5,6 +5,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
+import 'dart:isolate';
import '../async_message_grouper.dart';
import '../worker_protocol.pb.dart';
@@ -78,3 +79,35 @@
@override
Future cancel() => _messageGrouper.cancel();
}
+
+/// [DriverConnection] that works with an isolate via a [SendPort].
+class IsolateDriverConnection implements DriverConnection {
+ final StreamIterator _receivePortIterator;
+ final SendPort _sendPort;
+
+ IsolateDriverConnection._(this._receivePortIterator, this._sendPort);
+
+ /// Creates a driver connection for a worker in an isolate. Provide the
+ /// [receivePort] attached to the [SendPort] that the isolate was created
+ /// with.
+ static Future<IsolateDriverConnection> create(ReceivePort receivePort) async {
+ var receivePortIterator = new StreamIterator(receivePort);
+ await receivePortIterator.moveNext();
+ var sendPort = receivePortIterator.current as SendPort;
+ return new IsolateDriverConnection._(receivePortIterator, sendPort);
+ }
+
+ @override
+ Future<WorkResponse> readResponse() async {
+ await _receivePortIterator.moveNext();
+ return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
+ }
+
+ @override
+ void writeRequest(WorkRequest request) {
+ _sendPort.send(request.writeToBuffer());
+ }
+
+ @override
+ Future cancel() async {}
+}
diff --git a/lib/src/worker/worker_connection.dart b/lib/src/worker/worker_connection.dart
index 7fdd4e7..c55f52d 100644
--- a/lib/src/worker/worker_connection.dart
+++ b/lib/src/worker/worker_connection.dart
@@ -4,6 +4,8 @@
import 'dart:async';
import 'dart:io';
+import 'dart:isolate';
+import 'dart:typed_data';
import '../async_message_grouper.dart';
import '../sync_message_grouper.dart';
@@ -24,6 +26,18 @@
}
abstract class AsyncWorkerConnection implements WorkerConnection {
+ /// Creates a [StdAsyncWorkerConnection] with the specified [inputStream]
+ /// and [outputStream], unless [sendPort] is specified, in which case
+ /// creates a [SendPortAsyncWorkerConnection].
+ factory AsyncWorkerConnection(
+ {Stream<List<int>> inputStream,
+ StreamSink<List<int>> outputStream,
+ SendPort sendPort}) =>
+ sendPort == null
+ ? new StdAsyncWorkerConnection(
+ inputStream: inputStream, outputStream: outputStream)
+ : new SendPortAsyncWorkerConnection(sendPort);
+
@override
Future<WorkRequest> readRequest();
}
@@ -57,6 +71,33 @@
}
}
+/// Implementation of [AsyncWorkerConnection] for running in an isolate.
+class SendPortAsyncWorkerConnection implements AsyncWorkerConnection {
+ final ReceivePort receivePort;
+ final StreamIterator<Uint8List> receivePortIterator;
+ final SendPort sendPort;
+
+ factory SendPortAsyncWorkerConnection(SendPort sendPort) {
+ var receivePort = new ReceivePort();
+ sendPort.send(receivePort.sendPort);
+ return SendPortAsyncWorkerConnection._(receivePort, sendPort);
+ }
+
+ SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
+ : receivePortIterator = new StreamIterator(receivePort.cast());
+
+ @override
+ Future<WorkRequest> readRequest() async {
+ if (!await receivePortIterator.moveNext()) return null;
+ return new WorkRequest.fromBuffer(receivePortIterator.current);
+ }
+
+ @override
+ void writeResponse(WorkResponse response) {
+ sendPort.send(response.writeToBuffer());
+ }
+}
+
/// Default implementation of [SyncWorkerConnection] that works with [Stdin] and
/// [Stdout].
class StdSyncWorkerConnection implements SyncWorkerConnection {
diff --git a/pubspec.yaml b/pubspec.yaml
index c6c931a..d4f9cb1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.13
+version: 0.1.14
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>