blob: c0fae2f4cf32a727fd564d5de48e9a331f23046a [file] [log] [blame]
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'from_handlers.dart';
extension ConcurrentAsyncMap<T> on Stream<T> {
  /// Maps every event through [convert] without waiting for earlier
  /// conversions to complete.
  ///
  /// This differs from [asyncMap], which finishes one conversion before
  /// starting the next. Results are emitted as their conversions complete, so
  /// the output order may differ from the order of the source events.
  ///
  /// The result is a broadcast stream when the source stream is one. For a
  /// broadcast source, [convert] runs once per event rather than once per
  /// listener per event, and it is not called for events that arrive while
  /// the result has no listener.
  ///
  /// Errors thrown by [convert], and errors from the source stream, are
  /// forwarded to the result stream.
  ///
  /// The result stream closes only after the source stream has closed and
  /// every pending conversion has finished.
  Stream<S> concurrentAsyncMap<S>(FutureOr<S> convert(T event)) {
    // Number of conversions that have started but not yet delivered a result.
    var inFlight = 0;
    // Whether the source stream has signalled done.
    var upstreamClosed = false;
    return transform(fromHandlers(handleData: (element, sink) {
      inFlight++;

      // Runs the conversion for this element and delivers its outcome. It is
      // deliberately not awaited so later events can start converting
      // immediately.
      Future<void> deliver() async {
        try {
          final converted = await convert(element);
          sink.add(converted);
        } catch (error, stackTrace) {
          sink.addError(error, stackTrace);
        }
        inFlight--;
        // The last conversion to finish closes the output once the source
        // is already done.
        if (upstreamClosed && inFlight <= 0) sink.close();
      }

      deliver();
    }, handleDone: (sink) {
      upstreamClosed = true;
      // Close right away only when nothing is still converting; otherwise
      // the final pending conversion closes the sink.
      if (inFlight <= 0) sink.close();
    }));
  }
}
/// Like [Stream.asyncMap] but the [convert] callback may be called for an
/// element before processing for the previous element is finished.
///
/// Events on the result stream will be emitted in the order that [convert]
/// completed which may not match the order of the original stream.
///
/// If the source stream is a broadcast stream the result will be as well. When
/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
/// that the [convert] function is only called once per event, rather than once
/// per listener per event. The [convert] callback won't be called for events
/// while a broadcast stream has no listener.
///
/// Errors from the source stream are forwarded directly to the result stream.
/// Errors during the conversion are also forwarded to the result stream.
///
/// The result stream will not close until the source stream closes and all
/// pending conversions have finished.
///
/// The returned transformer may be applied to more than one stream: the
/// bookkeeping of pending conversions is created separately for each stream it
/// is bound to.
@Deprecated('Use the extension instead')
StreamTransformer<S, T> concurrentAsyncMap<S, T>(FutureOr<T> convert(S event)) {
  // The counters live inside the bind callback rather than in this function's
  // scope. Otherwise every stream transformed by the same instance would share
  // them, and one source finishing could close another stream's output early.
  return StreamTransformer<S, T>.fromBind((source) {
    // Number of conversions started for this source that have not finished.
    var valuesWaiting = 0;
    // Whether this source has signalled done.
    var sourceDone = false;
    return source.transform(fromHandlers(handleData: (element, sink) {
      valuesWaiting++;
      // Start the conversion without awaiting it so that later events can be
      // converted concurrently.
      () async {
        try {
          sink.add(await convert(element));
        } catch (e, st) {
          sink.addError(e, st);
        }
        valuesWaiting--;
        // The last pending conversion closes the output once the source is
        // already done.
        if (valuesWaiting <= 0 && sourceDone) sink.close();
      }();
    }, handleDone: (sink) {
      sourceDone = true;
      // Close immediately only when no conversion is still in progress.
      if (valuesWaiting <= 0) sink.close();
    }));
  });
}