Add a LazyStream class.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1491923005 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4cdfc6a..7e7ed4a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
## 1.4.0
+- Added `LazyStream`, which forwards to the return value of a callback that's
+ only called when the stream is listened to.
+
+## 1.4.0
+
- Added `AsyncMemoizer.future`, which allows the result to be accessed before
`runOnce()` is called.
diff --git a/lib/async.dart b/lib/async.dart
index 9da3552..1d127fd 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -14,6 +14,7 @@
export "src/delegate/stream_sink.dart";
export "src/delegate/stream_subscription.dart";
export "src/future_group.dart";
+export "src/lazy_stream.dart";
export "src/restartable_timer.dart";
export "src/result_future.dart";
export "src/stream_completer.dart";
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
new file mode 100644
index 0000000..1878d4c
--- /dev/null
+++ b/lib/src/lazy_stream.dart
@@ -0,0 +1,51 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.lazy_stream;
+
+import "dart:async";
+
+import "stream_completer.dart";
+
+/// A [Stream] wrapper that forwards to another [Stream] that's initialized
+/// lazily.
+///
+/// This class allows a concrete `Stream` to be created only once it has a
+/// listener. It's useful to wrapping APIs that do expensive computation to
+/// produce a `Stream`.
+class LazyStream<T> extends Stream<T> {
+ /// The callback that's called to create the inner stream.
+ ZoneCallback _callback;
+
+ /// Creates a single-subscription `Stream` that calls [callback] when it gets
+ /// a listener and forwards to the returned stream.
+ ///
+ /// The [callback] may return a `Stream` or a `Future<Stream>`.
+ LazyStream(callback()) : _callback = callback {
+ // Explicitly check for null because we null out [_callback] internally.
+ if (_callback == null) throw new ArgumentError.notNull('callback');
+ }
+
+ StreamSubscription<T> listen(void onData(T event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (_callback == null) {
+ throw new StateError("Stream has already been listened to.");
+ }
+
+ // Null out the callback before we invoke it to ensure that even while
+ // running it, this can't be called twice.
+ var callback = _callback;
+ _callback = null;
+ var result = callback();
+
+ Stream stream = result is Future
+ ? StreamCompleter.fromFuture(result)
+ : result;
+
+ return stream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+ }
+}
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index 6ec440f..ac401a7 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -5,7 +5,6 @@
library async.stream_splitter;
import 'dart:async';
-import 'dart:collection';
import '../result.dart';
import 'future_group.dart';
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index e9e7d77..bdf329b 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -10,7 +10,7 @@
/// A [Stream] adapter for a [StreamSubscription].
///
-/// This class allows as `StreamSubscription` to be treated as a `Stream`.
+/// This class allows a `StreamSubscription` to be treated as a `Stream`.
///
/// The subscription is paused until the stream is listened to,
/// then it is resumed and the events are passed on to the
diff --git a/pubspec.yaml b/pubspec.yaml
index 8c02a7a..e98c59e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.4.1-dev
+version: 1.5.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/lazy_stream_test.dart b/test/lazy_stream_test.dart
new file mode 100644
index 0000000..63f2a19
--- /dev/null
+++ b/test/lazy_stream_test.dart
@@ -0,0 +1,106 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+main() {
+ test("disallows a null callback", () {
+ expect(() => new LazyStream(null), throwsArgumentError);
+ });
+
+ test("calls the callback when the stream is listened", () async {
+ var callbackCalled = false;
+ var stream = new LazyStream(expectAsync(() {
+ callbackCalled = true;
+ return new Stream.empty();
+ }));
+
+ await flushMicrotasks();
+ expect(callbackCalled, isFalse);
+
+ stream.listen(null);
+ expect(callbackCalled, isTrue);
+ });
+
+ test("calls the callback when the stream is listened", () async {
+ var callbackCalled = false;
+ var stream = new LazyStream(expectAsync(() {
+ callbackCalled = true;
+ return new Stream.empty();
+ }));
+
+ await flushMicrotasks();
+ expect(callbackCalled, isFalse);
+
+ stream.listen(null);
+ expect(callbackCalled, isTrue);
+ });
+
+ test("forwards to a synchronously-provided stream", () async {
+ var controller = new StreamController<int>();
+ var stream = new LazyStream(expectAsync(() => controller.stream));
+
+ var events = [];
+ stream.listen(events.add);
+
+ controller.add(1);
+ await flushMicrotasks();
+ expect(events, equals([1]));
+
+ controller.add(2);
+ await flushMicrotasks();
+ expect(events, equals([1, 2]));
+
+ controller.add(3);
+ await flushMicrotasks();
+ expect(events, equals([1, 2, 3]));
+
+ controller.close();
+ });
+
+ test("forwards to an asynchronously-provided stream", () async {
+ var controller = new StreamController<int>();
+ var stream = new LazyStream(expectAsync(() async => controller.stream));
+
+ var events = [];
+ stream.listen(events.add);
+
+ controller.add(1);
+ await flushMicrotasks();
+ expect(events, equals([1]));
+
+ controller.add(2);
+ await flushMicrotasks();
+ expect(events, equals([1, 2]));
+
+ controller.add(3);
+ await flushMicrotasks();
+ expect(events, equals([1, 2, 3]));
+
+ controller.close();
+ });
+
+ test("a lazy stream can't be listened to multiple times", () {
+ var stream = new LazyStream(expectAsync(() => new Stream.empty()));
+ expect(stream.isBroadcast, isFalse);
+
+ stream.listen(null);
+ expect(() => stream.listen(null), throwsStateError);
+ expect(() => stream.listen(null), throwsStateError);
+ });
+
+ test("a lazy stream can't be listened to from within its callback", () {
+ var stream;
+ stream = new LazyStream(expectAsync(() {
+ expect(() => stream.listen(null), throwsStateError);
+ return new Stream.empty();
+ }));
+ stream.listen(null);
+ });
+}