Fix most strong mode warnings.
The remaining warnings will need type-asserting wrappers to fix, which
are coming in a separate CL.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1841223002 .
diff --git a/.analysis_options b/.analysis_options
new file mode 100644
index 0000000..a10d4c5
--- /dev/null
+++ b/.analysis_options
@@ -0,0 +1,2 @@
+analyzer:
+ strong-mode: true
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 946d3a8..803b589 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## 1.9.1
+
+* Fix all strong mode warnings and add generic method annotations.
+
+* `new StreamQueue()` now takes a `Stream<T>` rather than a `Stream<dynamic>`.
+ Passing a type that wasn't `is`-compatible with `Stream<T>` would already
+ throw an error under some circumstances, so this is not considered a breaking
+ change.
+
+* `new SubscriptionStream()` now takes a `Stream<T>` rather than a
+ `Stream<dynamic>`. Passing a type that wasn't `is`-compatible with `Stream<T>`
+ would already throw an error under some circumstances, so this is not
+ considered a breaking change.
+
+* `ErrorResult` now takes a type parameter.
+
+* `Result.asError` now returns a `Result<T>`.
+
## 1.9.0
* Deprecate top-level libraries other than `package:async/async.dart`, which
diff --git a/lib/src/async_memoizer.dart b/lib/src/async_memoizer.dart
index 3612a7d..0213f68 100644
--- a/lib/src/async_memoizer.dart
+++ b/lib/src/async_memoizer.dart
@@ -31,7 +31,7 @@
///
/// This can be accessed at any time, and will fire once [runOnce] is called.
Future<T> get future => _completer.future;
- final _completer = new Completer();
+ final _completer = new Completer<T>();
/// Whether [runOnce] has been called yet.
bool get hasRun => _completer.isCompleted;
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 1ba6b3c..d0a1871 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -62,9 +62,9 @@
/// If this operation is cancelled, the returned future waits for the future
/// returned by [cancel], then completes to [cancellationValue].
Future valueOrCancellation([T cancellationValue]) {
- var completer = new Completer.sync();
-
- value.then(completer.complete, onError: completer.completeError);
+ var completer = new Completer<T>.sync();
+ value.then((result) => completer.complete(result),
+ onError: completer.completeError);
_completer._cancelMemo.future.then((_) {
completer.complete(cancellationValue);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index 5e84e4f..bfc075b 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -7,20 +7,20 @@
/// A wrapper that forwards calls to a [Future].
class DelegatingFuture<T> implements Future<T> {
/// The wrapped [Future].
- final Future _future;
+ final Future<T> _future;
DelegatingFuture(this._future);
Stream<T> asStream() => _future.asStream();
- Future catchError(Function onError, {bool test(error)}) =>
+ Future catchError(Function onError, {bool test(Object error)}) =>
_future.catchError(onError, test: test);
- Future then(onValue(T value), {Function onError}) =>
+ Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
_future.then(onValue, onError: onError);
Future<T> whenComplete(action()) => _future.whenComplete(action);
- Future timeout(Duration timeLimit, {void onTimeout()}) =>
+ Future<T> timeout(Duration timeLimit, {void onTimeout()}) =>
_future.timeout(timeLimit, onTimeout: onTimeout);
}
diff --git a/lib/src/result.dart b/lib/src/result.dart
index 482cf9b..393dced 100644
--- a/lib/src/result.dart
+++ b/lib/src/result.dart
@@ -72,20 +72,14 @@
factory Result.error(Object error, [StackTrace stackTrace]) =>
new ErrorResult(error, stackTrace);
- // Helper functions.
- static _captureValue(value) => new ValueResult(value);
- static _captureError(error, stack) => new ErrorResult(error, stack);
- static _release(Result v) {
- if (v.isValue) return v.asValue.value; // Avoid wrapping in future.
- return v.asFuture;
- }
-
/// Capture the result of a future into a `Result` future.
///
/// The resulting future will never have an error.
/// Errors have been converted to an [ErrorResult] value.
- static Future<Result> capture(Future future) {
- return future.then(_captureValue, onError: _captureError);
+ static Future<Result/*<T>*/> capture/*<T>*/(Future/*<T>*/ future) {
+ return future.then((value) => new ValueResult(value),
+ onError: (error, stackTrace) =>
+ new ErrorResult/*<T>*/(error, stackTrace));
}
/// Release the result of a captured future.
@@ -95,30 +89,23 @@
///
/// If [future] completes with an error, the returned future completes with
/// the same error.
- static Future release(Future<Result> future) {
- return future.then(_release);
- }
+ static Future/*<T>*/ release/*<T>*/(Future<Result/*<T>*/> future) =>
+ future.then/*<Future<T>>*/((result) => result.asFuture);
/// Capture the results of a stream into a stream of [Result] values.
///
/// The returned stream will not have any error events.
/// Errors from the source stream have been converted to [ErrorResult]s.
- ///
- /// Shorthand for transforming the stream using [captureStreamTransformer].
- static Stream<Result> captureStream(Stream source) {
- return source.transform(captureStreamTransformer);
- }
+ static Stream<Result/*<T>*/> captureStream/*<T>*/(Stream/*<T>*/ source) =>
+ source.transform(new CaptureStreamTransformer/*<T>*/());
/// Release a stream of [result] values into a stream of the results.
///
/// `Result` values of the source stream become value or error events in
/// the returned stream as appropriate.
/// Errors from the source stream become errors in the returned stream.
- ///
- /// Shorthand for transforming the stream using [releaseStreamTransformer].
- static Stream releaseStream(Stream<Result> source) {
- return source.transform(releaseStreamTransformer);
- }
+ static Stream/*<T>*/ releaseStream/*<T>*/(Stream<Result/*<T>*/> source) =>
+ source.transform(new ReleaseStreamTransformer/*<T>*/());
/// Converts a result of a result to a single result.
///
@@ -126,9 +113,10 @@
/// which is then an error, then a result with that error is returned.
/// Otherwise both levels of results are value results, and a single
/// result with the value is returned.
- static Result flatten(Result<Result> result) {
- if (result.isError) return result;
- return result.asValue.value;
+ static Result/*<T>*/ flatten/*<T>*/(Result<Result/*<T>*/> result) {
+ if (result.isValue) return result.asValue.value;
+ return new ErrorResult/*<T>*/(
+ result.asError.error, result.asError.stackTrace);
}
/// Whether this result is a value result.
@@ -149,7 +137,7 @@
/// If this is an error result, return itself.
///
/// Otherwise return `null`.
- ErrorResult get asError;
+ ErrorResult<T> get asError;
/// Complete a completer with this result.
void complete(Completer<T> completer);
diff --git a/lib/src/result/capture_transformer.dart b/lib/src/result/capture_transformer.dart
index ca4d0d3..6b1bbf2 100644
--- a/lib/src/result/capture_transformer.dart
+++ b/lib/src/result/capture_transformer.dart
@@ -7,7 +7,10 @@
import '../result.dart';
import 'capture_sink.dart';
-/// Use [Result.captureTransformer] instead.
+/// A stream transformer that captures a stream of events into [Result]s.
+///
+/// The result of the transformation is a stream of [Result] values and no
+/// error events. This is the transformer used by [captureStream].
@Deprecated("Will be removed in async 2.0.0.")
class CaptureStreamTransformer<T> implements StreamTransformer<T, Result<T>> {
const CaptureStreamTransformer();
diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart
index eecf68e..b6d5859 100644
--- a/lib/src/result/error.dart
+++ b/lib/src/result/error.dart
@@ -8,14 +8,14 @@
import 'value.dart';
/// A result representing a thrown error.
-class ErrorResult implements Result {
+class ErrorResult<T> implements Result<T> {
final error;
final StackTrace stackTrace;
bool get isValue => false;
bool get isError => true;
- ValueResult get asValue => null;
- ErrorResult get asError => this;
+ ValueResult<T> get asValue => null;
+ ErrorResult<T> get asError => this;
ErrorResult(this.error, this.stackTrace);
@@ -27,7 +27,7 @@
sink.addError(error, stackTrace);
}
- Future get asFuture => new Future.error(error, stackTrace);
+ Future<T> get asFuture => new Future.error(error, stackTrace);
/// Calls an error handler with the error and stacktrace.
///
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index db9dd82..209e8b1 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -20,11 +20,12 @@
Result<T> _result;
factory ResultFuture(Future<T> future) {
- var resultFuture;
- resultFuture = new ResultFuture._(Result.capture(future).then((result) {
+ ResultFuture<T> resultFuture;
+ resultFuture = new ResultFuture._(() async {
+ var result = await Result.capture(future);
resultFuture._result = result;
- return result.asFuture;
- }));
+ return await result.asFuture;
+ }());
return resultFuture;
}
diff --git a/lib/src/result/value.dart b/lib/src/result/value.dart
index f065ab6..39fa048 100644
--- a/lib/src/result/value.dart
+++ b/lib/src/result/value.dart
@@ -14,7 +14,7 @@
bool get isValue => true;
bool get isError => false;
ValueResult<T> get asValue => this;
- ErrorResult get asError => null;
+ ErrorResult<T> get asError => null;
ValueResult(this.value);
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 4255175..e01efac 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -9,15 +9,26 @@
///
/// This buffers the broadcast stream's events, which means that it starts
/// listening to a stream as soon as it's bound.
+///
+/// This also casts the source stream's events to type `T`. If the cast fails,
+/// the result stream will emit a [CastError]. This behavior is deprecated, and
+/// should not be relied upon.
class SingleSubscriptionTransformer<S, T> implements StreamTransformer<S, T> {
const SingleSubscriptionTransformer();
Stream<T> bind(Stream<S> stream) {
var subscription;
- var controller = new StreamController(sync: true,
+ var controller = new StreamController<T>(sync: true,
onCancel: () => subscription.cancel());
- subscription = stream.listen(controller.add,
- onError: controller.addError, onDone: controller.close);
+ subscription = stream.listen((value) {
+ // TODO(nweiz): When we release a new major version, get rid of the second
+ // type parameter and avoid this conversion.
+ try {
+ controller.add(value as T);
+ } on CastError catch (error, stackTrace) {
+ controller.addError(error, stackTrace);
+ }
+ }, onError: controller.addError, onDone: controller.close);
return controller.stream;
}
}
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 73557b2..953985e 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -24,7 +24,7 @@
/// the listen is performed directly on the source stream.
class StreamCompleter<T> {
/// The stream doing the actual work, is returned by [stream].
- final _CompleterStream _stream = new _CompleterStream<T>();
+ final _stream = new _CompleterStream<T>();
/// Convert a `Future<Stream>` to a `Stream`.
///
@@ -34,8 +34,8 @@
///
/// If the future completes with an error, the returned stream will
/// instead contain just that error.
- static Stream fromFuture(Future<Stream> streamFuture) {
- var completer = new StreamCompleter();
+ static Stream/*<T>*/ fromFuture/*<T>*/(Future<Stream/*<T>*/> streamFuture) {
+ var completer = new StreamCompleter/*<T>*/();
streamFuture.then(completer.setSourceStream,
onError: completer.setError);
return completer.stream;
@@ -109,13 +109,13 @@
///
/// Created if the user listens on this stream before the source stream
/// is set, or if using [_setEmpty] so there is no source stream.
- StreamController _controller;
+ StreamController<T> _controller;
/// Source stream for the events provided by this stream.
///
/// Set when the completer sets the source stream using [_setSourceStream]
/// or [_setEmpty].
- Stream _sourceStream;
+ Stream<T> _sourceStream;
StreamSubscription<T> listen(onData(T data),
{Function onError,
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 6239fc2..889ccd8 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -12,8 +12,8 @@
/// this means that events emitted by broadcast streams will be dropped until
/// [stream] has a listener.**
///
-/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will be
-/// single-subscription. In this case, if [stream] is paused or canceled, all
+/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will
+/// be single-subscription. In this case, if [stream] is paused or canceled, all
/// streams in the group will likewise be paused or canceled, respectively.
///
/// If the `StreamGroup` is constructed using [new StreamGroup.broadcast],
@@ -53,8 +53,8 @@
///
/// This is equivalent to adding [streams] to a group, closing that group, and
/// returning its stream.
- static Stream merge(Iterable<Stream> streams) {
- var group = new StreamGroup();
+ static Stream/*<T>*/ merge/*<T>*/(Iterable<Stream/*<T>*/> streams) {
+ var group = new StreamGroup/*<T>*/();
streams.forEach(group.add);
group.close();
return group.stream;
@@ -192,7 +192,7 @@
/// Starts actively forwarding events from [stream] to [_controller].
///
/// This will pause the resulting subscription if [this] is paused.
- StreamSubscription _listenToStream(Stream stream) {
+ StreamSubscription<T> _listenToStream(Stream<T> stream) {
var subscription = stream.listen(
_controller.add,
onError: _controller.addError,
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 8482e0c..4ce9e13 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -94,7 +94,7 @@
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
- factory StreamQueue(Stream source) = _StreamQueue<T>;
+ factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
StreamQueue._();
@@ -274,7 +274,7 @@
/// Can only be used by the very last request (the stream queue must
/// be closed by that request).
/// Only used by [rest].
- Stream _extractStream();
+ Stream<T> _extractStream();
/// Requests that the event source pauses events.
///
@@ -342,13 +342,13 @@
/// to when a request needs events.
class _StreamQueue<T> extends StreamQueue<T> {
/// Source of events.
- final Stream _sourceStream;
+ final Stream<T> _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
/// Set to subscription when listening, and set to `null` when the
/// subscription is done (and [_isDone] is set to true).
- StreamSubscription _subscription;
+ StreamSubscription<T> _subscription;
_StreamQueue(this._sourceStream) : super._();
@@ -422,7 +422,7 @@
///
/// The [close] method is also called immediately when the source stream
/// is done.
-abstract class _EventRequest {
+abstract class _EventRequest<T> {
/// Handle available events.
///
/// The available events are provided as a queue. The `update` function
@@ -443,22 +443,22 @@
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
- bool update(Queue<Result> events, bool isDone);
+ bool update(Queue<Result<T>> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
///
/// Completes the returned future when receiving the first event,
/// and is then complete.
-class _NextRequest<T> implements _EventRequest {
+class _NextRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by [StreamQueue.next].
- final Completer _completer;
+ final _completer = new Completer<T>();
- _NextRequest() : _completer = new Completer<T>();
+ _NextRequest();
Future<T> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
@@ -474,9 +474,9 @@
}
/// Request for a [StreamQueue.skip] call.
-class _SkipRequest implements _EventRequest {
+class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
- final Completer _completer = new Completer<int>();
+ final _completer = new Completer<int>();
/// Number of remaining events to skip.
///
@@ -489,9 +489,9 @@
_SkipRequest(this._eventsToSkip);
/// The future completed when the correct number of events have been skipped.
- Future get future => _completer.future;
+ Future<int> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
@@ -501,7 +501,7 @@
var event = events.removeFirst();
if (event.isError) {
- event.complete(_completer);
+ _completer.completeError(event.asError.error, event.asError.stackTrace);
return true;
}
}
@@ -511,12 +511,12 @@
}
/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest {
+class _TakeRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
- final Completer _completer;
+ final _completer = new Completer<List<T>>();
/// List collecting events until enough have been seen.
- final List _list = <T>[];
+ final _list = <T>[];
/// Number of events to capture.
///
@@ -524,24 +524,24 @@
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+ _TakeRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
- Future get future => _completer.future;
+ Future<List<T>> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
return false;
}
- var result = events.removeFirst();
- if (result.isError) {
- result.complete(_completer);
+ var event = events.removeFirst();
+ if (event.isError) {
+ _completer.completeError(event.asError.error, event.asError.stackTrace);
return true;
}
- _list.add(result.asValue.value);
+ _list.add(event.asValue.value);
}
_completer.complete(_list);
return true;
@@ -553,9 +553,9 @@
/// The request needs no events, it just waits in the request queue
/// until all previous events are fulfilled, then it cancels the stream queue
/// source subscription.
-class _CancelRequest implements _EventRequest {
+class _CancelRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the `cancel` call.
- final Completer _completer = new Completer();
+ final _completer = new Completer();
/// The [StreamQueue] object that has this request queued.
///
@@ -568,7 +568,7 @@
/// The future completed when the cancel request is completed.
Future get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
@@ -584,22 +584,22 @@
/// The request is always complete, it just waits in the request queue
/// until all previous events are fulfilled, then it takes over the
/// stream events subscription and creates a stream from it.
-class _RestRequest<T> implements _EventRequest {
+class _RestRequest<T> implements _EventRequest<T> {
/// Completer for the stream returned by the `rest` call.
- final StreamCompleter _completer = new StreamCompleter<T>();
+ final _completer = new StreamCompleter<T>();
/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
- final StreamQueue _streamQueue;
+ final StreamQueue<T> _streamQueue;
_RestRequest(this._streamQueue);
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
@@ -627,12 +627,12 @@
/// but doesn't consume the event.
/// If the request is closed without seeing an event, then
/// the [future] is completed with `false`.
-class _HasNextRequest<T> implements _EventRequest {
- final Completer _completer = new Completer<bool>();
+class _HasNextRequest<T> implements _EventRequest<T> {
+ final _completer = new Completer<bool>();
Future<bool> get future => _completer.future;
- bool update(Queue<Result> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index b53a8d2..00a7086 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -36,8 +36,9 @@
///
/// If the future completes with an error, the returned sink will instead
/// be closed. Its [Sink.done] future will contain the error.
- static StreamSink fromFuture(Future<StreamSink> sinkFuture) {
- var completer = new StreamSinkCompleter();
+ static StreamSink/*<T>*/ fromFuture/*<T>*/(
+ Future<StreamSink/*<T>*/> sinkFuture) {
+ var completer = new StreamSinkCompleter/*<T>*/();
sinkFuture.then(completer.setDestinationSink,
onError: completer.setError);
return completer.sink;
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index a20a3fa..8cc3d01 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -54,11 +54,7 @@
void add(S event) {
if (_transformer._handleData == null) {
- // [event] is an S and [_inner.add] takes a T. This style of conversion
- // will throw an error in checked mode if [_inner] is actually a
- // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
- // an extra check in unchecked mode.
- _inner.add(event as dynamic);
+ _inner.add(event as T);
} else {
_transformer._handleData(event, _safeCloseInner);
}
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index ec648aa..448fe2a 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -57,10 +57,11 @@
///
/// [count] defaults to 2. This is the same as creating [count] branches and
/// then closing the [StreamSplitter].
- static List<Stream> splitFrom(Stream stream, [int count]) {
+ static List<Stream/*<T>*/> splitFrom/*<T>*/(Stream/*<T>*/ stream,
+ [int count]) {
if (count == null) count = 2;
- var splitter = new StreamSplitter(stream);
- var streams = new List.generate(count, (_) => splitter.split());
+ var splitter = new StreamSplitter/*<T>*/(stream);
+ var streams = new List<Stream>.generate(count, (_) => splitter.split());
splitter.close();
return streams;
}
@@ -75,12 +76,11 @@
throw new StateError("Can't call split() on a closed StreamSplitter.");
}
- var controller;
- controller = new StreamController<T>(
+ var controller = new StreamController<T>(
onListen: _onListen,
onPause: _onPause,
- onResume: _onResume,
- onCancel: () => _onCancel(controller));
+ onResume: _onResume);
+ controller.onCancel = () => _onCancel(controller);
for (var result in _buffer) {
result.addTo(controller);
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index 432cc22..f9e2082 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -17,22 +17,22 @@
StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
- StreamSubscription<List<T>> listen(void onData(List data), {
+ StreamSubscription<List<T>> listen(void onData(List<T> data), {
Function onError,
void onDone(),
bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
- List<StreamSubscription> subscriptions = <StreamSubscription>[];
- StreamController controller;
- List current;
+ var subscriptions = <StreamSubscription<T>>[];
+ StreamController<List<T>> controller;
+ List<T> current;
int dataCount = 0;
/// Called for each data from a subscription in [subscriptions].
- void handleData(int index, data) {
+ void handleData(int index, T data) {
current[index] = data;
dataCount++;
if (dataCount == subscriptions.length) {
- List data = current;
+ var data = current;
current = new List(subscriptions.length);
dataCount = 0;
for (int i = 0; i < subscriptions.length; i++) {
@@ -70,7 +70,7 @@
}
try {
- for (Stream stream in _streams) {
+ for (var stream in _streams) {
int index = subscriptions.length;
subscriptions.add(stream.listen(
(data) { handleData(index, data); },
@@ -87,7 +87,7 @@
current = new List(subscriptions.length);
- controller = new StreamController<List>(
+ controller = new StreamController<List<T>>(
onPause: () {
for (int i = 0; i < subscriptions.length; i++) {
// This may pause some subscriptions more than once.
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 4f58c76..c448620 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -18,7 +18,7 @@
/// If other code is accessing the subscription, results may be unpredictable.
class SubscriptionStream<T> extends Stream<T> {
/// The subscription providing the events for this stream.
- StreamSubscription _source;
+ StreamSubscription<T> _source;
/// Create a single-subscription `Stream` from [subscription].
///
@@ -29,7 +29,7 @@
/// If the `subscription` doesn't send any `done` events, neither will this
/// stream. That may be an issue if `subscription` was made to cancel on
/// an error.
- SubscriptionStream(StreamSubscription subscription)
+ SubscriptionStream(StreamSubscription<T> subscription)
: _source = subscription {
_source.pause();
// Clear callbacks to avoid keeping them alive unnecessarily.
@@ -48,13 +48,10 @@
cancelOnError = (true == cancelOnError);
var subscription = _source;
_source = null;
- var result;
- if (cancelOnError) {
- result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
- } else {
- // Wrap the subscription to ensure correct type parameter.
- result = new DelegatingStreamSubscription<T>(subscription);
- }
+
+ var result = cancelOnError
+ ? new _CancelOnErrorSubscriptionWrapper<T>(subscription)
+ : subscription;
result.onData(onData);
result.onError(onError);
result.onDone(onDone);
diff --git a/pubspec.yaml b/pubspec.yaml
index 2430d75..9e64244 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.9.0
+version: 1.9.1-dev
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
@@ -8,4 +8,4 @@
stack_trace: "^1.0.0"
test: "^0.12.0"
environment:
- sdk: ">=1.9.0 <2.0.0"
+ sdk: ">=1.12.0 <2.0.0"