Add StreamExtensions.firstOrNull (#195)

By analogy to IterableExtensions.firstOrNull in the collection package.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e32ec99..9acccf9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.9.0
+
+* Add `StreamExtensions.firstOrNull`.
+
 ## 2.8.2
 
 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart
index 3801a02..129ce26 100644
--- a/lib/src/stream_extensions.dart
+++ b/lib/src/stream_extensions.dart
@@ -33,4 +33,29 @@
       sink.close();
     }));
   }
+
+  /// A future which completes with the first event of this stream, or with
+  /// `null`.
+  ///
+  /// This stream is listened to, and if it emits any event, whether a data
+  /// event or an error event, the future completes with the same data value or
+  /// error. If the stream ends without emitting any events, the future is
+  /// completed with `null`.
+  Future<T?> get firstOrNull {
+    var completer = Completer<T?>.sync();
+    final subscription = listen(null);
+    subscription
+      ..onData((event) {
+        subscription.cancel();
+        completer.complete(event);
+      })
+      ..onError((Object error, StackTrace stackTrace) {
+        subscription.cancel();
+        completer.completeError(error, stackTrace);
+      })
+      ..onDone(() {
+        completer.complete(null);
+      });
+    return completer.future;
+  }
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index c636de6..ec3613a 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.8.3
+version: 2.9.0-dev
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/stream_extensions_test.dart b/test/stream_extensions_test.dart
index 85a3cee..2118ae7 100644
--- a/test/stream_extensions_test.dart
+++ b/test/stream_extensions_test.dart
@@ -55,4 +55,41 @@
       expect(() => Stream.fromIterable([1]).slices(0), throwsRangeError);
     });
   });
+
+  group('.firstOrNull', () {
+    test('returns the first data event', () {
+      expect(
+          Stream.fromIterable([1, 2, 3, 4]).firstOrNull, completion(equals(1)));
+    });
+
+    test('returns the first error event', () {
+      expect(Stream.error('oh no').firstOrNull, throwsA('oh no'));
+    });
+
+    test('returns null for an empty stream', () {
+      expect(Stream.empty().firstOrNull, completion(isNull));
+    });
+
+    test('cancels the subscription after an event', () async {
+      var isCancelled = false;
+      var controller = StreamController<int>(onCancel: () {
+        isCancelled = true;
+      });
+      controller.add(1);
+
+      await expectLater(controller.stream.firstOrNull, completion(equals(1)));
+      expect(isCancelled, isTrue);
+    });
+
+    test('cancels the subscription after an error', () async {
+      var isCancelled = false;
+      var controller = StreamController<int>(onCancel: () {
+        isCancelled = true;
+      });
+      controller.addError('oh no');
+
+      await expectLater(controller.stream.firstOrNull, throwsA('oh no'));
+      expect(isCancelled, isTrue);
+    });
+  });
 }