Add `byteCollector` stream transformer and `collectBytes` function.
BUG= https://github.com/dart-lang/typed_data/issues/2
R=nweiz@google.com
Review-Url: https://codereview.chromium.org//2649233006 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c49a4bd..1ac341e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.13.0
+
+* Add a `collectBytes` function which collects list-of-byte events into
+ a single byte list.
+
## 1.12.0
* Add an `AsyncCache` class that caches asynchronous operations for a period of
diff --git a/lib/async.dart b/lib/async.dart
index c11f0b5..e090032 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -4,6 +4,7 @@
export "src/async_cache.dart";
export "src/async_memoizer.dart";
+export "src/byte_collector.dart";
export "src/cancelable_operation.dart";
export "src/delegate/event_sink.dart";
export "src/delegate/future.dart";
diff --git a/lib/src/byte_collector.dart b/lib/src/byte_collector.dart
new file mode 100644
index 0000000..3b4f075
--- /dev/null
+++ b/lib/src/byte_collector.dart
@@ -0,0 +1,42 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+import "dart:typed_data";
+
+/// Collects an asynchronous sequence of byte lists into a single list of bytes.
+///
+/// If the [source] stream emits an error event,
+/// the collection fails and the returned future completes with the same error.
+///
+/// If any of the input data are not valid bytes, they will be truncated to
+/// an eight-bit unsigned value in the resulting list.
+Future<Uint8List> collectBytes(Stream<List<int>> source) {
+ var byteLists = List<List<int>>[];
+ var length = 0;
+ var completer = new Completer<Uint8List>.sync();
+ source.listen(
+ (bytes) {
+ byteLists.add(bytes);
+ length += bytes.length;
+ },
+ onError: completer.completeError,
+ onDone: () {
+ completer.complete(_collect(length, byteLists));
+ },
+ cancelOnError: true);
+ return completer.future;
+}
+
+// Join a lists of bytes with a known total length into a single [Uint8List].
+Uint8List _collect(int length, List<List<int>> byteLists) {
+ var result = new Uint8List(length);
+ int i = 0;
+ for (var byteList in byteLists) {
+ var end = i + byteList.length;
+ result.setRange(i, end, byteList);
+ i = end;
+ }
+ return result;
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index ec62ff2..a3e0c90 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.12.0
+version: 1.13.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/byte_collection_test.dart b/test/byte_collection_test.dart
new file mode 100644
index 0000000..8068542
--- /dev/null
+++ b/test/byte_collection_test.dart
@@ -0,0 +1,39 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+import "dart:typed_data";
+
+import "package:test/test.dart";
+import "package:async/async.dart" show byteCollector, collectBytes, Result;
+
+void main() {
+ group("collectBytes", () {
+ test("simple list and overflow", () {
+ var result = collectBytes(new Stream.fromIterable([
+ [0],
+ [1],
+ [2],
+ [256]
+ ]));
+ expect(result, completion([0, 1, 2, 0]));
+ });
+
+ test("no events", () {
+ var result = collectBytes(new Stream.fromIterable([]));
+ expect(result, completion([]));
+ });
+
+ test("empty events", () {
+ var result = collectBytes(new Stream.fromIterable([[], []]));
+ expect(result, completion([]));
+ });
+
+ test("error event", () {
+ var result = collectBytes(new Stream.fromIterable(
+ new Iterable.generate(3, (n) => n == 2 ? throw "badness" : [n])));
+ expect(result, throwsA("badness"));
+ });
+ });
+}