[cloud functions] Store multiple changes in parallel

* Run 30 inserts in parallel using pkg:pool.
* Add missing awaits in main.

Bug: https://github.com/dart-lang/dart_ci/issues/69

Change-Id: Iedefcb85d4d62f390a47f80ddd272f4263456b4d
Reviewed-on: https://dart-review.googlesource.com/c/dart_ci/+/134300
Reviewed-by: Jonas Termansen <sortie@google.com>
diff --git a/functions/node/builder.dart b/functions/node/builder.dart
index a052517..9119cf1 100644
--- a/functions/node/builder.dart
+++ b/functions/node/builder.dart
@@ -5,6 +5,7 @@
 import 'dart:convert';
 
 import 'package:http/http.dart' as http;
+import 'package:pool/pool.dart';
 
 import 'firestore.dart';
 
@@ -74,7 +75,7 @@
     final configurations =
         results.map((change) => change['configuration'] as String).toSet();
     await update(configurations);
-    await Future.forEach(results.where(isChangedResult), storeChange);
+    await Pool(30).forEach(results.where(isChangedResult), storeChange).drain();
     if (countChunks != null) {
       await firestore.storeBuildChunkCount(builderName, endIndex, countChunks);
     }
diff --git a/functions/node/index.dart b/functions/node/index.dart
index 601002a..8c9e071 100644
--- a/functions/node/index.dart
+++ b/functions/node/index.dart
@@ -23,13 +23,14 @@
       : null;
   final String buildbucketID = message.attributes['buildbucket_id'];
   try {
+    var firestore = FirestoreServiceImpl();
     if (commit.startsWith('refs/changes')) {
-      return Tryjob(commit, countChunks, buildbucketID, FirestoreServiceImpl(),
-              http.NodeClient())
+      return await Tryjob(
+              commit, countChunks, buildbucketID, firestore, http.NodeClient())
           .process(results);
     } else {
-      return Build(commit, first, countChunks, FirestoreServiceImpl(),
-              http.NodeClient())
+      return await Build(
+              commit, first, countChunks, firestore, http.NodeClient())
           .process(results);
     }
   } catch (e) {
diff --git a/functions/node/tryjob.dart b/functions/node/tryjob.dart
index 2e20b68..7e94e9f 100644
--- a/functions/node/tryjob.dart
+++ b/functions/node/tryjob.dart
@@ -3,6 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'package:http/http.dart' as http show BaseClient;
+import 'package:pool/pool.dart';
 
 import 'firestore.dart';
 import 'gerrit_change.dart';
@@ -37,7 +38,7 @@
     await update();
     builderName = results.first['builder_name'];
     buildNumber = int.parse(results.first['build_number']);
-    await Future.forEach(results.where(isChangedResult), storeTryChange);
+    await Pool(30).forEach(results.where(isChangedResult), storeChange).drain();
 
     if (countChunks != null) {
       await firestore.storeTryBuildChunkCount(builderName, buildNumber,
@@ -47,7 +48,7 @@
         builderName, buildNumber, buildbucketID, review, patchset, success);
   }
 
-  Future<void> storeTryChange(change) async {
+  Future<void> storeChange(change) async {
     final approved = await firestore.storeTryChange(change, review, patchset);
     if (!approved && !change['matches']) success = false;
   }
diff --git a/functions/pubspec.lock b/functions/pubspec.lock
index 5cb1a22..1fdeda1 100644
--- a/functions/pubspec.lock
+++ b/functions/pubspec.lock
@@ -366,7 +366,7 @@
     source: hosted
     version: "1.8.0+1"
   pool:
-    dependency: transitive
+    dependency: "direct main"
     description:
       name: pool
       url: "https://pub.dartlang.org"
diff --git a/functions/pubspec.yaml b/functions/pubspec.yaml
index defd5b8..377c0d2 100644
--- a/functions/pubspec.yaml
+++ b/functions/pubspec.yaml
@@ -9,6 +9,7 @@
 
 dependencies:
   firebase_functions_interop: ^1.0.2
+  pool: ^1.4.0
 
 dev_dependencies:
   build_runner: ^1.7.1