[cloud functions] Add build status computation to result processing.

Change-Id: I9efde8ddc220daa1e7b6c3051622c615e6201b86
Reviewed-on: https://dart-review.googlesource.com/c/dart_ci/+/127360
Reviewed-by: Alexander Thomas <athom@google.com>
diff --git a/functions/node/builder.dart b/functions/node/builder.dart
index 03ca510..58bafef 100644
--- a/functions/node/builder.dart
+++ b/functions/node/builder.dart
@@ -53,15 +53,18 @@
   final http.BaseClient httpClient;
   final String commitHash;
   final Map<String, dynamic> firstResult;
+  final int countChunks;
   String builderName;
   int buildNumber;
   int startIndex;
   int endIndex;
   Set<String> tryApprovals;
+  bool success = true; // Changed to false if any unapproved failure is seen.
+  int countChanges = 0;
+  int commitsFetched;
 
-  Statistics stats = Statistics();
-
-  Build(this.commitHash, this.firstResult, this.firestore, this.httpClient)
+  Build(this.commitHash, this.firstResult, this.countChunks, this.firestore,
+      this.httpClient)
       : builderName = firstResult['builder_name'],
         buildNumber = int.parse(firstResult['build_number']);
 
@@ -71,7 +74,18 @@
     await update(configurations);
     await Future.forEach(
         results.where(isChangedResult), (result) => storeChange(result));
-    stats.report();
+    if (countChunks != null) {
+      await firestore.storeBuildChunkCount(builderName, endIndex, countChunks);
+    }
+    await firestore.storeChunkStatus(builderName, endIndex, success);
+
+    final report = [
+      "Processed ${results.length} results from $builderName build $buildNumber",
+      if (countChanges > 0) "Stored $countChanges changes",
+      if (commitsFetched != null) "Fetched $commitsFetched new commits",
+      if (!success) "Found unapproved new failures"
+    ];
+    print(report.join('\n'));
   }
 
   Future<void> update(Iterable<String> configurations) async {
@@ -131,7 +145,7 @@
       info('Found no new commits between $lastHash and master');
       return;
     }
-    stats.commitsFetched = commits.length;
+    commitsFetched = commits.length;
     final first = commits.last as Map<String, dynamic>;
     if (first['parents'].first != lastHash) {
       error('First new commit ${first['parents'].first} is not'
@@ -178,9 +192,18 @@
   }
 
   Future<void> storeChange(Map<String, dynamic> change) async {
-    firestore.storeChange(change, startIndex, endIndex,
-        approved: tryApprovals?.contains(testResult(change)) ?? false);
-    stats.changes++;
+    countChanges++;
+    var approved;
+    String result = await firestore.findResult(change, startIndex, endIndex);
+    if (result == null) {
+      approved = tryApprovals?.contains(testResult(change)) ?? false;
+      await firestore.storeResult(change, startIndex, endIndex,
+          approved: approved);
+    } else {
+      approved = await firestore.updateResult(
+          result, change['configuration'], startIndex, endIndex);
+    }
+    if (!approved && !change['matches']) success = false;
   }
 }
 
diff --git a/functions/node/firestore.dart b/functions/node/firestore.dart
index e71e282..a96e005 100644
--- a/functions/node/firestore.dart
+++ b/functions/node/firestore.dart
@@ -19,9 +19,15 @@
 
   Future<void> updateBuildInfo(String builder, int buildNumber, int index);
 
-  Future<void> storeChange(
+  Future<String> findResult(
+      Map<String, dynamic> change, int startIndex, int endIndex);
+
+  Future<void> storeResult(
       Map<String, dynamic> change, int startIndex, int endIndex,
-      {bool approved});
+      {bool approved = false});
+
+  Future<bool> updateResult(
+      String result, String configuration, int startIndex, int endIndex);
 
   Future<void> storeTryChange(
       Map<String, dynamic> change, int review, int patchset);
