Changes for code review.
diff --git a/e2e_test/lib/async_worker.dart b/e2e_test/lib/async_worker.dart
index b42944e..ee080a9 100644
--- a/e2e_test/lib/async_worker.dart
+++ b/e2e_test/lib/async_worker.dart
@@ -12,7 +12,7 @@
class ExampleAsyncWorker extends AsyncWorkerLoop {
/// Set [sendPort] to run in an isolate.
ExampleAsyncWorker([SendPort sendPort])
- : super(connection: new AsyncWorkerConnection(sendPort));
+ : super(connection: new AsyncWorkerConnection(sendPort: sendPort));
Future<WorkResponse> performRequest(WorkRequest request) async {
return new WorkResponse()
diff --git a/e2e_test/lib/forwards_to_isolate_async_worker.dart b/e2e_test/lib/forwards_to_isolate_async_worker.dart
index b94898a..47cf101 100644
--- a/e2e_test/lib/forwards_to_isolate_async_worker.dart
+++ b/e2e_test/lib/forwards_to_isolate_async_worker.dart
@@ -6,28 +6,22 @@
import 'dart:isolate';
import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/driver.dart';
/// Example worker that just forwards requests to an isolate.
class ForwardsToIsolateAsyncWorker extends AsyncWorkerLoop {
- final StreamIterator _receivePortIterator;
- final SendPort _sendPort;
+ final IsolateDriverConnection _isolateDriverConnection;
static Future<ForwardsToIsolateAsyncWorker> create(
ReceivePort receivePort) async {
- // The first thing the isolate sends is a `SendPort` so we can communicate
- // with it.
- var receivePortIterator = new StreamIterator(receivePort);
- await receivePortIterator.moveNext();
- var sendPort = receivePortIterator.current as SendPort;
- return new ForwardsToIsolateAsyncWorker(receivePortIterator, sendPort);
+ return new ForwardsToIsolateAsyncWorker(
+ await IsolateDriverConnection.create(receivePort));
}
- ForwardsToIsolateAsyncWorker(this._receivePortIterator, this._sendPort);
+ ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
- Future<WorkResponse> performRequest(WorkRequest request) async {
- // Send the request to the isolate, return the response from the isolate.
- _sendPort.send(request.writeToBuffer());
- await _receivePortIterator.moveNext();
- return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
+ Future<WorkResponse> performRequest(WorkRequest request) {
+ _isolateDriverConnection.writeRequest(request);
+ return _isolateDriverConnection.readResponse();
}
}
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 6c26901..6dd9826 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -5,6 +5,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
+import 'dart:isolate';
import '../async_message_grouper.dart';
import '../worker_protocol.pb.dart';
@@ -78,3 +79,35 @@
@override
Future cancel() => _messageGrouper.cancel();
}
+
+/// [DriverConnection] that works with an isolate via a [SendPort].
+class IsolateDriverConnection implements DriverConnection {
+ final StreamIterator _receivePortIterator;
+ final SendPort _sendPort;
+
+ IsolateDriverConnection._(this._receivePortIterator, this._sendPort);
+
+ /// Creates a driver connection for a worker in an isolate. Provide the
+ /// [receivePort] attached to the [SendPort] that the isolate was created
+ /// with.
+ static Future<IsolateDriverConnection> create(ReceivePort receivePort) async {
+ var receivePortIterator = new StreamIterator(receivePort);
+ await receivePortIterator.moveNext();
+ var sendPort = receivePortIterator.current as SendPort;
+ return new IsolateDriverConnection._(receivePortIterator, sendPort);
+ }
+
+ @override
+ Future<WorkResponse> readResponse() async {
+ await _receivePortIterator.moveNext();
+ return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
+ }
+
+ @override
+ void writeRequest(WorkRequest request) {
+ _sendPort.send(request.writeToBuffer());
+ }
+
+ @override
+ Future cancel() async {}
+}
diff --git a/lib/src/worker/worker_connection.dart b/lib/src/worker/worker_connection.dart
index ddd8880..c55f52d 100644
--- a/lib/src/worker/worker_connection.dart
+++ b/lib/src/worker/worker_connection.dart
@@ -26,11 +26,17 @@
}
abstract class AsyncWorkerConnection implements WorkerConnection {
- /// Creates a [StdAsyncWorkerConnection], unless [sendPort] is specified, in
- /// which case creates a [SendPortAsyncWorkerConnection].
- factory AsyncWorkerConnection([SendPort sendPort]) => sendPort == null
- ? new StdAsyncWorkerConnection()
- : new SendPortAsyncWorkerConnection(sendPort);
+ /// Creates a [StdAsyncWorkerConnection] with the specified [inputStream]
+ /// and [outputStream], unless [sendPort] is specified, in which case
+ /// creates a [SendPortAsyncWorkerConnection].
+ factory AsyncWorkerConnection(
+ {Stream<List<int>> inputStream,
+ StreamSink<List<int>> outputStream,
+ SendPort sendPort}) =>
+ sendPort == null
+ ? new StdAsyncWorkerConnection(
+ inputStream: inputStream, outputStream: outputStream)
+ : new SendPortAsyncWorkerConnection(sendPort);
@override
Future<WorkRequest> readRequest();
@@ -68,7 +74,7 @@
/// Implementation of [AsyncWorkerConnection] for running in an isolate.
class SendPortAsyncWorkerConnection implements AsyncWorkerConnection {
final ReceivePort receivePort;
- final StreamIterator receivePortIterator;
+ final StreamIterator<Uint8List> receivePortIterator;
final SendPort sendPort;
factory SendPortAsyncWorkerConnection(SendPort sendPort) {
@@ -78,14 +84,12 @@
}
SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
- : receivePortIterator = new StreamIterator(receivePort);
+ : receivePortIterator = new StreamIterator(receivePort.cast());
@override
Future<WorkRequest> readRequest() async {
- if (!await receivePortIterator.moveNext()) {
- throw new StateError('Receive port closed.');
- }
- return new WorkRequest.fromBuffer(receivePortIterator.current as Uint8List);
+ if (!await receivePortIterator.moveNext()) return null;
+ return new WorkRequest.fromBuffer(receivePortIterator.current);
}
@override
diff --git a/pubspec.yaml b/pubspec.yaml
index f235b89..d4f9cb1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.14-dev
+version: 0.1.14
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>