Add a LazyStream class.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1491923005 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4cdfc6a..7e7ed4a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 ## 1.4.0
 
+- Added `LazyStream`, which forwards to the return value of a callback that's
+  only called when the stream is listened to.
+
+## 1.4.0
+
 - Added `AsyncMemoizer.future`, which allows the result to be accessed before
   `runOnce()` is called.
 
diff --git a/lib/async.dart b/lib/async.dart
index 9da3552..1d127fd 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -14,6 +14,7 @@
 export "src/delegate/stream_sink.dart";
 export "src/delegate/stream_subscription.dart";
 export "src/future_group.dart";
+export "src/lazy_stream.dart";
 export "src/restartable_timer.dart";
 export "src/result_future.dart";
 export "src/stream_completer.dart";
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
new file mode 100644
index 0000000..1878d4c
--- /dev/null
+++ b/lib/src/lazy_stream.dart
@@ -0,0 +1,51 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.lazy_stream;
+
+import "dart:async";
+
+import "stream_completer.dart";
+
+/// A [Stream] wrapper that forwards to another [Stream] that's initialized
+/// lazily.
+///
+/// This class allows a concrete `Stream` to be created only once it has a
+/// listener. It's useful to wrapping APIs that do expensive computation to
+/// produce a `Stream`.
+class LazyStream<T> extends Stream<T> {
+  /// The callback that's called to create the inner stream.
+  ZoneCallback _callback;
+
+  /// Creates a single-subscription `Stream` that calls [callback] when it gets
+  /// a listener and forwards to the returned stream.
+  ///
+  /// The [callback] may return a `Stream` or a `Future<Stream>`.
+  LazyStream(callback()) : _callback = callback {
+    // Explicitly check for null because we null out [_callback] internally.
+    if (_callback == null) throw new ArgumentError.notNull('callback');
+  }
+
+  StreamSubscription<T> listen(void onData(T event),
+                               {Function onError,
+                                void onDone(),
+                                bool cancelOnError}) {
+    if (_callback == null) {
+      throw new StateError("Stream has already been listened to.");
+    }
+
+    // Null out the callback before we invoke it to ensure that even while
+    // running it, this can't be called twice.
+    var callback = _callback;
+    _callback = null;
+    var result = callback();
+
+    Stream stream = result is Future
+        ? StreamCompleter.fromFuture(result)
+        : result;
+
+    return stream.listen(onData,
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+  }
+}
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index 6ec440f..ac401a7 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -5,7 +5,6 @@
 library async.stream_splitter;
 
 import 'dart:async';
-import 'dart:collection';
 
 import '../result.dart';
 import 'future_group.dart';
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index e9e7d77..bdf329b 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -10,7 +10,7 @@
 
 /// A [Stream] adapter for a [StreamSubscription].
 ///
-/// This class allows as `StreamSubscription` to be treated as a `Stream`.
+/// This class allows a `StreamSubscription` to be treated as a `Stream`.
 ///
 /// The subscription is paused until the stream is listened to,
 /// then it is resumed and the events are passed on to the
diff --git a/pubspec.yaml b/pubspec.yaml
index 8c02a7a..e98c59e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.4.1-dev
+version: 1.5.0
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/lazy_stream_test.dart b/test/lazy_stream_test.dart
new file mode 100644
index 0000000..63f2a19
--- /dev/null
+++ b/test/lazy_stream_test.dart
@@ -0,0 +1,106 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+main() {
+  test("disallows a null callback", () {
+    expect(() => new LazyStream(null), throwsArgumentError);
+  });
+
+  test("calls the callback when the stream is listened", () async {
+    var callbackCalled = false;
+    var stream = new LazyStream(expectAsync(() {
+      callbackCalled = true;
+      return new Stream.empty();
+    }));
+
+    await flushMicrotasks();
+    expect(callbackCalled, isFalse);
+
+    stream.listen(null);
+    expect(callbackCalled, isTrue);
+  });
+
+  test("calls the callback when the stream is listened", () async {
+    var callbackCalled = false;
+    var stream = new LazyStream(expectAsync(() {
+      callbackCalled = true;
+      return new Stream.empty();
+    }));
+
+    await flushMicrotasks();
+    expect(callbackCalled, isFalse);
+
+    stream.listen(null);
+    expect(callbackCalled, isTrue);
+  });
+
+  test("forwards to a synchronously-provided stream", () async {
+    var controller = new StreamController<int>();
+    var stream = new LazyStream(expectAsync(() => controller.stream));
+
+    var events = [];
+    stream.listen(events.add);
+
+    controller.add(1);
+    await flushMicrotasks();
+    expect(events, equals([1]));
+
+    controller.add(2);
+    await flushMicrotasks();
+    expect(events, equals([1, 2]));
+
+    controller.add(3);
+    await flushMicrotasks();
+    expect(events, equals([1, 2, 3]));
+
+    controller.close();
+  });
+
+  test("forwards to an asynchronously-provided stream", () async {
+    var controller = new StreamController<int>();
+    var stream = new LazyStream(expectAsync(() async => controller.stream));
+
+    var events = [];
+    stream.listen(events.add);
+
+    controller.add(1);
+    await flushMicrotasks();
+    expect(events, equals([1]));
+
+    controller.add(2);
+    await flushMicrotasks();
+    expect(events, equals([1, 2]));
+
+    controller.add(3);
+    await flushMicrotasks();
+    expect(events, equals([1, 2, 3]));
+
+    controller.close();
+  });
+
+  test("a lazy stream can't be listened to multiple times", () {
+    var stream = new LazyStream(expectAsync(() => new Stream.empty()));
+    expect(stream.isBroadcast, isFalse);
+
+    stream.listen(null);
+    expect(() => stream.listen(null), throwsStateError);
+    expect(() => stream.listen(null), throwsStateError);
+  });
+
+  test("a lazy stream can't be listened to from within its callback", () {
+    var stream;
+    stream = new LazyStream(expectAsync(() {
+      expect(() => stream.listen(null), throwsStateError);
+      return new Stream.empty();
+    }));
+    stream.listen(null);
+  });
+}