[current results ui] Fetch multiple pages of results from server

Change-Id: I08575581ef108ef781f2480399e4648285629273
Reviewed-on: https://dart-review.googlesource.com/c/dart_ci/+/161380
Reviewed-by: Karl Klose <karlklose@google.com>
diff --git a/current_results_ui/lib/query.dart b/current_results_ui/lib/query.dart
index 9270fc9..2fbd0ec 100644
--- a/current_results_ui/lib/query.dart
+++ b/current_results_ui/lib/query.dart
@@ -2,6 +2,8 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
+
 import 'package:flutter/material.dart';
 import 'package:http/http.dart' as http;
 import 'dart:convert';
@@ -18,40 +20,68 @@
 
 class QueryResults extends ChangeNotifier {
   Filter filter;
-  bool showAll = true;
+  StreamSubscription<GetResultsResponse> fetcher;
   List<String> names = [];
   Map<String, Map<String, int>> counts = {};
   Map<String, Map<ChangeInResult, List<Result>>> grouped = {};
+  int fetchedResultsCount = 0;
   bool partialResults = true;
 
   GetResultsResponse resultsObject = GetResultsResponse.create();
 
   void fetchCurrentResults() async {
-    final client = http.Client();
-    final resultsQuery = Uri.https(apiHost, 'v1/results',
-        {'filter': filter.terms.join(','), 'pageSize': '$fetchLimit'});
-    final resultsResponse = await client.get(resultsQuery);
-    resultsObject = GetResultsResponse.create()
-      ..mergeFromProto3Json(json.decode(resultsResponse.body));
-    final results = resultsObject.results;
+    fetcher?.cancel();
+    names = [];
+    counts = {};
+    grouped = {};
+    fetchedResultsCount = 0;
+    partialResults = true;
 
+    fetcher = fetchResults(filter).listen(onResults, onDone: onDone);
+  }
+
+  void onResults(GetResultsResponse response) {
+    final results = response.results;
+    fetchedResultsCount += results.length;
+    if (fetchedResultsCount >= maxFetchedResults) {
+      fetcher.cancel();
+      fetcher = null;
+    }
     for (final result in results) {
+      final change = ChangeInResult(result);
       grouped
           .putIfAbsent(result.name, () => <ChangeInResult, List<Result>>{})
-          .putIfAbsent(ChangeInResult(result), () => <Result>[])
+          .putIfAbsent(change, () => <Result>[])
           .add(result);
-    }
-    for (final name in grouped.keys) {
-      final count = counts[name] = <String, int>{};
-      for (final change in grouped[name].keys) {
-        count.putIfAbsent(change.kind, () => 0);
-        count[change.kind] += grouped[name][change].length;
-      }
+      counts
+          .putIfAbsent(result.name, () => <String, int>{})
+          .putIfAbsent(change.kind, () => 0);
+      ++counts[result.name][change.kind];
     }
     names = grouped.keys.toList()..sort();
-    partialResults = results.length == fetchLimit;
     notifyListeners();
   }
+
+  void onDone() {
+    partialResults = false;
+  }
+}
+
+Stream<GetResultsResponse> fetchResults(Filter filter) async* {
+  final client = http.Client();
+  var pageToken = '';
+  do {
+    final resultsQuery = Uri.https(apiHost, 'v1/results', {
+      'filter': filter.terms.join(','),
+      'pageSize': '$fetchLimit',
+      'pageToken': pageToken
+    });
+    final response = await client.get(resultsQuery);
+    final results = GetResultsResponse.create()
+      ..mergeFromProto3Json(json.decode(response.body));
+    yield results;
+    pageToken = results.nextPageToken;
+  } while (pageToken.isNotEmpty);
 }
 
 class ChangeInResult {