Implement concat (dart-lang/stream_transform#6)
Closes dart-lang/stream_transform#5
concat implementation and tests. Relatively few tests since most
behavior is through StreamController.addStream and we have high
confidence in that method. Test the thing that is easy to get wrong in
this utility - stream closing.
Add missing README entry for already added debounce.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index cf303c7..1d323aa 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.2
+
+- Add `concat`: Appends streams in series
+
## 0.0.1
- Initial release with the following utilities:
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index ecab8a4..ac32362 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -5,3 +5,12 @@
Collects values from a source stream until a `trigger` stream fires and the
collected values are emitted.
+
+# concat
+
+Appends the values of a stream after another stream finishes.
+
+# debounce, debounceBuffer
+
+Prevents a source stream from emitting too frequently by dropping or collecting
+values that occur within a given duration.
diff --git a/pkgs/stream_transform/lib/src/concat.dart b/pkgs/stream_transform/lib/src/concat.dart
new file mode 100644
index 0000000..0a2c27a
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/concat.dart
@@ -0,0 +1,27 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Starts emitting values from [next] after the original stream is complete.
+///
+/// If the initial stream never finishes, the [next] stream will never be
+/// listened to.
+StreamTransformer<T, T> concat<T>(Stream<T> next) => new _Concat<T>(next);
+
+class _Concat<T> implements StreamTransformer<T, T> {
+ final Stream _next;
+
+ _Concat(this._next);
+
+ @override
+ Stream<T> bind(Stream<T> first) {
+ var controller = new StreamController<T>();
+ controller
+ .addStream(first)
+ .then((_) => controller.addStream(_next))
+ .then((_) => controller.close());
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 49a714e..c128e4b 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -4,3 +4,4 @@
export 'src/buffer.dart';
export 'src/debounce.dart';
+export 'src/concat.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 1cf4034..7352049 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
description: A collection of utilities to transform and manipulate streams.
author: Dart Team <misc@dartlang.org>
homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.1
+version: 0.0.2-dev
environment:
sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/concat_test.dart b/pkgs/stream_transform/test/concat_test.dart
new file mode 100644
index 0000000..6cf041b
--- /dev/null
+++ b/pkgs/stream_transform/test/concat_test.dart
@@ -0,0 +1,47 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ group('concat', () {
+ test('adds all values from both streams', () async {
+ var first = new Stream.fromIterable([1, 2, 3]);
+ var second = new Stream.fromIterable([4, 5, 6]);
+ var all = await first.transform(concat(second)).toList();
+ expect(all, [1, 2, 3, 4, 5, 6]);
+ });
+
+ test('closes first stream on cancel', () async {
+ var firstStreamClosed = false;
+ var first = new StreamController()
+ ..onCancel = () {
+ firstStreamClosed = true;
+ };
+ var second = new StreamController();
+ var subscription =
+ first.stream.transform(concat(second.stream)).listen((_) {});
+ await subscription.cancel();
+ expect(firstStreamClosed, true);
+ });
+
+ test('closes second stream on cancel if first stream done', () async {
+ var first = new StreamController();
+ var secondStreamClosed = false;
+ var second = new StreamController()
+ ..onCancel = () {
+ secondStreamClosed = true;
+ };
+ var subscription =
+ first.stream.transform(concat(second.stream)).listen((_) {});
+ await first.close();
+ await new Future(() {});
+ await subscription.cancel();
+ expect(secondStreamClosed, true);
+ });
+ });
+}