@@ -37,4 +43,8 @@
   Future<void> linkCommentsToCommit(int review, int index);
 
   Future<List<Map<String, dynamic>>> tryApprovals(int review);
+
+  Future<void> storeChunkStatus(String builder, int index, bool success);
+
+  Future<void> storeBuildChunkCount(String builder, int index, int numChunks);
 }
diff --git a/functions/node/firestore_impl.dart b/functions/node/firestore_impl.dart
index 3e78b29..217b55d 100644
--- a/functions/node/firestore_impl.dart
+++ b/functions/node/firestore_impl.dart
@@ -3,7 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:math' show max, min;
-import 'package:firebase_functions_interop/firebase_functions_interop.dart';
+import 'package:firebase_admin_interop/firebase_admin_interop.dart';
 
 import 'firestore.dart';
 
@@ -85,9 +85,8 @@
     }
   }
 
-  Future<void> storeChange(
-      Map<String, dynamic> change, int startIndex, int endIndex,
-      {bool approved = false}) async {
+  Future<String> findResult(
+      Map<String, dynamic> change, int startIndex, int endIndex) async {
     String name = change['name'];
     String result = change['result'];
     String previousResult = change['previous_result'] ?? 'new test';
@@ -101,8 +100,6 @@
         .limit(5) // We will pick the right one, probably the latest.
         .get();
 
-    // Find an existing change group with a blamelist that intersects this change.
-
     bool blamelistIncludesChange(DocumentSnapshot groupDocument) {
       var group = groupDocument.data;
       var groupStart = group.getInt('blamelist_start_index');
@@ -110,38 +107,47 @@
       return startIndex <= groupEnd && endIndex >= groupStart;
     }
 
-    DocumentSnapshot group = snapshot.documents
-        .firstWhere(blamelistIncludesChange, orElse: () => null);
+    return snapshot.documents
+        .firstWhere(blamelistIncludesChange, orElse: () => null)
+        ?.documentID;
+  }
 
-    if (group == null) {
-      return firestore.collection('results').add(DocumentData.fromMap({
-            'name': name,
-            'result': result,
-            'previous_result': previousResult,
+  Future<void> storeResult(
+          Map<String, dynamic> change, int startIndex, int endIndex,
+          {bool approved = false}) =>
+      firestore.collection('results').add(DocumentData.fromMap({
+            'name': change['name'],
+            'result': change['result'],
+            'previous_result': change['previous_result'] ?? 'new test',
             'expected': change['expected'],
             'blamelist_start_index': startIndex,
             'blamelist_end_index': endIndex,
             'configurations': <String>[change['configuration']],
             if (approved) 'approved': true
           }));
-    }
+
+  Future<bool> updateResult(
+      String result, String configuration, int startIndex, int endIndex) {
+    DocumentReference reference = firestore.document('results/$result');
 
     // Update the change group in a transaction.
-    // Add new configuration and narrow the blamelist.
-    Future<void> updateGroup(Transaction transaction) async {
-      final DocumentSnapshot groupSnapshot =
-          await transaction.get(group.reference);
-      final data = groupSnapshot.data;
-      final newStart = max(startIndex, data.getInt('blamelist_start_index'));
-      final newEnd = min(endIndex, data.getInt('blamelist_end_index'));
-      final update = UpdateData.fromMap({
-        'blamelist_start_index': newStart,
-        'blamelist_end_index': newEnd,
-      });
-      update.setFieldValue('configurations',
-          Firestore.fieldValues.arrayUnion([change['configuration']]));
-      group.reference.updateData(update);
-    }
+    Future<bool> updateGroup(Transaction transaction) =>
+        transaction.get(reference).then((resultSnapshot) {
+          final data = resultSnapshot.data;
+          bool approved = data.getBool('approved') ?? false;
+          // Add the new configuration and narrow the blamelist.
+          final newStart =
+              max(startIndex, data.getInt('blamelist_start_index'));
+          final newEnd = min(endIndex, data.getInt('blamelist_end_index'));
+          final update = UpdateData.fromMap({
+            'blamelist_start_index': newStart,
+            'blamelist_end_index': newEnd,
+          });
+          update.setFieldValue('configurations',
+              Firestore.fieldValues.arrayUnion([configuration]));
+          reference.updateData(update);
+          return approved;
+        });
 
     return firestore.runTransaction(updateGroup);
   }
@@ -151,7 +157,6 @@
     String name = change['name'];
     String result = change['result'];
     String expected = change['expected'];
-    String reviewPath = change['commit_hash'];
     String previousResult = change['previous_result'] ?? 'new test';
     QuerySnapshot snapshot = await firestore
         .collection('try_results')
@@ -226,4 +231,32 @@
         .get();
     return [for (final document in approvals.documents) document.data.toMap()];
   }
+
+  Future<void> storeChunkStatus(String builder, int index, bool success) async {
+    final document = firestore.document('builds/$builder:$index');
+
+    Future<void> updateStatus(Transaction transaction) async {
+      final snapshot = await transaction.get(document);
+      final data = snapshot.data.toMap();
+      final int chunks = data['num_chunks'];
+      final int processedChunks = data['processed_chunks'] ?? 0;
+      final bool completed = chunks == processedChunks + 1;
+
+      final update = UpdateData.fromMap({
+        'processed_chunks': processedChunks + 1,
+        'success': (data['success'] ?? true) && success,
+        if (completed) 'completed': true
+      });
+      transaction.update(document, update);
+    }
+
+    return firestore.runTransaction(updateStatus);
+  }
+
+  Future<void> storeBuildChunkCount(
+      String builder, int index, int numChunks) async {
+    return firestore
+        .document('builds/$builder:$index')
+        .updateData(UpdateData.fromMap({'num_chunks': numChunks}));
+  }
 }
diff --git a/functions/node/index.dart b/functions/node/index.dart
index 7a016c8..261e81b 100644
--- a/functions/node/index.dart
+++ b/functions/node/index.dart
@@ -18,12 +18,17 @@
   final results = (message.json as List).cast<Map<String, dynamic>>();
   final first = results.first;
   final String commit = first['commit_hash'];
+  final int countChunks = message.attributes.containsKey('num_chunks')
+      ? int.parse(message.attributes['num_chunks'])
+      : null;
   try {
     if (commit.startsWith('refs/changes')) {
-      return Tryjob(commit, FirestoreServiceImpl(), http.NodeClient())
+      return Tryjob(
+              commit, countChunks, FirestoreServiceImpl(), http.NodeClient())
           .process(results);
     } else {
-      return Build(commit, first, FirestoreServiceImpl(), http.NodeClient())
+      return Build(commit, first, countChunks, FirestoreServiceImpl(),
+              http.NodeClient())
           .process(results);
     }
   } catch (e) {
diff --git a/functions/node/test/firestore_test_nodejs.dart b/functions/node/test/firestore_test_nodejs.dart
new file mode 100644
index 0000000..375612c
--- /dev/null
+++ b/functions/node/test/firestore_test_nodejs.dart
@@ -0,0 +1,50 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:firebase_admin_interop/firebase_admin_interop.dart';
+import 'package:test/test.dart';
+
+import '../firestore.dart';
+import '../firestore_impl.dart' as fs;
+
+// These tests read and write data from the Firestore database, and
+// should only be run locally against the dart-ci-staging project.
+// Requires the environment variable GOOGLE_APPLICATION_CREDENTIALS
+// to point to a json key to a service account
+// with write access to dart_ci_staging datastore.
+// Set the database with 'firebase use --add dart-ci-staging'
+// The test must be compiled with nodejs, and run using the 'node' command.
+
+void main() {
+  final firestore = fs.FirestoreServiceImpl();
+
+  test('Test chunk storing', () async {
+    final builder = 'test_builder';
+    final index = 123;
+    final document = fs.firestore.document('builds/$builder:$index');
+
+    await document.delete();
+    await firestore.updateBuildInfo(builder, 3456, index);
+    await firestore.storeChunkStatus(builder, index, true);
+    await firestore.storeBuildChunkCount(builder, index, 4);
+    await firestore.storeChunkStatus(builder, index, true);
+
+    DocumentSnapshot snapshot = await document.get();
+    var data = snapshot.data.toMap();
+    expect(data['success'], isTrue);
+    expect(data['num_chunks'], 4);
+    expect(data['processed_chunks'], 2);
+    expect(data['completed'], isNull);
+
+    await firestore.storeChunkStatus(builder, index, false);
+    await firestore.storeChunkStatus(builder, index, true);
+
+    snapshot = await document.get();
+    data = snapshot.data.toMap();
+    expect(data['success'], isFalse);
+    expect(data['num_chunks'], 4);
+    expect(data['processed_chunks'], 4);
+    expect(data['completed'], isTrue);
+  });
+}
diff --git a/functions/node/test/test.dart b/functions/node/test/test.dart
index 3873536..bf6b27c 100644
--- a/functions/node/test/test.dart
+++ b/functions/node/test/test.dart
@@ -28,8 +28,8 @@
     when(firestore.getCommit(previousCommitHash))
         .thenAnswer((_) => Future.value(previousCommit));
 
-    final builder =
-        Build(existingCommitHash, existingCommitChange, firestore, client);
+    final builder = Build(
+        existingCommitHash, existingCommitChange, null, firestore, client);
     await builder.storeBuildCommitsInfo();
     expect(builder.endIndex, existingCommit['index']);
     expect(builder.startIndex, previousCommit['index'] + 1);
@@ -62,7 +62,7 @@
         .thenAnswer((_) => Future(() => ResponseFake(gitilesLog)));
 
     final builder =
-        Build(landedCommitHash, landedCommitChange, firestore, client);
+        Build(landedCommitHash, landedCommitChange, null, firestore, client);
     await builder.storeBuildCommitsInfo();
     expect(builder.endIndex, landedCommit['index']);
     expect(builder.startIndex, existingCommit['index'] + 1);
@@ -106,14 +106,15 @@
             .toList()));
 
     final builder =
-        Build(landedCommitHash, landedCommitChange, firestore, client);
+        Build(landedCommitHash, landedCommitChange, null, firestore, client);
     await builder.process([landedCommitChange]);
 
     verifyZeroInteractions(client);
     verifyInOrder([
       verify(firestore.updateConfiguration(
           "dart2js-new-rti-linux-x64-d8", "dart2js-rti-linux-x64-d8")),
-      verify(firestore.storeChange(any, 53, 54, approved: true))
+      verify(firestore.findResult(any, 53, 54)),
+      verify(firestore.storeResult(any, 53, 54, approved: true))
     ]);
   });
 }
diff --git a/functions/node/tryjob.dart b/functions/node/tryjob.dart
index f72504e..999a522 100644
--- a/functions/node/tryjob.dart
+++ b/functions/node/tryjob.dart
@@ -12,12 +12,13 @@
 
 class Tryjob {
   static final changeRefRegExp = RegExp(r'refs/changes/(\d*)/(\d*)');
-  http.BaseClient httpClient;
-  FirestoreService firestore;
+  final http.BaseClient httpClient;
+  final FirestoreService firestore;
   int review;
   int patchset;
+  final int countChunks;
 
-  Tryjob(String changeRef, this.firestore, this.httpClient) {
+  Tryjob(String changeRef, this.countChunks, this.firestore, this.httpClient) {
     final match = changeRefRegExp.matchAsPrefix(changeRef);
     review = int.parse(match[1]);
     patchset = int.parse(match[2]);