Adds simple native ServiceStream consumer API.

This enables metric collection of e.g. garbage collection by listening to all GC events.

Bug: b/117604553
Change-Id: Ic36fd2446cba90be0253ec5e24eddd1c16629c4b
Reviewed-on: https://dart-review.googlesource.com/c/85922
Commit-Queue: Clement Skau <cskau@google.com>
Reviewed-by: Ryan Macnak <rmacnak@google.com>
diff --git a/runtime/include/dart_tools_api.h b/runtime/include/dart_tools_api.h
index 809177b..49b1f19 100644
--- a/runtime/include/dart_tools_api.h
+++ b/runtime/include/dart_tools_api.h
@@ -222,6 +222,25 @@
     Dart_ServiceStreamCancelCallback cancel_callback);
 
 /**
+ * A callback invoked when the VM service receives an event.
+ */
+typedef void (*Dart_NativeStreamConsumer)(const uint8_t* event_json,
+                                          intptr_t event_json_length);
+
+/**
+ * Sets the native VM service stream callbacks for a particular stream.
+ * Note: The function may be called on multiple threads concurrently.
+ *
+ * \param consumer A function pointer to an event handler callback function.
+ *   A NULL value removes the existing listen callback function if any.
+ *
+ * \param stream_id The ID of the stream on which to set the callback.
+ */
+DART_EXPORT void Dart_SetNativeServiceStreamCallback(
+    Dart_NativeStreamConsumer consumer,
+    const char* stream_id);
+
+/**
  * Sends a data event to clients of the VM Service.
  *
  * A data event is used to pass an array of bytes to subscribed VM
diff --git a/runtime/vm/dart_api_impl.cc b/runtime/vm/dart_api_impl.cc
index 7d5eafc..e3e5603 100644
--- a/runtime/vm/dart_api_impl.cc
+++ b/runtime/vm/dart_api_impl.cc
@@ -5414,6 +5414,12 @@
   return NULL;
 }
 
+DART_EXPORT void Dart_SetNativeServiceStreamCallback(
+    Dart_NativeStreamConsumer consumer,
+    const char* stream_id) {
+  return;
+}
+
 DART_EXPORT Dart_Handle Dart_ServiceSendDataEvent(const char* stream_id,
                                                   const char* event_kind,
                                                   const uint8_t* bytes,
@@ -5518,6 +5524,12 @@
   return NULL;
 }
 
+DART_EXPORT void Dart_SetNativeServiceStreamCallback(
+    Dart_NativeStreamConsumer consumer,
+    const char* stream_id) {
+  Service::SetNativeServiceStreamCallback(consumer, stream_id);
+}
+
 DART_EXPORT Dart_Handle Dart_ServiceSendDataEvent(const char* stream_id,
                                                   const char* event_kind,
                                                   const uint8_t* bytes,
diff --git a/runtime/vm/service.cc b/runtime/vm/service.cc
index acf8b6f..679a9f1 100644
--- a/runtime/vm/service.cc
+++ b/runtime/vm/service.cc
@@ -1095,6 +1095,13 @@
     params.AddProperty("event", event);
   }
   PostEvent(event->isolate(), stream_id, event->KindAsCString(), &js);
+
+  // Post event to the native Service Stream handlers if set.
+  if (event->stream_info() != nullptr && event->stream_info()->consumer() != nullptr) {
+    auto length = js.buffer()->length();
+    event->stream_info()->consumer()(
+        reinterpret_cast<uint8_t*>(js.buffer()->buf()), length);
+  }
 }
 
 void Service::PostEvent(Isolate* isolate,
@@ -1261,6 +1268,17 @@
   stream_cancel_callback_ = cancel_callback;
 }
 
+void Service::SetNativeServiceStreamCallback(Dart_NativeStreamConsumer consumer,
+                                             const char* stream_id) {
+  for (auto stream : streams_) {
+    if (stream->id() == stream_id) {
+      stream->set_consumer(consumer);
+    }
+  }
+  // Enable stream.
+  ListenStream(stream_id);
+}
+
 void Service::SetGetServiceAssetsCallback(
     Dart_GetVMServiceAssetsArchive get_service_assets) {
   get_service_assets_callback_ = get_service_assets;
diff --git a/runtime/vm/service.h b/runtime/vm/service.h
index ace6da3..90f3e64 100644
--- a/runtime/vm/service.h
+++ b/runtime/vm/service.h
@@ -73,9 +73,15 @@
   void set_enabled(bool value) { enabled_ = value; }
   bool enabled() const { return enabled_; }
 
+  void set_consumer(Dart_NativeStreamConsumer consumer) {
+    callback_ = consumer;
+  }
+  Dart_NativeStreamConsumer consumer() const { return callback_; }
+
  private:
   const char* id_;
   bool enabled_;
+  Dart_NativeStreamConsumer callback_;
 };
 
 class Service : public AllStatic {
@@ -108,6 +114,9 @@
       Dart_ServiceStreamListenCallback listen_callback,
       Dart_ServiceStreamCancelCallback cancel_callback);
 
+  static void SetNativeServiceStreamCallback(Dart_NativeStreamConsumer consumer,
+                                             const char* stream_id);
+
   static void SetGetServiceAssetsCallback(
       Dart_GetVMServiceAssetsArchive get_service_assets);