Move to a streamed model for files - mostly an experiment
diff --git a/lib/src/io.dart b/lib/src/io.dart
index 7f03abc..d87af51 100644
--- a/lib/src/io.dart
+++ b/lib/src/io.dart
@@ -106,23 +106,39 @@
 
 /// Formats all of the files and directories given by [paths].
 Future<void> formatPaths(FormatterOptions options, List<String> paths) async {
-  // If the user didn't specify a language version, then look for surrounding
-  // package configs so we know what language versions to use for the files.
-  var cache = ConfigCache();
+  if (!await _processFileStream(options, _expandPaths(paths, options))) {
+    exitCode = 65;
+  }
+}
 
+/// Expands [paths] into a stream of files to format.
+Stream<File> _expandPaths(List<String> paths, FormatterOptions options) async* {
+  var seen = <String>{};
   for (var path in paths) {
     var directory = Directory(path);
     if (directory.existsSync()) {
-      if (!await _processDirectory(cache, options, directory)) {
-        exitCode = 65;
+      await for (var entry in directory.list(
+        recursive: true,
+        followLinks: options.followLinks,
+      )) {
+        if (entry is Link) continue;
+        if (entry is! File || !entry.path.endsWith('.dart')) continue;
+
+        // Ignore paths in hidden directories.
+        var parts = p.split(p.relative(entry.path, from: directory.path));
+        if (parts.any((part) => part.startsWith('.'))) continue;
+
+        if (seen.add(p.canonicalize(entry.path))) {
+          yield entry;
+        }
       }
       continue;
     }
 
     var file = File(path);
     if (file.existsSync()) {
-      if (!await _processFile(cache, options, file)) {
-        exitCode = 65;
+      if (seen.add(p.canonicalize(file.path))) {
+        yield file;
       }
     } else {
       stderr.writeln('No file or directory found at "$path".');
@@ -130,52 +146,33 @@
   }
 }
 
-/// Runs the formatter on every .dart file in [path] (and its subdirectories),
-/// and replaces them with their formatted output.
-///
-/// Returns `true` if successful or `false` if an error occurred in any of the
-/// files.
-Future<bool> _processDirectory(
-  ConfigCache cache,
+/// Runs the formatter on a stream of files using a worker pool.
+Future<bool> _processFileStream(
   FormatterOptions options,
-  Directory directory,
+  Stream<File> files,
 ) async {
+  var cache = ConfigCache();
   var success = true;
-
-  var entries = directory.listSync(
-    recursive: true,
-    followLinks: options.followLinks,
-  );
-  entries.sort((a, b) => a.path.compareTo(b.path));
-
   var pool = WorkerPool();
 
-  for (var entry in entries) {
-    if (entry is Link) continue;
-
-    if (entry is! File || !entry.path.endsWith('.dart')) continue;
-
-    // If the path is in a subdirectory starting with ".", ignore it.
-    var parts = p.split(p.relative(entry.path, from: directory.path));
-    if (parts.any((part) => part.startsWith('.'))) continue;
-
-    var displayPath = p.normalize(entry.path);
+  await for (var file in files) {
+    var displayPath = p.normalize(file.path);
 
     // Determine configuration in main isolate (leveraging cache).
     var languageVersion =
         options.languageVersion ??
-        await cache.findLanguageVersion(entry, displayPath);
+        await cache.findLanguageVersion(file, displayPath);
     languageVersion ??= DartFormatter.latestLanguageVersion;
 
-    var pageWidth = options.pageWidth ?? await cache.findPageWidth(entry);
+    var pageWidth = options.pageWidth ?? await cache.findPageWidth(file);
     var trailingCommas =
-        options.trailingCommas ?? await cache.findTrailingCommas(entry);
+        options.trailingCommas ?? await cache.findTrailingCommas(file);
     pageWidth ??= DartFormatter.defaultPageWidth;
 
-    options.beforeFile(entry, displayPath);
+    options.beforeFile(file, displayPath);
 
     await pool.add(
-      uri: entry.path,
+      uri: file.path,
       languageVersion: languageVersion,
       indent: options.indent,
       pageWidth: pageWidth,
@@ -198,89 +195,18 @@
 
         var output = SourceCode(
           response.text!,
-          uri: entry.path,
+          uri: file.path,
           selectionStart: response.selectionStart,
           selectionLength: response.selectionLength,
         );
 
-        options.afterFile(
-          entry,
-          displayPath,
-          output,
-          changed: response.changed,
-        );
+        options.afterFile(file, displayPath, output, changed: response.changed);
       },
     );
   }
   var telemetry = await pool.close();
   options.summary.addTelemetry(telemetry);
+
+  await pool.close();
   return success;
 }
-
-/// Runs the formatter on [file].
-///
-/// Returns `true` if successful or `false` if an error occurred.
-Future<bool> _processFile(
-  ConfigCache cache,
-  FormatterOptions options,
-  File file, {
-  String? displayPath,
-}) async {
-  displayPath ??= file.path;
-
-  // Determine what language version to use.
-  var languageVersion =
-      options.languageVersion ??
-      await cache.findLanguageVersion(file, displayPath);
-
-  // If they didn't specify a version and we couldn't find a surrounding
-  // package, then default to the latest version.
-  languageVersion ??= DartFormatter.latestLanguageVersion;
-
-  // Determine the configuration options.
-  var pageWidth = options.pageWidth ?? await cache.findPageWidth(file);
-  var trailingCommas =
-      options.trailingCommas ?? await cache.findTrailingCommas(file);
-
-  // Use a default page width if we don't have a specified one and couldn't
-  // find a configured one.
-  pageWidth ??= DartFormatter.defaultPageWidth;
-
-  var formatter = DartFormatter(
-    languageVersion: languageVersion,
-    indent: options.indent,
-    pageWidth: pageWidth,
-    trailingCommas: trailingCommas,
-    experimentFlags: options.experimentFlags,
-  );
-
-  try {
-    var source = SourceCode(file.readAsStringSync(), uri: file.path);
-    options.beforeFile(file, displayPath);
-    var output = formatter.formatSource(source);
-    options.afterFile(
-      file,
-      displayPath,
-      output,
-      changed: source.text != output.text,
-    );
-    return true;
-  } on FormatterException catch (err) {
-    var color =
-        Platform.operatingSystem != 'windows' &&
-        stdioType(stderr) == StdioType.terminal;
-
-    stderr.writeln(err.message(color: color));
-  } on UnexpectedOutputException catch (err) {
-    stderr.writeln('''Hit a bug in the formatter when formatting $displayPath.
-$err
-Please report at github.com/dart-lang/dart_style/issues.''');
-  } catch (err, stack) {
-    stderr.writeln('''Hit a bug in the formatter when formatting $displayPath.
-Please report at github.com/dart-lang/dart_style/issues.
-$err
-$stack''');
-  }
-
-  return false;
-}
diff --git a/test/cli/cli_test.dart b/test/cli/cli_test.dart
index 51c323e..967ebce 100644
--- a/test/cli/cli_test.dart
+++ b/test/cli/cli_test.dart
@@ -23,17 +23,17 @@
       ]).create();
 
       var process = await runFormatterOnDir();
-      await expectLater(
-        process.stdout,
-        emitsInOrder([
+
+      var lines = await process.stdout.rest.toList();
+      expect(
+        lines,
+        containsAll([
           'Formatted ${p.join('code', 'a.dart')}',
           'Formatted ${p.join('code', 'c.dart')}',
         ]),
       );
-      await expectLater(
-        process.stdout,
-        emits(startsWith('Formatted 3 files (2 changed)')),
-      );
+      expect(lines, anyElement(startsWith('Formatted 3 files (2 changed)')));
+
       await process.shouldExit(0);
 
       // Overwrites the files.
@@ -52,17 +52,17 @@
         p.join('code', 'subdir'),
         p.join('code', 'c.dart'),
       ]);
-      await expectLater(
-        process.stdout,
-        emitsInOrder([
+
+      var lines = await process.stdout.rest.toList();
+      expect(
+        lines,
+        containsAll([
           'Formatted ${p.join('code', 'subdir', 'a.dart')}',
           'Formatted ${p.join('code', 'c.dart')}',
         ]),
       );
-      await expectLater(
-        process.stdout,
-        emits(startsWith('Formatted 2 files (2 changed)')),
-      );
+      expect(lines, anyElement(startsWith('Formatted 2 files (2 changed)')));
+
       await process.shouldExit(0);
 
       // Overwrites the selected files.
diff --git a/test/cli/output_test.dart b/test/cli/output_test.dart
index b34f0b7..b3b970a 100644
--- a/test/cli/output_test.dart
+++ b/test/cli/output_test.dart
@@ -22,18 +22,16 @@
       ]).create();
 
       var process = await runFormatterOnDir(['--show=all']);
-      await expectLater(
-        process.stdout,
-        emitsInOrder([
+      var lines = await process.stdout.rest.toList();
+      expect(
+        lines,
+        containsAll([
           'Formatted ${p.join('code', 'a.dart')}',
           'Unchanged ${p.join('code', 'b.dart')}',
           'Formatted ${p.join('code', 'c.dart')}',
         ]),
       );
-      await expectLater(
-        process.stdout,
-        emits(startsWith('Formatted 3 files (2 changed)')),
-      );
+      expect(lines, anyElement(startsWith('Formatted 3 files (2 changed)')));
       await process.shouldExit(0);
     });
 
@@ -60,17 +58,15 @@
       ]).create();
 
       var process = await runFormatterOnDir(['--show=changed']);
-      await expectLater(
-        process.stdout,
-        emitsInOrder([
+      var lines = await process.stdout.rest.toList();
+      expect(
+        lines,
+        containsAll([
           'Formatted ${p.join('code', 'a.dart')}',
           'Formatted ${p.join('code', 'c.dart')}',
         ]),
       );
-      await expectLater(
-        process.stdout,
-        emits(startsWith('Formatted 3 files (2 changed)')),
-      );
+      expect(lines, anyElement(startsWith('Formatted 3 files (2 changed)')));
       await process.shouldExit(0);
     });
   });
@@ -107,11 +103,19 @@
         var process = await runFormatterOnDir(['--output=show', '--show=all']);
         await expectLater(
           process.stdout,
-          emitsInOrder([
-            'Changed ${p.join('code', 'a.dart')}',
-            formattedOutput,
-            'Unchanged ${p.join('code', 'b.dart')}',
-            formattedOutput,
+          emitsAnyOf([
+            emitsInOrder([
+              'Changed ${p.join('code', 'a.dart')}',
+              formattedOutput,
+              'Unchanged ${p.join('code', 'b.dart')}',
+              formattedOutput,
+            ]),
+            emitsInOrder([
+              'Unchanged ${p.join('code', 'b.dart')}',
+              formattedOutput,
+              'Changed ${p.join('code', 'a.dart')}',
+              formattedOutput,
+            ]),
           ]),
         );
         await expectLater(
@@ -172,8 +176,8 @@
         });
 
         var process = await runFormatterOnDir(['--output=json']);
-
-        await expectLater(process.stdout, emitsInOrder([jsonA, jsonB]));
+        var lines = await process.stdout.rest.toList();
+        expect(lines, containsAll([jsonA, jsonB]));
         await process.shouldExit();
       });
 
@@ -194,17 +198,15 @@
         ]).create();
 
         var process = await runFormatterOnDir(['--output=none', '--show=all']);
-        await expectLater(
-          process.stdout,
-          emitsInOrder([
+        var lines = await process.stdout.rest.toList();
+        expect(
+          lines,
+          containsAll([
             'Changed ${p.join('code', 'a.dart')}',
             'Unchanged ${p.join('code', 'b.dart')}',
           ]),
         );
-        await expectLater(
-          process.stdout,
-          emits(startsWith('Formatted 2 files (1 changed)')),
-        );
+        expect(lines, anyElement(startsWith('Formatted 2 files (1 changed)')));
         await process.shouldExit(0);
 
         // Does not overwrite files.