[vm/isolate] Add TransferableTypedData class that allows low-cost passing of Uint8List between isolates.
TransferableTypedData instances are one-use kind of thing: once receiver materializes it, it can't be used
again, once sender sends it out to an isolate, sender can't send it to different isolate.
Example of use:
sender isolate:
```
Future<TransferableTypedData> consolidateHttpClientResponseBytes(HttpClientResponse response) {
final completer = Completer<TransferableTypedData>();
final chunks = <Uint8List>[];
response.listen((List<int> chunk) {
chunks.add(chunk);
}, onDone: () {
completer.complete(TransferableTypedData.fromList(chunks));
});
return completer.future;
}
...
sendPort.send(await consolidateHttpClientResponseBytes(response));
```
receiver isolate:
```
RawReceivePort port = RawReceivePort((TransferableTypedData transferable) {
Uint8List content = transferable.materialize().asUint8List();
...
});
```
31959[tr] and 31960[tr] tests were inspired by dartbug.com/31959, dartbug.com/31960 that this CL attempts to address:
```
╰─➤ out/ReleaseX64/dart 31960.dart
sending...
163ms for round-trip
sending...
81ms for round-trip
sending...
20ms for round-trip
sending...
14ms for round-trip
sending...
20ms for round-trip
sending...
14ms for round-trip
```
(notice no "since last checking" pauses") vs
```
╰─➤ out/ReleaseX64/dart 31960.dart
sending...
154ms since last checkin
174ms for round-trip
sending...
68ms since last checkin
9ms since last checkin
171ms for round-trip
sending...
13ms since last checkin
108ms for round-trip
sending...
14ms since last checkin
108ms for round-trip
sending...
14ms since last checkin
107ms for round-trip
```
Change-Id: I0fcb5ce285394f498c3f1db4414204531f98199d
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/99623
Commit-Queue: Alexander Aprelev <aam@google.com>
Reviewed-by: Ryan Macnak <rmacnak@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 54b7a88..1027c80 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,12 @@
## 2.3.2-dev.XX.0
+
+### Core library changes
+
+#### `dart:isolate`
+
+* `TransferableTypedData` class was added to facilitate faster cross-isolate
+communication of `Uint8List` data.
+
(Add new changes here, and they will be copied to the change section for the
next dev version)
diff --git a/runtime/lib/isolate.cc b/runtime/lib/isolate.cc
index c6280f1..84c2aed 100644
--- a/runtime/lib/isolate.cc
+++ b/runtime/lib/isolate.cc
@@ -434,4 +434,120 @@
return Object::null();
}
+static void ExternalTypedDataFinalizer(void* isolate_callback_data,
+ Dart_WeakPersistentHandle handle,
+ void* peer) {
+ free(peer);
+}
+
+static intptr_t GetUint8SizeOrThrow(const Instance& instance) {
+ // From the Dart side we are guaranteed that the type of [instance] is a
+ // subtype of TypedData.
+ if (instance.IsTypedDataBase()) {
+ return TypedDataBase::Cast(instance).LengthInBytes();
+ }
+
+ // This can happen if [instance] is `null` or an instance of a 3rd party class
+ // which implements [TypedData].
+ Exceptions::ThrowArgumentError(instance);
+}
+
+DEFINE_NATIVE_ENTRY(TransferableTypedData_factory, 0, 2) {
+ ASSERT(
+ TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
+
+ GET_NON_NULL_NATIVE_ARGUMENT(Instance, array_instance,
+ arguments->NativeArgAt(1));
+
+ Array& array = Array::Handle();
+ intptr_t array_length;
+ if (array_instance.IsGrowableObjectArray()) {
+ const auto& growable_array = GrowableObjectArray::Cast(array_instance);
+ array ^= growable_array.data();
+ array_length = growable_array.Length();
+ } else if (array_instance.IsArray()) {
+ array ^= Array::Cast(array_instance).raw();
+ array_length = array.Length();
+ } else {
+ Exceptions::ThrowArgumentError(array_instance);
+ UNREACHABLE();
+ }
+ Instance& instance = Instance::Handle();
+ unsigned long long total_bytes = 0;
+ const unsigned long kMaxBytes =
+ TypedData::MaxElements(kTypedDataUint8ArrayCid);
+ for (intptr_t i = 0; i < array_length; i++) {
+ instance ^= array.At(i);
+ total_bytes += GetUint8SizeOrThrow(instance);
+ if (total_bytes > kMaxBytes) {
+ const Array& error_args = Array::Handle(Array::New(3));
+ error_args.SetAt(0, array);
+ error_args.SetAt(1, String::Handle(String::New("data")));
+ error_args.SetAt(2,
+ String::Handle(String::NewFormatted(
+ "Aggregated list exceeds max size %ld", kMaxBytes)));
+ Exceptions::ThrowByType(Exceptions::kArgumentValue, error_args);
+ UNREACHABLE();
+ }
+ }
+
+ uint8_t* data = reinterpret_cast<uint8_t*>(malloc(total_bytes));
+ if (data == nullptr) {
+ const Instance& exception =
+ Instance::Handle(thread->isolate()->object_store()->out_of_memory());
+ Exceptions::Throw(thread, exception);
+ UNREACHABLE();
+ }
+ intptr_t offset = 0;
+ for (intptr_t i = 0; i < array_length; i++) {
+ instance ^= array.At(i);
+
+ {
+ NoSafepointScope no_safepoint;
+ const auto& typed_data = TypedDataBase::Cast(instance);
+ const intptr_t length_in_bytes = typed_data.LengthInBytes();
+
+ void* source = typed_data.DataAddr(0);
+ // The memory does not overlap.
+ memcpy(data + offset, source, length_in_bytes);
+ offset += length_in_bytes;
+ }
+ }
+ ASSERT(static_cast<unsigned long>(offset) == total_bytes);
+ return TransferableTypedData::New(data, total_bytes);
+}
+
+DEFINE_NATIVE_ENTRY(TransferableTypedData_materialize, 0, 1) {
+ GET_NON_NULL_NATIVE_ARGUMENT(TransferableTypedData, t,
+ arguments->NativeArgAt(0));
+
+ void* peer;
+ {
+ NoSafepointScope no_safepoint;
+ peer = thread->heap()->GetPeer(t.raw());
+ // Assume that object's Peer is only used to track transferrability state.
+ ASSERT(peer != nullptr);
+ }
+
+ TransferableTypedDataPeer* tpeer =
+ reinterpret_cast<TransferableTypedDataPeer*>(peer);
+ const intptr_t length = tpeer->length();
+ uint8_t* data = tpeer->data();
+ if (data == nullptr) {
+ const auto& error = String::Handle(String::New(
+ "Attempt to materialize object that was transferred already."));
+ Exceptions::ThrowArgumentError(error);
+ UNREACHABLE();
+ }
+ tpeer->ClearData();
+
+ const ExternalTypedData& typed_data = ExternalTypedData::Handle(
+ ExternalTypedData::New(kExternalTypedDataUint8ArrayCid, data, length,
+ thread->heap()->SpaceForExternal(length)));
+ FinalizablePersistentHandle::New(thread->isolate(), typed_data,
+ /* peer= */ data,
+ &ExternalTypedDataFinalizer, length);
+ return typed_data.raw();
+}
+
} // namespace dart
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
index cdd5394..7c28e91 100644
--- a/runtime/lib/isolate_patch.dart
+++ b/runtime/lib/isolate_patch.dart
@@ -7,12 +7,13 @@
/// used by patches of that library. We plan to change this when we have a
/// shared front end and simply use parts.
-import "dart:_internal" show VMLibraryHooks, patch;
+import "dart:_internal" show ClassID, VMLibraryHooks, patch;
import "dart:async"
show Completer, Future, Stream, StreamController, StreamSubscription, Timer;
import "dart:collection" show HashMap;
+import "dart:typed_data" show ByteBuffer, TypedData, Uint8List;
/// These are the additional parts of this patch library:
// part "timer_impl.dart";
@@ -671,3 +672,33 @@
static String _getCurrentRootUriStr() native "Isolate_getCurrentRootUriStr";
}
+
+@patch
+abstract class TransferableTypedData {
+ @patch
+ factory TransferableTypedData.fromList(List<TypedData> chunks) {
+ if (chunks == null) {
+ throw ArgumentError(chunks);
+ }
+ final int cid = ClassID.getID(chunks);
+ if (cid != ClassID.cidArray &&
+ cid != ClassID.cidGrowableObjectArray &&
+ cid != ClassID.cidImmutableArray) {
+ chunks = List.unmodifiable(chunks);
+ }
+ return _TransferableTypedDataImpl(chunks);
+ }
+}
+
+@pragma("vm:entry-point")
+class _TransferableTypedDataImpl implements TransferableTypedData {
+ factory _TransferableTypedDataImpl(List<TypedData> list)
+ native "TransferableTypedData_factory";
+
+ ByteBuffer materialize() {
+ return _materializeIntoUint8List().buffer;
+ }
+
+ Uint8List _materializeIntoUint8List()
+ native "TransferableTypedData_materialize";
+}
diff --git a/runtime/tests/vm/dart/issue_31959_31960_test.dart b/runtime/tests/vm/dart/issue_31959_31960_test.dart
new file mode 100644
index 0000000..3990fb6
--- /dev/null
+++ b/runtime/tests/vm/dart/issue_31959_31960_test.dart
@@ -0,0 +1,90 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+import 'dart:math';
+import 'dart:typed_data';
+
+import 'package:async_helper/async_helper.dart' show asyncStart, asyncEnd;
+import 'package:expect/expect.dart';
+
+Uint8List generateSampleList(final size) {
+ final list = Uint8List(size);
+ for (int i = 0; i < size; i++) {
+ list[i] = i % 243;
+ }
+ return list;
+}
+
+void validateReceivedList(final expectedSize, final list) {
+ Expect.equals(expectedSize, list.length);
+ // probe few elements
+ for (int i = 0; i < list.length; i += max<num>(1, expectedSize ~/ 1000)) {
+ Expect.equals(i % 243, list[i]);
+ }
+}
+
+Future<Null> testSend(
+ bool transferable, int toIsolateSize, int fromIsolateSize) async {
+ asyncStart();
+ final port = ReceivePort();
+ final inbox = StreamIterator(port);
+ await Isolate.spawn(isolateMain,
+ [transferable, toIsolateSize, fromIsolateSize, port.sendPort]);
+ await inbox.moveNext();
+ final outbox = inbox.current;
+ final workWatch = Stopwatch();
+ final data = generateSampleList(toIsolateSize);
+ int count = 10;
+ workWatch.start();
+ while (count-- > 0) {
+ outbox.send(transferable ? TransferableTypedData.fromList([data]) : data);
+ await inbox.moveNext();
+ validateReceivedList(
+ fromIsolateSize,
+ transferable
+ ? inbox.current.materialize().asUint8List()
+ : inbox.current);
+ }
+ print('total ${workWatch.elapsedMilliseconds}ms');
+ outbox.send(null);
+ port.close();
+ asyncEnd();
+}
+
+main() async {
+ asyncStart();
+ int bignum = 100 * 1000 * 1000;
+ await testSend(false, bignum, 1); // none
+ await testSend(true, bignum, 1); // 31959tr
+ await testSend(false, bignum, 1); // 31960
+ await testSend(true, bignum, 1); // 31960tr
+ asyncEnd();
+}
+
+Future<Null> isolateMain(List config) async {
+ bool transferable = config[0];
+ int toIsolateSize = config[1];
+ int fromIsolateSize = config[2];
+ SendPort outbox = config[3];
+
+ final port = ReceivePort();
+ final inbox = StreamIterator(port);
+ outbox.send(port.sendPort);
+ final data = generateSampleList(fromIsolateSize);
+ while (true) {
+ await inbox.moveNext();
+ if (inbox.current == null) {
+ break;
+ }
+ validateReceivedList(
+ toIsolateSize,
+ transferable
+ ? inbox.current.materialize().asUint8List()
+ : inbox.current);
+ outbox.send(transferable ? TransferableTypedData.fromList([data]) : data);
+ }
+ port.close();
+}
diff --git a/runtime/tests/vm/dart/transferable_test.dart b/runtime/tests/vm/dart/transferable_test.dart
new file mode 100644
index 0000000..a9a97d7
--- /dev/null
+++ b/runtime/tests/vm/dart/transferable_test.dart
@@ -0,0 +1,130 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// Test that validates that transferables are faster than regular typed data.
+
+import 'dart:async';
+import 'dart:isolate';
+import 'dart:typed_data';
+
+import "package:expect/expect.dart";
+
+const int toIsolateSize = 100 * 1024 * 1024;
+const int fromIsolateSize = 100 * 1024 * 1024;
+
+const int nIterations = 5;
+
+int iteration;
+bool keepTimerRunning;
+
+main() async {
+ keepTimerRunning = true;
+
+ print('--- standard');
+ iteration = nIterations;
+ final stopwatch = new Stopwatch()..start();
+ await runBatch(useTransferable: false);
+ final standard = stopwatch.elapsedMilliseconds;
+
+ print('--- transferable');
+ iteration = nIterations;
+ stopwatch.reset();
+ await runBatch(useTransferable: true);
+ final transferable = stopwatch.elapsedMilliseconds;
+ print(
+ 'standard($standard ms)/transferable($transferable ms): ${standard / transferable}x');
+ Expect.isTrue(standard / transferable > 1.2);
+ keepTimerRunning = false;
+}
+
+packageList(Uint8List data, bool useTransferable) {
+ return useTransferable
+ ? TransferableTypedData.fromList(<Uint8List>[data])
+ : data;
+}
+
+packageByteData(ByteData data, bool useTransferable) {
+ return useTransferable
+ ? TransferableTypedData.fromList(<Uint8List>[data.buffer.asUint8List()])
+ : data;
+}
+
+class StartMessage {
+ final SendPort sendPort;
+ final bool useTransferable;
+
+ StartMessage(this.sendPort, this.useTransferable);
+}
+
+runBatch({bool useTransferable}) async {
+ Timer.run(idleTimer);
+ final port = ReceivePort();
+ final inbox = StreamIterator<dynamic>(port);
+ final worker = await Isolate.spawn(
+ isolateMain, StartMessage(port.sendPort, useTransferable),
+ paused: true);
+ final workerCompleted = Completer<bool>();
+ final workerExitedPort = ReceivePort()
+ ..listen((_) => workerCompleted.complete(true));
+ worker.addOnExitListener(workerExitedPort.sendPort);
+ worker.resume(worker.pauseCapability);
+
+ await inbox.moveNext();
+ final outbox = inbox.current;
+ final workWatch = new Stopwatch();
+ final data = new Uint8List(toIsolateSize);
+
+ while (iteration-- > 0) {
+ final packagedData = packageList(data, useTransferable);
+ workWatch.start();
+ outbox.send(packagedData);
+ await inbox.moveNext();
+
+ final received = inbox.current;
+ final receivedData =
+ received is TransferableTypedData ? received.materialize() : received;
+ int time = workWatch.elapsedMilliseconds;
+ print('${time}ms for round-trip');
+ workWatch.reset();
+ }
+ outbox.send(null);
+
+ await workerCompleted.future;
+ workerExitedPort.close();
+ port.close();
+}
+
+Future<Null> isolateMain(StartMessage startMessage) async {
+ final port = new ReceivePort();
+ final inbox = new StreamIterator<dynamic>(port);
+ startMessage.sendPort.send(port.sendPort);
+ final data = Uint8List.view(new Uint8List(fromIsolateSize).buffer);
+ while (true) {
+ await inbox.moveNext();
+ final received = inbox.current;
+ if (received == null) {
+ break;
+ }
+ final receivedData =
+ received is TransferableTypedData ? received.materialize() : received;
+
+ final packagedData = packageList(data, startMessage.useTransferable);
+
+ startMessage.sendPort.send(packagedData);
+ }
+ port.close();
+}
+
+final Stopwatch idleWatch = new Stopwatch();
+
+void idleTimer() {
+ idleWatch.stop();
+ final time = idleWatch.elapsedMilliseconds;
+ if (time > 5) print('${time}ms since last checkin');
+ idleWatch.reset();
+ idleWatch.start();
+ if (keepTimerRunning) {
+ Timer.run(idleTimer);
+ }
+}
diff --git a/runtime/tests/vm/dart/transferable_throws_test.dart b/runtime/tests/vm/dart/transferable_throws_test.dart
new file mode 100644
index 0000000..538916e
--- /dev/null
+++ b/runtime/tests/vm/dart/transferable_throws_test.dart
@@ -0,0 +1,117 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// Test that ensures correct exception is thrown when attempting to use
+// transferred transferables.
+
+import 'dart:async';
+import 'dart:collection';
+import 'dart:core';
+import 'dart:io';
+import 'dart:isolate';
+import 'dart:typed_data';
+import 'dart:math';
+
+import "package:expect/expect.dart";
+
+throwsIfMaterializeAfterSend() {
+ final rp = ReceivePort();
+ final transferable = TransferableTypedData.fromList([Uint8List(1024)]);
+ rp.sendPort.send(transferable);
+ Expect.throwsArgumentError(() => transferable.materialize());
+ rp.close();
+}
+
+throwsIfSendMoreThanOnce() {
+ final rp = ReceivePort();
+ final bytes = Uint8List(1024);
+ final transferable = TransferableTypedData.fromList([bytes]);
+ rp.sendPort.send(transferable);
+ Expect.throwsArgumentError(() => rp.sendPort.send(transferable));
+ rp.close();
+}
+
+throwsIfMaterializeMoreThanOnce() {
+ final transferable = TransferableTypedData.fromList([Uint8List(1024)]);
+ transferable.materialize();
+ Expect.throwsArgumentError(() => transferable.materialize());
+}
+
+throwsIfReceiverMaterializesMoreThanOnce() async {
+ final rp = ReceivePort();
+ final completer = Completer<List>();
+ final isolateErrors = ReceivePort()..listen((e) => completer.complete(e));
+ await Isolate.spawn(
+ receiver, TransferableTypedData.fromList([Uint8List(1024)]),
+ onError: isolateErrors.sendPort);
+ final error = await completer.future;
+ Expect.equals(
+ error[0],
+ "Invalid argument(s): Attempt to materialize object that was"
+ " transferred already.");
+ isolateErrors.close();
+ rp.close();
+}
+
+void receiver(final transferable) {
+ transferable.materialize();
+ transferable.materialize();
+}
+
+throwsIfCummulativeListIsTooLargeOn32bitPlatform() {
+ try {
+ int maxUint8ListSize = pow(2, 30);
+ // Check whether we are on 32-bit or 64-bit platform.
+ new Uint8List(maxUint8ListSize);
+ // On 64-bit platform we will have difficulty allocating large enough
+ // Uint8List to verify "too large" use case, so do nothing.
+ return;
+ } catch (_) {}
+
+ var halfmax = new Uint8List(pow(2, 29) - 1);
+ Expect.throwsArgumentError(
+ () => TransferableTypedData.fromList([halfmax, halfmax, Uint8List(2)]));
+}
+
+throwsIfCummulativeListCantBeAllocated() {
+ // Attempt to create total 1tb uint8list which should fail on 32 and 64-bit
+ // platforms.
+ final bytes100MB = Uint8List(100 * 1024 * 1024);
+ final total1TB = List<Uint8List>.filled(10000, bytes100MB);
+ // Try to make a 1 TB transferable.
+ Expect.throws(() => TransferableTypedData.fromList(total1TB));
+}
+
+class MyList<T> extends ListBase<T> {
+ @override
+ int length;
+
+ @override
+ T operator [](int index) => null;
+ @override
+ void operator []=(int index, T value) {}
+}
+
+class MyTypedData implements TypedData {
+ noSuchMethod(_) {}
+}
+
+main() {
+ throwsIfMaterializeAfterSend();
+ throwsIfSendMoreThanOnce();
+ throwsIfMaterializeMoreThanOnce();
+ throwsIfReceiverMaterializesMoreThanOnce();
+ throwsIfCummulativeListIsTooLargeOn32bitPlatform();
+ if (!Platform.isMacOS) {
+ // this test crashes the process on mac.
+ throwsIfCummulativeListCantBeAllocated();
+ }
+
+ Expect.throwsArgumentError(() => TransferableTypedData.fromList(null));
+ Expect.throwsArgumentError(() => TransferableTypedData.fromList([null]));
+ Expect.throwsArgumentError(
+ () => TransferableTypedData.fromList(MyList<Uint8List>()));
+ Expect.throwsArgumentError(
+ () => TransferableTypedData.fromList([MyTypedData()]));
+}
diff --git a/runtime/tests/vm/vm.status b/runtime/tests/vm/vm.status
index 5876d36..30ce9e5 100644
--- a/runtime/tests/vm/vm.status
+++ b/runtime/tests/vm/vm.status
@@ -35,6 +35,7 @@
[ $hot_reload || $hot_reload_rollback ]
dart/compilation_trace_test: Pass, Slow
dart/type_feedback_test: Pass, Slow
+dart/issue_31959_31960_test: SkipSlow
[ $compiler != dartk || ($arch != x64 && $arch != simarm && $arch != arm) || $hot_reload || $hot_reload_rollback ]
dart/entrypoints/jit/*: SkipByDesign # Only supported in the Dart 2 JIT and AOT, and test optimizations - hence disabled on hotreload bots.
diff --git a/runtime/vm/bootstrap_natives.h b/runtime/vm/bootstrap_natives.h
index 34cfbc4..98c6129 100644
--- a/runtime/vm/bootstrap_natives.h
+++ b/runtime/vm/bootstrap_natives.h
@@ -387,7 +387,9 @@
V(Ffi_fromFunction, 1) \
V(Ffi_dl_open, 1) \
V(Ffi_dl_lookup, 2) \
- V(Ffi_dl_getHandle, 1)
+ V(Ffi_dl_getHandle, 1) \
+ V(TransferableTypedData_factory, 2) \
+ V(TransferableTypedData_materialize, 1)
// List of bootstrap native entry points used in the dart:mirror library.
#define MIRRORS_BOOTSTRAP_NATIVE_LIST(V) \
diff --git a/runtime/vm/class_id.h b/runtime/vm/class_id.h
index 9acabba..f95d77f 100644
--- a/runtime/vm/class_id.h
+++ b/runtime/vm/class_id.h
@@ -78,7 +78,8 @@
V(WeakProperty) \
V(MirrorReference) \
V(LinkedHashMap) \
- V(UserTag)
+ V(UserTag) \
+ V(TransferableTypedData)
#define CLASS_LIST_ARRAYS(V) \
V(Array) \
diff --git a/runtime/vm/dart_api_impl.cc b/runtime/vm/dart_api_impl.cc
index ca8ce78..5f9a66b 100644
--- a/runtime/vm/dart_api_impl.cc
+++ b/runtime/vm/dart_api_impl.cc
@@ -274,17 +274,6 @@
current_func, field_count, num_fields);
}
-Heap::Space SpaceForExternal(Thread* thread, intptr_t size) {
- Heap* heap = thread->heap();
- // If 'size' would be a significant fraction of new space, then use old.
- static const int kExtNewRatio = 16;
- if (size > (heap->CapacityInWords(Heap::kNew) * kWordSize) / kExtNewRatio) {
- return Heap::kOld;
- } else {
- return Heap::kNew;
- }
-}
-
static RawObject* Send0Arg(const Instance& receiver, const String& selector) {
const intptr_t kTypeArgsLen = 0;
const intptr_t kNumArgs = 1;
@@ -2527,7 +2516,7 @@
return Api::NewHandle(
T,
String::NewExternal(latin1_array, length, peer, external_allocation_size,
- callback, SpaceForExternal(T, length)));
+ callback, T->heap()->SpaceForExternal(length)));
}
DART_EXPORT Dart_Handle
@@ -2549,7 +2538,7 @@
return Api::NewHandle(
T,
String::NewExternal(utf16_array, length, peer, external_allocation_size,
- callback, SpaceForExternal(T, bytes)));
+ callback, T->heap()->SpaceForExternal(bytes)));
}
DART_EXPORT Dart_Handle Dart_StringToCString(Dart_Handle object,
@@ -3433,8 +3422,9 @@
Zone* zone = thread->zone();
intptr_t bytes = length * ExternalTypedData::ElementSizeInBytes(cid);
const ExternalTypedData& result = ExternalTypedData::Handle(
- zone, ExternalTypedData::New(cid, reinterpret_cast<uint8_t*>(data),
- length, SpaceForExternal(thread, bytes)));
+ zone,
+ ExternalTypedData::New(cid, reinterpret_cast<uint8_t*>(data), length,
+ thread->heap()->SpaceForExternal(bytes)));
if (callback != NULL) {
AllocateFinalizableHandle(thread, result, peer, external_allocation_size,
callback);
diff --git a/runtime/vm/finalizable_data.h b/runtime/vm/finalizable_data.h
index f4ae01a..df383d2 100644
--- a/runtime/vm/finalizable_data.h
+++ b/runtime/vm/finalizable_data.h
@@ -15,6 +15,7 @@
void* data;
void* peer;
Dart_WeakPersistentHandleFinalizer callback;
+ Dart_WeakPersistentHandleFinalizer successful_write_callback;
};
class MessageFinalizableData {
@@ -23,18 +24,24 @@
~MessageFinalizableData() {
for (intptr_t i = position_; i < records_.length(); i++) {
- records_[i].callback(NULL, NULL, records_[i].peer);
+ records_[i].callback(nullptr, nullptr, records_[i].peer);
}
}
- void Put(intptr_t external_size,
- void* data,
- void* peer,
- Dart_WeakPersistentHandleFinalizer callback) {
+ /// If [successful_write_callback] is provided, it's invoked when message
+ /// was serialized successfully.
+ /// [callback] is invoked when serialization failed.
+ void Put(
+ intptr_t external_size,
+ void* data,
+ void* peer,
+ Dart_WeakPersistentHandleFinalizer callback,
+ Dart_WeakPersistentHandleFinalizer successful_write_callback = nullptr) {
FinalizableData finalizable_data;
finalizable_data.data = data;
finalizable_data.peer = peer;
finalizable_data.callback = callback;
+ finalizable_data.successful_write_callback = successful_write_callback;
records_.Add(finalizable_data);
external_size_ += external_size;
}
@@ -44,6 +51,15 @@
return records_[position_++];
}
+ void SerializationSucceeded() {
+ for (intptr_t i = position_; i < records_.length(); i++) {
+ if (records_[i].successful_write_callback != nullptr) {
+ records_[i].successful_write_callback(nullptr, nullptr,
+ records_[i].peer);
+ }
+ }
+ }
+
intptr_t external_size() const { return external_size_; }
private:
diff --git a/runtime/vm/heap/heap.cc b/runtime/vm/heap/heap.cc
index c8a1bdf..ca9aa44 100644
--- a/runtime/vm/heap/heap.cc
+++ b/runtime/vm/heap/heap.cc
@@ -1031,6 +1031,16 @@
#endif // !defined(PRODUCT)
}
+Heap::Space Heap::SpaceForExternal(intptr_t size) const {
+ // If 'size' would be a significant fraction of new space, then use old.
+ static const int kExtNewRatio = 16;
+ if (size > (CapacityInWords(Heap::kNew) * kWordSize) / kExtNewRatio) {
+ return Heap::kOld;
+ } else {
+ return Heap::kNew;
+ }
+}
+
NoHeapGrowthControlScope::NoHeapGrowthControlScope()
: ThreadStackResource(Thread::Current()) {
Heap* heap = reinterpret_cast<Isolate*>(isolate())->heap();
diff --git a/runtime/vm/heap/heap.h b/runtime/vm/heap/heap.h
index 6bbcb19..6f530b9 100644
--- a/runtime/vm/heap/heap.h
+++ b/runtime/vm/heap/heap.h
@@ -301,6 +301,7 @@
}
void MakeTLABIterable(Thread* thread);
void AbandonRemainingTLAB(Thread* thread);
+ Space SpaceForExternal(intptr_t size) const;
void CollectOnNextAllocation();
diff --git a/runtime/vm/object.cc b/runtime/vm/object.cc
index 40a56d7..09d5af11 100644
--- a/runtime/vm/object.cc
+++ b/runtime/vm/object.cc
@@ -1527,6 +1527,11 @@
RegisterPrivateClass(cls, Symbols::_SendPortImpl(), isolate_lib);
pending_classes.Add(cls);
+ cls = Class::New<TransferableTypedData>();
+ RegisterPrivateClass(cls, Symbols::_TransferableTypedDataImpl(),
+ isolate_lib);
+ pending_classes.Add(cls);
+
const Class& stacktrace_cls = Class::Handle(zone, Class::New<StackTrace>());
RegisterPrivateClass(stacktrace_cls, Symbols::_StackTrace(), core_lib);
pending_classes.Add(stacktrace_cls);
@@ -2059,6 +2064,8 @@
cls = Class::New<MirrorReference>();
cls = Class::New<UserTag>();
+
+ cls = Class::New<TransferableTypedData>();
}
return Error::null();
}
@@ -21410,6 +21417,40 @@
return "SendPort";
}
+static void TransferableTypedDataFinalizer(void* isolate_callback_data,
+ Dart_WeakPersistentHandle handle,
+ void* peer) {
+ delete (reinterpret_cast<TransferableTypedDataPeer*>(peer));
+}
+
+RawTransferableTypedData* TransferableTypedData::New(uint8_t* data,
+ intptr_t length,
+ Heap::Space space) {
+ TransferableTypedDataPeer* peer = new TransferableTypedDataPeer(data, length);
+
+ Thread* thread = Thread::Current();
+ TransferableTypedData& result = TransferableTypedData::Handle();
+ {
+ RawObject* raw =
+ Object::Allocate(TransferableTypedData::kClassId,
+ TransferableTypedData::InstanceSize(), space);
+ NoSafepointScope no_safepoint;
+ thread->heap()->SetPeer(raw, peer);
+ result ^= raw;
+ }
+ // Set up finalizer so it frees allocated memory if handle is
+ // garbage-collected.
+ peer->set_handle(FinalizablePersistentHandle::New(
+ thread->isolate(), result, peer, &TransferableTypedDataFinalizer,
+ length));
+
+ return result.raw();
+}
+
+const char* TransferableTypedData::ToCString() const {
+ return "TransferableTypedData";
+}
+
const char* Closure::ToCString() const {
Zone* zone = Thread::Current()->zone();
const Function& fun = Function::Handle(zone, function());
diff --git a/runtime/vm/object.h b/runtime/vm/object.h
index 99bd0f6..3332ce1 100644
--- a/runtime/vm/object.h
+++ b/runtime/vm/object.h
@@ -8522,12 +8522,22 @@
}
}
+ void* DataAddr(intptr_t byte_offset) const {
+ ASSERT((byte_offset == 0) ||
+ ((byte_offset > 0) && (byte_offset < LengthInBytes())));
+ return reinterpret_cast<void*>(Validate(raw_ptr()->data_) + byte_offset);
+ }
+
protected:
void SetLength(intptr_t value) const {
ASSERT(value <= Smi::kMaxValue);
StoreSmi(&raw_ptr()->length_, Smi::New(value));
}
+ virtual uint8_t* Validate(uint8_t* data) const {
+ return UnsafeMutableNonPointer(data);
+ }
+
private:
friend class Class;
@@ -8551,13 +8561,6 @@
// architecture.
static const intptr_t kHashBits = 30;
- void* DataAddr(intptr_t byte_offset) const {
- ASSERT((byte_offset == 0) ||
- ((byte_offset > 0) && (byte_offset < LengthInBytes())));
- return reinterpret_cast<void*>(UnsafeMutableNonPointer(raw_ptr()->data()) +
- byte_offset);
- }
-
virtual bool CanonicalizeEquals(const Instance& other) const;
virtual uint32_t CanonicalizeHash() const;
@@ -8698,12 +8701,6 @@
// snapshot. Should be independent of word size.
static const int kDataSerializationAlignment = 8;
- void* DataAddr(intptr_t byte_offset) const {
- ASSERT((byte_offset == 0) ||
- ((byte_offset > 0) && (byte_offset < LengthInBytes())));
- return reinterpret_cast<void*>(raw_ptr()->data_ + byte_offset);
- }
-
#define TYPED_GETTER_SETTER(name, type) \
type Get##name(intptr_t byte_offset) const { \
return ReadUnaligned(reinterpret_cast<type*>(DataAddr(byte_offset))); \
@@ -8757,6 +8754,8 @@
}
protected:
+ virtual uint8_t* Validate(uint8_t* data) const { return data; }
+
void SetLength(intptr_t value) const {
ASSERT(value <= Smi::kMaxValue);
StoreSmi(&raw_ptr()->length_, Smi::New(value));
@@ -8829,6 +8828,9 @@
RawSmi* offset_in_bytes() const { return raw_ptr()->offset_in_bytes_; }
+ protected:
+ virtual uint8_t* Validate(uint8_t* data) const { return data; }
+
private:
void RecomputeDataField() { raw()->RecomputeDataField(); }
@@ -9195,6 +9197,50 @@
friend class Class;
};
+// This is allocated when new instance of TransferableTypedData is created in
+// [TransferableTypedData::New].
+class TransferableTypedDataPeer {
+ public:
+ // [data] backing store should be malloc'ed, not new'ed.
+ TransferableTypedDataPeer(uint8_t* data, intptr_t length)
+ : data_(data), length_(length), handle_(nullptr) {}
+
+ ~TransferableTypedDataPeer() { free(data_); }
+
+ uint8_t* data() const { return data_; }
+ intptr_t length() const { return length_; }
+ FinalizablePersistentHandle* handle() const { return handle_; }
+ void set_handle(FinalizablePersistentHandle* handle) { handle_ = handle; }
+
+ void ClearData() {
+ data_ = nullptr;
+ length_ = 0;
+ handle_ = nullptr;
+ }
+
+ private:
+ uint8_t* data_;
+ intptr_t length_;
+ FinalizablePersistentHandle* handle_;
+
+ DISALLOW_COPY_AND_ASSIGN(TransferableTypedDataPeer);
+};
+
+class TransferableTypedData : public Instance {
+ public:
+ static RawTransferableTypedData* New(uint8_t* data,
+ intptr_t len,
+ Heap::Space space = Heap::kNew);
+
+ static intptr_t InstanceSize() {
+ return RoundedAllocationSize(sizeof(RawTransferableTypedData));
+ }
+
+ private:
+ FINAL_HEAP_OBJECT_IMPLEMENTATION(TransferableTypedData, Instance);
+ friend class Class;
+};
+
// Internal stacktrace object used in exceptions for printing stack traces.
class StackTrace : public Instance {
public:
diff --git a/runtime/vm/object_service.cc b/runtime/vm/object_service.cc
index cab61e5..3616d4e 100644
--- a/runtime/vm/object_service.cc
+++ b/runtime/vm/object_service.cc
@@ -1455,6 +1455,10 @@
Instance::PrintJSONImpl(stream, ref);
}
+void TransferableTypedData::PrintJSONImpl(JSONStream* stream, bool ref) const {
+ Instance::PrintJSONImpl(stream, ref);
+}
+
void ClosureData::PrintJSONImpl(JSONStream* stream, bool ref) const {
Object::PrintJSONImpl(stream, ref);
}
diff --git a/runtime/vm/raw_object.cc b/runtime/vm/raw_object.cc
index eb5d39a..0658a6d 100644
--- a/runtime/vm/raw_object.cc
+++ b/runtime/vm/raw_object.cc
@@ -478,6 +478,7 @@
NULL_VISITOR(Bool)
NULL_VISITOR(Capability)
NULL_VISITOR(SendPort)
+NULL_VISITOR(TransferableTypedData)
REGULAR_VISITOR(Pointer)
NULL_VISITOR(DynamicLibrary)
VARIABLE_NULL_VISITOR(Instructions, Instructions::Size(raw_obj))
diff --git a/runtime/vm/raw_object.h b/runtime/vm/raw_object.h
index 629723c..0eef3d8 100644
--- a/runtime/vm/raw_object.h
+++ b/runtime/vm/raw_object.h
@@ -723,6 +723,7 @@
friend class ObjectOffsetTrait; // GetClassId
friend class WriteBarrierUpdateVisitor; // CheckHeapPointerStore
friend class OffsetsTable;
+ friend class RawTransferableTypedData; // GetClassId
DISALLOW_ALLOCATION();
DISALLOW_IMPLICIT_CONSTRUCTORS(RawObject);
@@ -2411,6 +2412,11 @@
VISIT_TO(RawObject*, handler_)
};
+class RawTransferableTypedData : public RawInstance {
+ RAW_HEAP_OBJECT_IMPLEMENTATION(TransferableTypedData);
+ VISIT_NOTHING();
+};
+
// VM type for capturing stacktraces when exceptions are thrown,
// Currently we don't have any interface that this object is supposed
// to implement so we just support the 'toString' method which
diff --git a/runtime/vm/raw_object_snapshot.cc b/runtime/vm/raw_object_snapshot.cc
index 296b478..d734dbb 100644
--- a/runtime/vm/raw_object_snapshot.cc
+++ b/runtime/vm/raw_object_snapshot.cc
@@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+#include "vm/dart_api_state.h"
#include "vm/message.h"
#include "vm/native_entry.h"
#include "vm/object.h"
@@ -2133,6 +2134,68 @@
writer->Write<uint64_t>(ptr()->origin_id_);
}
+RawTransferableTypedData* TransferableTypedData::ReadFrom(
+ SnapshotReader* reader,
+ intptr_t object_id,
+ intptr_t tags,
+ Snapshot::Kind kind,
+ bool as_reference) {
+ ASSERT(reader != nullptr);
+
+ ASSERT(!Snapshot::IsFull(kind));
+ const intptr_t length = reader->Read<int32_t>();
+
+ const FinalizableData finalizable_data =
+ static_cast<MessageSnapshotReader*>(reader)->finalizable_data()->Take();
+ uint8_t* data = reinterpret_cast<uint8_t*>(finalizable_data.data);
+ auto& transferableTypedData = TransferableTypedData::ZoneHandle(
+ reader->zone(), TransferableTypedData::New(data, length));
+ reader->AddBackRef(object_id, &transferableTypedData, kIsDeserialized);
+ return transferableTypedData.raw();
+}
+
+void RawTransferableTypedData::WriteTo(SnapshotWriter* writer,
+ intptr_t object_id,
+ Snapshot::Kind kind,
+ bool as_reference) {
+ ASSERT(writer != nullptr);
+ ASSERT(GetClassId() == kTransferableTypedDataCid);
+ void* peer = writer->thread()->heap()->GetPeer(this);
+ // Assume that object's Peer is only used to track transferrability state.
+ ASSERT(peer != nullptr);
+ TransferableTypedDataPeer* tpeer =
+ reinterpret_cast<TransferableTypedDataPeer*>(peer);
+ intptr_t length = tpeer->length(); // In bytes.
+ void* data = tpeer->data();
+ if (data == nullptr) {
+ writer->SetWriteException(
+ Exceptions::kArgument,
+ "Illegal argument in isolate message"
+ " : (TransferableTypedData has been transferred already)");
+ return;
+ }
+
+ // Write out the serialization header value for this object.
+ writer->WriteInlinedObjectHeader(object_id);
+
+ writer->WriteIndexedObject(GetClassId());
+ writer->WriteTags(writer->GetObjectTags(this));
+ writer->Write<int32_t>(length);
+
+ static_cast<MessageWriter*>(writer)->finalizable_data()->Put(
+ length, data, tpeer,
+ // Finalizer does nothing - in case of failure to serialize,
+ // [data] remains wrapped in sender's [TransferableTypedData].
+ [](void* data, Dart_WeakPersistentHandle handle, void* peer) {},
+ // This is invoked on successful serialization of the message
+ [](void* data, Dart_WeakPersistentHandle handle, void* peer) {
+ TransferableTypedDataPeer* tpeer =
+ reinterpret_cast<TransferableTypedDataPeer*>(peer);
+ tpeer->handle()->EnsureFreeExternal(Isolate::Current());
+ tpeer->ClearData();
+ });
+}
+
RawStackTrace* StackTrace::ReadFrom(SnapshotReader* reader,
intptr_t object_id,
intptr_t tags,
diff --git a/runtime/vm/snapshot.cc b/runtime/vm/snapshot.cc
index aeb1714..3824991 100644
--- a/runtime/vm/snapshot.cc
+++ b/runtime/vm/snapshot.cc
@@ -45,7 +45,8 @@
RawObject::IsStringClassId(class_id) ||
RawObject::IsTypedDataClassId(class_id) ||
RawObject::IsExternalTypedDataClassId(class_id) ||
- RawObject::IsTypedDataViewClassId(class_id) || class_id == kNullCid);
+ RawObject::IsTypedDataViewClassId(class_id) || class_id == kNullCid ||
+ class_id == kTransferableTypedDataCid);
}
static bool IsObjectStoreTypeId(intptr_t index) {
@@ -1483,6 +1484,8 @@
}
if (has_exception) {
ThrowException(exception_type(), exception_msg());
+ } else {
+ finalizable_data_->SerializationSucceeded();
}
MessageFinalizableData* finalizable_data = finalizable_data_;
diff --git a/runtime/vm/snapshot.h b/runtime/vm/snapshot.h
index 995afc7..0939635e 100644
--- a/runtime/vm/snapshot.h
+++ b/runtime/vm/snapshot.h
@@ -442,6 +442,7 @@
friend class Script;
friend class SignatureData;
friend class SubtypeTestCache;
+ friend class TransferableTypedData;
friend class Type;
friend class TypedDataView;
friend class TypeArguments;
@@ -715,6 +716,7 @@
friend class RawScript;
friend class RawStackTrace;
friend class RawSubtypeTestCache;
+ friend class RawTransferableTypedData;
friend class RawType;
friend class RawTypedDataView;
friend class RawTypeRef;
diff --git a/runtime/vm/symbols.h b/runtime/vm/symbols.h
index f265e3e..5454049 100644
--- a/runtime/vm/symbols.h
+++ b/runtime/vm/symbols.h
@@ -287,6 +287,7 @@
V(ThrowNewInvocation, "_throwNewInvocation") \
V(TopLevel, "::") \
V(TruncDivOperator, "~/") \
+ V(TransferableTypedData, "TransferableTypedData") \
V(TryFinallyReturnValue, ":try_finally_return_value") \
V(TwoByteString, "_TwoByteString") \
V(TwoNewlines, "\n\n") \
@@ -403,6 +404,7 @@
V(_RawReceivePortImpl, "_RawReceivePortImpl") \
V(_RegExp, "_RegExp") \
V(_SendPortImpl, "_SendPortImpl") \
+ V(_TransferableTypedDataImpl, "_TransferableTypedDataImpl") \
V(_Smi, "_Smi") \
V(_SourceLocation, "_SourceLocation") \
V(_SpecialTypeMirror, "_SpecialTypeMirror") \
diff --git a/sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart b/sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart
index 9ccdada..3d1f95c 100644
--- a/sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart
+++ b/sdk/lib/_internal/js_dev_runtime/patch/isolate_patch.dart
@@ -116,6 +116,13 @@
factory Capability() => _unsupported();
}
+@patch
+abstract class TransferableTypedData {
+ @patch
+ factory TransferableTypedData.fromList(List<TypedData> list) =>
+ _unsupported();
+}
+
@NoReifyGeneric()
T _unsupported<T>() {
throw UnsupportedError('dart:isolate is not supported on dart4web');
diff --git a/sdk/lib/_internal/js_runtime/lib/isolate_patch.dart b/sdk/lib/_internal/js_runtime/lib/isolate_patch.dart
index 56bab4c..42da488 100644
--- a/sdk/lib/_internal/js_runtime/lib/isolate_patch.dart
+++ b/sdk/lib/_internal/js_runtime/lib/isolate_patch.dart
@@ -7,6 +7,7 @@
import "dart:async";
import 'dart:_foreign_helper' show JS;
import 'dart:_js_helper' show patch;
+import "dart:typed_data" show ByteData, TypedData, Uint8List;
@patch
class Isolate {
@@ -146,6 +147,14 @@
}
}
+@patch
+abstract class TransferableTypedData {
+ @patch
+ factory TransferableTypedData.fromList(List<TypedData> list) {
+ throw new UnsupportedError('TransferableTypedData.fromList');
+ }
+}
+
/// Returns the base path added to Uri.base to resolve `package:` Uris.
///
/// This is used by `Isolate.resolvePackageUri` to load resources. The default
diff --git a/sdk/lib/isolate/isolate.dart b/sdk/lib/isolate/isolate.dart
index 0aa74c9..e786e16 100644
--- a/sdk/lib/isolate/isolate.dart
+++ b/sdk/lib/isolate/isolate.dart
@@ -18,6 +18,7 @@
import "dart:async";
import "dart:_internal" show Since;
+import "dart:typed_data" show ByteBuffer, TypedData, Uint8List;
part "capability.dart";
@@ -753,3 +754,36 @@
stackTrace = new StackTrace.fromString(stackDescription);
String toString() => _description;
}
+
+/*
+ * An efficiently transferable sequence of byte values.
+ *
+ * A [TransferableTypedData] is created from a number of bytes.
+ * This will take time proportional to the number of bytes.
+ *
+ * The [TransferableTypedData] can be moved between isolates, so
+ * sending it through a send port will only take constant time.
+ *
+ * When sent this way, the local transferable can no longer be materialized,
+ * and the received object is now the only way to materialize the data.
+ */
+@Since("2.3.2")
+abstract class TransferableTypedData {
+ /**
+ * Creates a new [TransferableTypedData] containing the bytes of [list].
+ *
+ * It must be possible to create a single [Uint8List] containing the
+ * bytes, so if there are more bytes than what the platform allows in
+ * a single [Uint8List], then creation fails.
+ */
+ external factory TransferableTypedData.fromList(List<TypedData> list);
+
+ /**
+ * Creates a new [ByteBuffer] containing the bytes stored in this [TransferableTypedData].
+ *
+ * The [TransferableTypedData] is a cross-isolate single-use resource.
+ * This method must not be called more than once on the same underlying
+ * transferable bytes, even if the calls occur in different isolates.
+ */
+ ByteBuffer materialize();
+}
diff --git a/tests/lib_2/isolate/transferable_failed_to_send_test.dart b/tests/lib_2/isolate/transferable_failed_to_send_test.dart
new file mode 100644
index 0000000..1bca938
--- /dev/null
+++ b/tests/lib_2/isolate/transferable_failed_to_send_test.dart
@@ -0,0 +1,83 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:io" show ServerSocket;
+import "dart:isolate";
+import "dart:typed_data" show ByteData;
+
+import "package:expect/expect.dart";
+import "package:async_helper/async_helper.dart";
+
+void main() async {
+ final port = new ReceivePort();
+
+ // Sending a socket object will result in an error.
+ final socket = await ServerSocket.bind('localhost', 0);
+
+ final x = new ByteData(4);
+ for (int i = 0; i < 4; i++) {
+ x.setUint8(i, i);
+ }
+ {
+ final transferableFirst = TransferableTypedData.fromList([x]);
+ Expect.throwsArgumentError(
+ () => port.sendPort.send(<dynamic>[transferableFirst, socket]));
+ // Once TransferableTypedData was sent even if attempt failed, it can't be
+ // materialized.
+ // This need to be changed so that on failed send we should not detach the
+ // buffer form the transferrable. The order should not matter (i.e. if the
+ // error happens before or after the serializer hits a transferrable object)
+
+ final data1 = transferableFirst.materialize().asUint8List();
+ Expect.equals(x.lengthInBytes, data1.length);
+ for (int i = 0; i < data1.length; i++) {
+ Expect.equals(i, data1[i]);
+ }
+ }
+ {
+ final transferableFirst = TransferableTypedData.fromList([x]);
+ Expect.throwsArgumentError(() => port.sendPort
+ .send(<dynamic>[transferableFirst, transferableFirst, socket]));
+ // Once TransferableTypedData was sent even if attempt failed, it can't be
+ // materialized.
+ // This need to be changed so that on failed send we should not detach the
+ // buffer form the transferrable. The order should not matter (i.e. if the
+ // error happens before or after the serializer hits a transferrable object)
+
+ final data1 = transferableFirst.materialize().asUint8List();
+ Expect.equals(x.lengthInBytes, data1.length);
+ for (int i = 0; i < data1.length; i++) {
+ Expect.equals(i, data1[i]);
+ }
+ }
+
+ {
+ final transferableSecond = TransferableTypedData.fromList([x]);
+ Expect.throwsArgumentError(
+ () => port.sendPort.send(<dynamic>[socket, transferableSecond]));
+ // Once TransferableTypedData was sent even if attempt failed, it can't be
+ // materialized.
+ final data2 = transferableSecond.materialize().asUint8List();
+ Expect.equals(x.lengthInBytes, data2.length);
+ for (int i = 0; i < data2.length; i++) {
+ Expect.equals(i, data2[i]);
+ }
+ }
+
+ {
+ final transferableSecond = TransferableTypedData.fromList([x]);
+ Expect.throwsArgumentError(() => port.sendPort
+ .send(<dynamic>[socket, transferableSecond, transferableSecond]));
+ // Once TransferableTypedData was sent even if attempt failed, it can't be
+ // materialized.
+ final data2 = transferableSecond.materialize().asUint8List();
+ Expect.equals(x.lengthInBytes, data2.length);
+ for (int i = 0; i < data2.length; i++) {
+ Expect.equals(i, data2[i]);
+ }
+ }
+
+ socket.close();
+ port.close();
+}
diff --git a/tests/lib_2/isolate/transferable_test.dart b/tests/lib_2/isolate/transferable_test.dart
new file mode 100644
index 0000000..df28bab
--- /dev/null
+++ b/tests/lib_2/isolate/transferable_test.dart
@@ -0,0 +1,284 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+import "dart:collection";
+import "dart:isolate";
+import "dart:typed_data";
+import "package:expect/expect.dart";
+
+const large = 2 * 1024 * 1024;
+
+void child(replyPort) {
+ print("Child start");
+
+ print("Child ByteData");
+ dynamic x = new ByteData(large);
+ for (int i = 0; i < 4; i++) {
+ x.setInt8(i, i);
+ }
+ replyPort.send(TransferableTypedData.fromList([x.buffer.asUint8List()]));
+
+ print("Child Uint8List");
+ x = new Uint8List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Uint8List.view");
+ x = new Uint8List.view(x.buffer, 1, 2);
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Int8List");
+ x = new Int8List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Uint16List");
+ x = new Uint16List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Int16List");
+ x = new Int16List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Uint32List");
+ x = new Uint32List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Int32List");
+ x = new Int32List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Uint64List");
+ x = new Uint64List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child Int64List");
+ x = new Int64List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send(TransferableTypedData.fromList([x]));
+
+ print("Child two Uint8Lists");
+ x = new Uint8List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ replyPort.send([
+ TransferableTypedData.fromList([x]),
+ TransferableTypedData.fromList([x])
+ ]);
+
+ print("Child same Uint8List twice - materialize first");
+ x = new Uint8List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ var tr = TransferableTypedData.fromList([x]);
+ replyPort.send([tr, tr]);
+
+ print("Child same Uint8List twice - materialize second");
+ x = new Uint8List(large);
+ for (int i = 0; i < 4; i++) {
+ x[i] = i;
+ }
+ tr = TransferableTypedData.fromList([x]);
+ replyPort.send([tr, tr]);
+
+ print("Child done");
+}
+
+Future<void> main(List<String> args) async {
+ print("Parent start");
+
+ ReceivePort port = new ReceivePort();
+ Isolate.spawn(child, port.sendPort);
+ StreamIterator<dynamic> incoming = new StreamIterator<dynamic>(port);
+
+ print("Parent ByteData");
+ Expect.isTrue(await incoming.moveNext());
+ dynamic x = incoming.current.materialize().asByteData();
+ Expect.isTrue(x is ByteData);
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x.getUint8(i));
+ }
+
+ print("Parent Uint8List");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize();
+ Expect.isTrue(x is ByteBuffer);
+ x = x.asUint8List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Uint8List view");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asUint8List();
+ Expect.equals(1, x[0]);
+ Expect.equals(2, x[1]);
+
+ print("Parent Int8");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asInt8List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Uint16");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asUint16List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Int16");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asInt16List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Uint32");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asUint32List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Int32");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asInt32List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Uint64");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asUint64List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent Int64");
+ Expect.isTrue(await incoming.moveNext());
+ x = incoming.current.materialize().asInt64List();
+ Expect.equals(large, x.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x[i]);
+ }
+
+ print("Parent two Uint8Lists");
+ Expect.isTrue(await incoming.moveNext());
+ final x1 = incoming.current[0].materialize().asUint8List();
+ final x2 = incoming.current[1].materialize().asUint8List();
+ Expect.equals(large, x1.length);
+ Expect.equals(large, x2.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, x1[i]);
+ Expect.equals(i, x2[i]);
+ }
+
+ print("Parent same Uint8Lists twice, materialize first");
+ Expect.isTrue(await incoming.moveNext());
+ final tr0 = incoming.current[0].materialize().asUint8List();
+ Expect.throwsArgumentError(() => incoming.current[1].materialize());
+ Expect.equals(large, tr0.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, tr0[i]);
+ }
+
+ print("Parent same Uint8Lists twice, materialize second");
+ Expect.isTrue(await incoming.moveNext());
+ final tr1 = incoming.current[1].materialize().asUint8List();
+ Expect.throwsArgumentError(() => incoming.current[0].materialize());
+ Expect.equals(large, tr1.length);
+ for (int i = 0; i < 4; i++) {
+ Expect.equals(i, tr1[i]);
+ }
+
+ port.close();
+ print("Parent done");
+
+ testCreateMaterializeInSameIsolate();
+ testIterableToList();
+ testUserExtendedList();
+}
+
+testCreateMaterializeInSameIsolate() {
+ // Test same-isolate operation of TransferableTypedData.
+ final Uint8List bytes = new Uint8List(large);
+ for (int i = 0; i < bytes.length; ++i) {
+ bytes[i] = i % 256;
+ }
+ final tr = TransferableTypedData.fromList([bytes]);
+ Expect.listEquals(bytes, tr.materialize().asUint8List());
+}
+
+testIterableToList() {
+ // Test that iterable.toList() can be used as an argument.
+ final list1 = Uint8List(10);
+ for (int i = 0; i < list1.length; i++) {
+ list1[i] = i;
+ }
+ final list2 = Uint8List(20);
+ for (int i = 0; i < list2.length; i++) {
+ list2[i] = i + list1.length;
+ }
+ final map = {list1: true, list2: true};
+ Iterable<Uint8List> iterable = map.keys;
+ final result = TransferableTypedData.fromList(iterable.toList())
+ .materialize()
+ .asUint8List();
+ for (int i = 0; i < result.length; i++) {
+ Expect.equals(i, result[i]);
+ }
+}
+
+class MyList<E> extends ListBase<E> {
+ List<E> _source;
+ MyList(this._source);
+ int get length => _source.length;
+ void set length(int length) {
+ _source.length = length;
+ }
+
+ E operator [](int index) => _source[index];
+ void operator []=(int index, E value) {
+ _source[index] = value;
+ }
+}
+
+testUserExtendedList() {
+ final list = MyList<TypedData>([Uint8List(10)]);
+ TransferableTypedData.fromList(list);
+}