Version 2.14.0-389.0.dev
Merge commit '0f7f8e361c8aa51e9c5e9b555c5348dec281600e' into 'dev'
diff --git a/.dart_tool/package_config.json b/.dart_tool/package_config.json
index d2df8f3..f7073dd 100644
--- a/.dart_tool/package_config.json
+++ b/.dart_tool/package_config.json
@@ -11,7 +11,7 @@
"constraint, update this by running tools/generate_package_config.dart."
],
"configVersion": 2,
- "generated": "2021-08-04T16:42:24.433381",
+ "generated": "2021-08-05T11:33:04.746536",
"generator": "tools/generate_package_config.dart",
"packages": [
{
@@ -256,7 +256,7 @@
"name": "dds",
"rootUri": "../pkg/dds",
"packageUri": "lib/",
- "languageVersion": "2.12"
+ "languageVersion": "2.13"
},
{
"name": "dev_compiler",
diff --git a/pkg/dds/lib/src/isolate_manager.dart b/pkg/dds/lib/src/isolate_manager.dart
index e9e14df..7c43bc6 100644
--- a/pkg/dds/lib/src/isolate_manager.dart
+++ b/pkg/dds/lib/src/isolate_manager.dart
@@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'package:dds/src/utils/mutex.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'client.dart';
@@ -139,32 +140,40 @@
}
void _updateIsolateState(String id, String name, String eventKind) {
- switch (eventKind) {
- case ServiceEvents.isolateStart:
- isolateStarted(id, name);
- break;
- case ServiceEvents.isolateExit:
- isolateExited(id);
- break;
- default:
- final isolate = isolates[id];
+ _mutex.runGuarded(
+ () {
switch (eventKind) {
- case ServiceEvents.pauseExit:
- isolate!.pausedOnExit();
+ case ServiceEvents.isolateStart:
+ isolateStarted(id, name);
break;
- case ServiceEvents.pausePostRequest:
- isolate!.pausedPostRequest();
- break;
- case ServiceEvents.pauseStart:
- isolate!.pausedOnStart();
- break;
- case ServiceEvents.resume:
- isolate!.resumed();
+ case ServiceEvents.isolateExit:
+ isolateExited(id);
break;
default:
- break;
+ final isolate = isolates[id];
+ // The isolate may have disappeared after the state event was sent.
+ if (isolate == null) {
+ return;
+ }
+ switch (eventKind) {
+ case ServiceEvents.pauseExit:
+ isolate.pausedOnExit();
+ break;
+ case ServiceEvents.pausePostRequest:
+ isolate.pausedPostRequest();
+ break;
+ case ServiceEvents.pauseStart:
+ isolate.pausedOnStart();
+ break;
+ case ServiceEvents.resume:
+ isolate.resumed();
+ break;
+ default:
+ break;
+ }
}
- }
+ },
+ );
}
/// Initializes the set of running isolates.
@@ -172,25 +181,30 @@
if (_initialized) {
return;
}
- final vm = await dds.vmServiceClient.sendRequest('getVM');
- final List<Map> isolateRefs = vm['isolates'].cast<Map<String, dynamic>>();
- // Check the pause event for each isolate to determine whether or not the
- // isolate is already paused.
- for (final isolateRef in isolateRefs) {
- final id = isolateRef['id'];
- final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
- 'isolateId': id,
- });
- final name = isolate['name'];
- if (isolate.containsKey('pauseEvent')) {
- isolates[id] = _RunningIsolate(this, id, name);
- final eventKind = isolate['pauseEvent']['kind'];
- _updateIsolateState(id, name, eventKind);
- } else {
- // If the isolate doesn't have a pauseEvent, assume it's running.
- isolateStarted(id, name);
- }
- }
+ await _mutex.runGuarded(
+ () async {
+ final vm = await dds.vmServiceClient.sendRequest('getVM');
+ final List<Map> isolateRefs =
+ vm['isolates'].cast<Map<String, dynamic>>();
+ // Check the pause event for each isolate to determine whether or not the
+ // isolate is already paused.
+ for (final isolateRef in isolateRefs) {
+ final id = isolateRef['id'];
+ final isolate = await dds.vmServiceClient.sendRequest('getIsolate', {
+ 'isolateId': id,
+ });
+ final name = isolate['name'];
+ if (isolate.containsKey('pauseEvent')) {
+ isolates[id] = _RunningIsolate(this, id, name);
+ final eventKind = isolate['pauseEvent']['kind'];
+ _updateIsolateState(id, name, eventKind);
+ } else {
+ // If the isolate doesn't have a pauseEvent, assume it's running.
+ isolateStarted(id, name);
+ }
+ }
+ },
+ );
_initialized = true;
}
@@ -218,16 +232,20 @@
DartDevelopmentServiceClient client,
json_rpc.Parameters parameters,
) async {
- final isolateId = parameters['isolateId'].asString;
- final isolate = isolates[isolateId];
- if (isolate == null) {
- return RPCResponses.collectedSentinel;
- }
- if (isolate.shouldResume(resumingClient: client)) {
- isolate.clearResumeApprovals();
- return await _sendResumeRequest(isolateId, parameters);
- }
- return RPCResponses.success;
+ return await _mutex.runGuarded(
+ () async {
+ final isolateId = parameters['isolateId'].asString;
+ final isolate = isolates[isolateId];
+ if (isolate == null) {
+ return RPCResponses.collectedSentinel;
+ }
+ if (isolate.shouldResume(resumingClient: client)) {
+ isolate.clearResumeApprovals();
+ return await _sendResumeRequest(isolateId, parameters);
+ }
+ return RPCResponses.success;
+ },
+ );
}
/// Forwards a `resume` request to the VM service.
@@ -248,5 +266,6 @@
bool _initialized = false;
final DartDevelopmentServiceImpl dds;
+ final _mutex = Mutex();
final Map<String, _RunningIsolate> isolates = {};
}
diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart
index 94f791a..fd1290c 100644
--- a/pkg/dds/lib/src/stream_manager.dart
+++ b/pkg/dds/lib/src/stream_manager.dart
@@ -10,6 +10,7 @@
import 'dds_impl.dart';
import 'logging_repository.dart';
import 'rpc_error_codes.dart';
+import 'utils/mutex.dart';
class StreamManager {
StreamManager(this.dds);
@@ -133,51 +134,56 @@
DartDevelopmentServiceClient? client,
String stream,
) async {
- assert(stream.isNotEmpty);
- if (!streamListeners.containsKey(stream)) {
- // Initialize the list of clients for the new stream before we do
- // anything else to ensure multiple clients registering for the same
- // stream in quick succession doesn't result in multiple streamListen
- // requests being sent to the VM service.
- streamListeners[stream] = <DartDevelopmentServiceClient>[];
- if ((stream == kDebugStream && client == null) ||
- stream != kDebugStream) {
- // This will return an RPC exception if the stream doesn't exist. This
- // will throw and the exception will be forwarded to the client.
- final result = await dds.vmServiceClient.sendRequest('streamListen', {
- 'streamId': stream,
- });
- assert(result['type'] == 'Success');
- }
- }
- if (streamListeners[stream]!.contains(client)) {
- throw kStreamAlreadySubscribedException;
- }
- if (client != null) {
- streamListeners[stream]!.add(client);
- if (loggingRepositories.containsKey(stream)) {
- loggingRepositories[stream]!.sendHistoricalLogs(client);
- } else if (stream == kServiceStream) {
- // Send all previously registered service extensions when a client
- // subscribes to the Service stream.
- for (final c in dds.clientManager.clients) {
- if (c == client) {
- continue;
- }
- final namespace = dds.getNamespace(c);
- for (final service in c.services.keys) {
- client.sendNotification(
- 'streamNotify',
- _buildStreamRegisteredEvent(
- namespace!,
- service,
- c.services[service]!,
- ),
- );
+ await _mutex.runGuarded(
+ () async {
+ assert(stream.isNotEmpty);
+ if (!streamListeners.containsKey(stream)) {
+ // Initialize the list of clients for the new stream before we do
+ // anything else to ensure multiple clients registering for the same
+ // stream in quick succession doesn't result in multiple streamListen
+ // requests being sent to the VM service.
+ streamListeners[stream] = <DartDevelopmentServiceClient>[];
+ if ((stream == kDebugStream && client == null) ||
+ stream != kDebugStream) {
+ // This will return an RPC exception if the stream doesn't exist. This
+ // will throw and the exception will be forwarded to the client.
+ final result =
+ await dds.vmServiceClient.sendRequest('streamListen', {
+ 'streamId': stream,
+ });
+ assert(result['type'] == 'Success');
}
}
- }
- }
+ if (streamListeners[stream]!.contains(client)) {
+ throw kStreamAlreadySubscribedException;
+ }
+ if (client != null) {
+ streamListeners[stream]!.add(client);
+ if (loggingRepositories.containsKey(stream)) {
+ loggingRepositories[stream]!.sendHistoricalLogs(client);
+ } else if (stream == kServiceStream) {
+ // Send all previously registered service extensions when a client
+ // subscribes to the Service stream.
+ for (final c in dds.clientManager.clients) {
+ if (c == client) {
+ continue;
+ }
+ final namespace = dds.getNamespace(c);
+ for (final service in c.services.keys) {
+ client.sendNotification(
+ 'streamNotify',
+ _buildStreamRegisteredEvent(
+ namespace!,
+ service,
+ c.services[service]!,
+ ),
+ );
+ }
+ }
+ }
+ }
+ },
+ );
}
List<Map<String, dynamic>>? getStreamHistory(String stream) {
@@ -198,27 +204,32 @@
String stream, {
bool cancelCoreStream = false,
}) async {
- assert(stream.isNotEmpty);
- final listeners = streamListeners[stream];
- if (listeners == null || client != null && !listeners.contains(client)) {
- throw kStreamNotSubscribedException;
- }
- listeners.remove(client);
- // Don't cancel streams DDS needs to function.
- if (listeners.isEmpty &&
- (!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
- streamListeners.remove(stream);
- // Ensure the VM service hasn't shutdown.
- if (dds.vmServiceClient.isClosed) {
- return;
- }
- final result = await dds.vmServiceClient.sendRequest('streamCancel', {
- 'streamId': stream,
- });
- assert(result['type'] == 'Success');
- } else {
- streamListeners[stream] = listeners;
- }
+ await _mutex.runGuarded(
+ () async {
+ assert(stream.isNotEmpty);
+ final listeners = streamListeners[stream];
+ if (listeners == null ||
+ client != null && !listeners.contains(client)) {
+ throw kStreamNotSubscribedException;
+ }
+ listeners.remove(client);
+ // Don't cancel streams DDS needs to function.
+ if (listeners.isEmpty &&
+ (!ddsCoreStreams.contains(stream) || cancelCoreStream)) {
+ streamListeners.remove(stream);
+ // Ensure the VM service hasn't shutdown.
+ if (dds.vmServiceClient.isClosed) {
+ return;
+ }
+ final result = await dds.vmServiceClient.sendRequest('streamCancel', {
+ 'streamId': stream,
+ });
+ assert(result['type'] == 'Success');
+ } else {
+ streamListeners[stream] = listeners;
+ }
+ },
+ );
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
@@ -280,4 +291,5 @@
final DartDevelopmentServiceImpl dds;
final streamListeners = <String, List<DartDevelopmentServiceClient>>{};
+ final _mutex = Mutex();
}
diff --git a/pkg/dds/lib/src/utils/mutex.dart b/pkg/dds/lib/src/utils/mutex.dart
new file mode 100644
index 0000000..b697e78
--- /dev/null
+++ b/pkg/dds/lib/src/utils/mutex.dart
@@ -0,0 +1,46 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+/// Serializes asynchronous critical sections that touch shared state, so that
+/// the `await`s inside one section never interleave with another section.
+class Mutex {
+  /// Runs [criticalSection] while holding the lock and completes with its
+  /// result; the lock is released even if the section throws.
+  ///
+  /// Callers that find the lock held are queued and resumed in FIFO order.
+  Future<T> runGuarded<T>(FutureOr<T> Function() criticalSection) async {
+    try {
+      await _acquireLock();
+      return await criticalSection();
+    } finally {
+      _releaseLock();
+    }
+  }
+
+  Future<void> _acquireLock() async {
+    if (!_locked) {
+      _locked = true;
+      return;
+    }
+    final request = Completer<void>();
+    _outstandingRequests.add(request);
+    await request.future;
+  }
+
+  void _releaseLock() {
+    // Pass the lock straight to the next waiter and keep [_locked] set, so no
+    // new caller can enter before that asynchronously resumed waiter runs.
+    if (_outstandingRequests.isEmpty) {
+      _locked = false;
+    } else {
+      _outstandingRequests.removeFirst().complete();
+    }
+  }
+
+  bool _locked = false;
+  final _outstandingRequests = Queue<Completer<void>>();
+}
diff --git a/pkg/dds/pubspec.yaml b/pkg/dds/pubspec.yaml
index cfbf779..c6057f4 100644
--- a/pkg/dds/pubspec.yaml
+++ b/pkg/dds/pubspec.yaml
@@ -8,7 +8,7 @@
homepage: https://github.com/dart-lang/sdk/tree/master/pkg/dds
environment:
- sdk: '>=2.12.0 <3.0.0'
+ sdk: '>=2.13.0 <3.0.0'
dependencies:
async: ^2.4.1
diff --git a/pkg/dds/test/regress_45569_test.dart b/pkg/dds/test/regress_45569_test.dart
index d08c9c1..a630d6a 100644
--- a/pkg/dds/test/regress_45569_test.dart
+++ b/pkg/dds/test/regress_45569_test.dart
@@ -3,10 +3,13 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:io';
+import 'dart:math';
import 'package:dds/dds.dart';
import 'package:test/test.dart';
+import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
+
import 'common/test_helper.dart';
void main() {
@@ -27,27 +30,32 @@
process.kill();
});
+  /// Listens to the `Service` stream on [client] and then cancels it again,
+  /// waiting a random 0-199 ms in between when [delay] is true (else 0 ms).
+  Future<void> streamSubscribeUnsubscribe(VmService client,
+      {required bool delay}) async {
+    const streamId = 'Service';
+    await client.streamListen(streamId);
+    final pauseMs = delay ? Random().nextInt(200) : 0;
+    await Future.delayed(Duration(milliseconds: pauseMs));
+    await client.streamCancel(streamId);
+  }
+
test('Ensure streamListen and streamCancel calls are handled atomically',
() async {
- dds = await DartDevelopmentService.startDartDevelopmentService(
- remoteVmServiceUri,
- );
- expect(dds.isRunning, true);
- final connection1 = await vmServiceConnectUri(dds.wsUri.toString());
- final connection2 = await vmServiceConnectUri(dds.wsUri.toString());
+ for (int i = 0; i < 100; ++i) {
+ dds = await DartDevelopmentService.startDartDevelopmentService(
+ remoteVmServiceUri,
+ );
+ expect(dds.isRunning, true);
+ final connection1 = await vmServiceConnectUri(dds.wsUri.toString());
+ final connection2 = await vmServiceConnectUri(dds.wsUri.toString());
- for (int i = 0; i < 50; ++i) {
- final listenFutures = <Future>[
- connection1.streamListen('Service'),
- connection2.streamListen('Service'),
- ];
- await Future.wait(listenFutures);
-
- final cancelFutures = <Future>[
- connection1.streamCancel('Service'),
- connection2.streamCancel('Service'),
- ];
- await Future.wait(cancelFutures);
+ await Future.wait([
+ streamSubscribeUnsubscribe(connection1, delay: true),
+ streamSubscribeUnsubscribe(connection2, delay: false),
+ ]);
+ await dds.shutdown();
}
});
}
diff --git a/pkg/vm/lib/transformations/ffi_native.dart b/pkg/vm/lib/transformations/ffi_native.dart
index e666652..ff405f8 100644
--- a/pkg/vm/lib/transformations/ffi_native.dart
+++ b/pkg/vm/lib/transformations/ffi_native.dart
@@ -143,7 +143,16 @@
fileUri: currentLibrary!.fileUri,
getterReference: currentLibraryIndex?.lookupGetterReference(fieldName))
..fileOffset = node.fileOffset;
- currentLibrary!.addField(funcPtrField);
+ // Add field to the parent the FfiNative function belongs to.
+ final parent = node.parent;
+ if (parent is Class) {
+ parent.addField(funcPtrField);
+ } else if (parent is Library) {
+ parent.addField(funcPtrField);
+ } else {
+ throw 'Unexpected parent of @FfiNative function. '
+ 'Expected Class or Library, but found ${parent}.';
+ }
// _@FfiNative__square_root(x)
final callFuncPtrInvocation = FunctionInvocation(
diff --git a/runtime/lib/function.cc b/runtime/lib/function.cc
index 785e713..77800d4 100644
--- a/runtime/lib/function.cc
+++ b/runtime/lib/function.cc
@@ -40,14 +40,6 @@
return false;
}
const auto& other_closure = Closure::Cast(other);
- // Check that the delayed type argument vectors match.
- if (receiver.delayed_type_arguments() !=
- other_closure.delayed_type_arguments()) {
- // Mismatches should only happen when a generic function is involved.
- ASSERT(Function::Handle(receiver.function()).IsGeneric() ||
- Function::Handle(other_closure.function()).IsGeneric());
- return false;
- }
// Closures that are not implicit closures (tear-offs) are unique.
const auto& func_a = Function::Handle(zone, receiver.function());
if (!func_a.IsImplicitClosureFunction()) {
@@ -65,6 +57,21 @@
func_a.is_static() != func_b.is_static())) {
return false;
}
+ // Check that the delayed type argument vectors match.
+ if (receiver.delayed_type_arguments() !=
+ other_closure.delayed_type_arguments()) {
+ // Mismatches should only happen when a generic function is involved.
+ ASSERT(func_a.IsGeneric() || func_b.IsGeneric());
+ const auto& type_args_a =
+ TypeArguments::Handle(zone, receiver.delayed_type_arguments());
+ const auto& type_args_b =
+ TypeArguments::Handle(zone, other_closure.delayed_type_arguments());
+ if (type_args_a.IsNull() || type_args_b.IsNull() ||
+ (type_args_a.Length() != type_args_b.Length()) ||
+ !type_args_a.IsEquivalent(type_args_b, TypeEquality::kSyntactical)) {
+ return false;
+ }
+ }
if (!func_a.is_static()) {
// Check that the both receiver instances are the same.
const Context& context_a = Context::Handle(zone, receiver.context());
diff --git a/runtime/vm/object.cc b/runtime/vm/object.cc
index a1b8649..30650c6 100644
--- a/runtime/vm/object.cc
+++ b/runtime/vm/object.cc
@@ -8949,9 +8949,11 @@
}
}
}
- // Compare flags (IsGenericCovariantImpl).
- if (!Array::Equals(type_params.flags(), other_type_params.flags())) {
- return false;
+ if (kind != TypeEquality::kInSubtypeTest) {
+ // Compare flags (IsGenericCovariantImpl).
+ if (!Array::Equals(type_params.flags(), other_type_params.flags())) {
+ return false;
+ }
}
}
return true;
diff --git a/tests/ffi/ffi_native_test.dart b/tests/ffi/ffi_native_test.dart
index b700981..12ca0dd 100644
--- a/tests/ffi/ffi_native_test.dart
+++ b/tests/ffi/ffi_native_test.dart
@@ -48,6 +48,17 @@
external int returnIntPtrMethod(int x); //# 03: compile-time error
}
+// Regression test: Ensure same-name FfiNative functions don't collide in the
+// top-level namespace, but instead live under their parent (Library, Class).
+class A {
+ @FfiNative<Void Function()>('nop')
+ external static void foo();
+}
+class B {
+ @FfiNative<Void Function()>('nop')
+ external static void foo();
+}
+
void main() {
// Register test resolver for top-level functions above.
final root_lib_url = getRootLibraryUrl();
diff --git a/tests/language/regress/regress46816_test.dart b/tests/language/regress/regress46816_test.dart
new file mode 100644
index 0000000..bccc6d2
--- /dev/null
+++ b/tests/language/regress/regress46816_test.dart
@@ -0,0 +1,18 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:expect/expect.dart';
+
+class A<X extends num> {
+ void f<Y extends X>(Y y) {}
+}
+
+typedef Func = void Function<Y extends int>(Y);
+
+main() {
+ A<num> a = new A<int>();
+ dynamic f = (a as A<int>).f;
+ Expect.isTrue(f is Func);
+ print(f as Func);
+}
diff --git a/tests/language_2/regress/regress46816_test.dart b/tests/language_2/regress/regress46816_test.dart
new file mode 100644
index 0000000..9d39bac
--- /dev/null
+++ b/tests/language_2/regress/regress46816_test.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// @dart=2.9
+
+import 'package:expect/expect.dart';
+
+class A<X extends num> {
+ void f<Y extends X>(Y y) {}
+}
+
+typedef Func = void Function<Y extends int>(Y);
+
+main() {
+ A<num> a = new A<int>();
+ dynamic f = (a as A<int>).f;
+ Expect.isTrue(f is Func);
+ print(f as Func);
+}
diff --git a/tools/VERSION b/tools/VERSION
index d457a23..89007c1 100644
--- a/tools/VERSION
+++ b/tools/VERSION
@@ -27,5 +27,5 @@
MAJOR 2
MINOR 14
PATCH 0
-PRERELEASE 388
+PRERELEASE 389
PRERELEASE_PATCH 0
\ No newline at end of file