blob: b776d35aec492b10560ff3aecf212f28d870f1dd [file] [log] [blame] [edit]
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of "dart:async";
@Since("3.0")
extension FutureIterable<T> on Iterable<Future<T>> {
/// Waits for futures in parallel.
///
/// Waits for all the futures in this iterable.
/// Returns a list of the resulting values,
/// in the same order as the futures which created them,
/// if all futures are successful.
///
/// Similar to [Future.wait], but reports errors using a
/// [ParallelWaitError], which allows the caller to
/// handle errors and dispose successful results if necessary.
///
/// The returned future is completed when all the futures have completed.
/// If any of the futures do not complete, nor does the returned future.
///
/// If any future completes with an error,
/// the returned future completes with a [ParallelWaitError].
/// The [ParallelWaitError.values] is a list of the values for
/// successful futures and `null` for futures with errors.
/// The [ParallelWaitError.errors] is a list of the same length,
/// with `null` values for the successful futures
/// and an [AsyncError] with the error for futures
/// which completed with an error.
Future<List<T>> get wait {
var results = [for (var f in this) _FutureResult<T>(f)];
if (results.isEmpty) return Future<List<T>>.value(<T>[]);
@pragma('vm:awaiter-link')
final c = Completer<List<T>>.sync();
_FutureResult._waitAll(results, (errors) {
if (errors == 0) {
c.complete([for (var r in results) r.value]);
} else {
var errorList = [for (var r in results) r.errorOrNull];
c.completeError(
ParallelWaitError<List<T?>, List<AsyncError?>>(
[for (var r in results) r.valueOrNull],
errorList,
errorCount: errors,
defaultError: errorList.firstWhere(_notNull),
),
);
}
});
return c.future;
}
}
bool _notNull(Object? object) => object != null;
/// Parallel operations on a record of futures.
///
/// {@template record-parallel-wait}
/// Waits for futures in parallel.
///
/// Waits for all the futures in this record.
/// Returns a record of the values, if all futures are successful.
///
/// The returned future is completed when all the futures have completed.
/// If any of the futures do not complete, nor does the returned future.
///
/// If some futures complete with an error,
/// the returned future completes with a [ParallelWaitError].
/// The [ParallelWaitError.values] is a record of the values of
/// successful futures, and `null` for futures with errors.
/// The [ParallelWaitError.errors] is a record of the same shape,
/// with `null` values for the successful futures
/// and an [AsyncError] with the error of futures
/// which completed with an error.
/// {@endtemplate}
@Since("3.0")
extension FutureRecord2<T1, T2> on (Future<T1>, Future<T2>) {
/// {@macro record-parallel-wait}
Future<(T1, T2)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
_FutureResult._waitAll([v1, v2], (int errors) {
if (errors == 0) {
c.complete((v1.value, v2.value));
} else {
c.completeError(
ParallelWaitError(
(v1.valueOrNull, v2.valueOrNull),
(v1.errorOrNull, v2.errorOrNull),
errorCount: errors,
defaultError: v1.errorOrNull ?? v2.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord3<T1, T2, T3> on (Future<T1>, Future<T2>, Future<T3>) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
_FutureResult._waitAll([v1, v2, v3], (int errors) {
if (errors == 0) {
c.complete((v1.value, v2.value, v3.value));
} else {
c.completeError(
ParallelWaitError(
(v1.valueOrNull, v2.valueOrNull, v3.valueOrNull),
(v1.errorOrNull, v2.errorOrNull, v3.errorOrNull),
errorCount: errors,
defaultError: v1.errorOrNull ?? v2.errorOrNull ?? v3.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord4<T1, T2, T3, T4>
on (Future<T1>, Future<T2>, Future<T3>, Future<T4>) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
_FutureResult._waitAll([v1, v2, v3, v4], (int errors) {
if (errors == 0) {
c.complete((v1.value, v2.value, v3.value, v4.value));
} else {
c.completeError(
ParallelWaitError(
(v1.valueOrNull, v2.valueOrNull, v3.valueOrNull, v4.valueOrNull),
(v1.errorOrNull, v2.errorOrNull, v3.errorOrNull, v4.errorOrNull),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord5<T1, T2, T3, T4, T5>
on (Future<T1>, Future<T2>, Future<T3>, Future<T4>, Future<T5>) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4, T5)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4, T5)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
final v5 = _FutureResult<T5>($5);
_FutureResult._waitAll([v1, v2, v3, v4, v5], (int errors) {
if (errors == 0) {
c.complete((v1.value, v2.value, v3.value, v4.value, v5.value));
} else {
c.completeError(
ParallelWaitError(
(
v1.valueOrNull,
v2.valueOrNull,
v3.valueOrNull,
v4.valueOrNull,
v5.valueOrNull,
),
(
v1.errorOrNull,
v2.errorOrNull,
v3.errorOrNull,
v4.errorOrNull,
v5.errorOrNull,
),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull ??
v5.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord6<T1, T2, T3, T4, T5, T6>
on
(
Future<T1>,
Future<T2>,
Future<T3>,
Future<T4>,
Future<T5>,
Future<T6>,
) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4, T5, T6)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4, T5, T6)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
final v5 = _FutureResult<T5>($5);
final v6 = _FutureResult<T6>($6);
_FutureResult._waitAll([v1, v2, v3, v4, v5, v6], (int errors) {
if (errors == 0) {
c.complete((
v1.value,
v2.value,
v3.value,
v4.value,
v5.value,
v6.value,
));
} else {
c.completeError(
ParallelWaitError(
(
v1.valueOrNull,
v2.valueOrNull,
v3.valueOrNull,
v4.valueOrNull,
v5.valueOrNull,
v6.valueOrNull,
),
(
v1.errorOrNull,
v2.errorOrNull,
v3.errorOrNull,
v4.errorOrNull,
v5.errorOrNull,
v6.errorOrNull,
),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull ??
v5.errorOrNull ??
v6.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord7<T1, T2, T3, T4, T5, T6, T7>
on
(
Future<T1>,
Future<T2>,
Future<T3>,
Future<T4>,
Future<T5>,
Future<T6>,
Future<T7>,
) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4, T5, T6, T7)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4, T5, T6, T7)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
final v5 = _FutureResult<T5>($5);
final v6 = _FutureResult<T6>($6);
final v7 = _FutureResult<T7>($7);
_FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7], (int errors) {
if (errors == 0) {
c.complete((
v1.value,
v2.value,
v3.value,
v4.value,
v5.value,
v6.value,
v7.value,
));
} else {
c.completeError(
ParallelWaitError(
(
v1.valueOrNull,
v2.valueOrNull,
v3.valueOrNull,
v4.valueOrNull,
v5.valueOrNull,
v6.valueOrNull,
v7.valueOrNull,
),
(
v1.errorOrNull,
v2.errorOrNull,
v3.errorOrNull,
v4.errorOrNull,
v5.errorOrNull,
v6.errorOrNull,
v7.errorOrNull,
),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull ??
v5.errorOrNull ??
v6.errorOrNull ??
v7.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord8<T1, T2, T3, T4, T5, T6, T7, T8>
on
(
Future<T1>,
Future<T2>,
Future<T3>,
Future<T4>,
Future<T5>,
Future<T6>,
Future<T7>,
Future<T8>,
) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4, T5, T6, T7, T8)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4, T5, T6, T7, T8)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
final v5 = _FutureResult<T5>($5);
final v6 = _FutureResult<T6>($6);
final v7 = _FutureResult<T7>($7);
final v8 = _FutureResult<T8>($8);
_FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7, v8], (int errors) {
if (errors == 0) {
c.complete((
v1.value,
v2.value,
v3.value,
v4.value,
v5.value,
v6.value,
v7.value,
v8.value,
));
} else {
c.completeError(
ParallelWaitError(
(
v1.valueOrNull,
v2.valueOrNull,
v3.valueOrNull,
v4.valueOrNull,
v5.valueOrNull,
v6.valueOrNull,
v7.valueOrNull,
v8.valueOrNull,
),
(
v1.errorOrNull,
v2.errorOrNull,
v3.errorOrNull,
v4.errorOrNull,
v5.errorOrNull,
v6.errorOrNull,
v7.errorOrNull,
v8.errorOrNull,
),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull ??
v5.errorOrNull ??
v6.errorOrNull ??
v7.errorOrNull ??
v8.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Parallel operations on a record of futures.
@Since("3.0")
extension FutureRecord9<T1, T2, T3, T4, T5, T6, T7, T8, T9>
on
(
Future<T1>,
Future<T2>,
Future<T3>,
Future<T4>,
Future<T5>,
Future<T6>,
Future<T7>,
Future<T8>,
Future<T9>,
) {
/// {@macro record-parallel-wait}
Future<(T1, T2, T3, T4, T5, T6, T7, T8, T9)> get wait {
@pragma('vm:awaiter-link')
final c = Completer<(T1, T2, T3, T4, T5, T6, T7, T8, T9)>.sync();
final v1 = _FutureResult<T1>($1);
final v2 = _FutureResult<T2>($2);
final v3 = _FutureResult<T3>($3);
final v4 = _FutureResult<T4>($4);
final v5 = _FutureResult<T5>($5);
final v6 = _FutureResult<T6>($6);
final v7 = _FutureResult<T7>($7);
final v8 = _FutureResult<T8>($8);
final v9 = _FutureResult<T9>($9);
_FutureResult._waitAll([v1, v2, v3, v4, v5, v6, v7, v8, v9], (int errors) {
if (errors == 0) {
c.complete((
v1.value,
v2.value,
v3.value,
v4.value,
v5.value,
v6.value,
v7.value,
v8.value,
v9.value,
));
} else {
c.completeError(
ParallelWaitError(
(
v1.valueOrNull,
v2.valueOrNull,
v3.valueOrNull,
v4.valueOrNull,
v5.valueOrNull,
v6.valueOrNull,
v7.valueOrNull,
v8.valueOrNull,
v9.valueOrNull,
),
(
v1.errorOrNull,
v2.errorOrNull,
v3.errorOrNull,
v4.errorOrNull,
v5.errorOrNull,
v6.errorOrNull,
v7.errorOrNull,
v8.errorOrNull,
v9.errorOrNull,
),
errorCount: errors,
defaultError:
v1.errorOrNull ??
v2.errorOrNull ??
v3.errorOrNull ??
v4.errorOrNull ??
v5.errorOrNull ??
v6.errorOrNull ??
v7.errorOrNull ??
v8.errorOrNull ??
v9.errorOrNull,
),
);
}
});
return c.future;
}
}
/// Error thrown when waiting for multiple futures, when some have errors.
///
/// The [V] and [E] types will have the same basic shape as the
/// original collection of futures that was waited on.
///
/// For example, if the original awaited futures were a record
/// `(Future<T1>, ..., Future<Tn>)`,
/// the type `V` will be `(T1?, ..., Tn?)` which allows keeping the
/// values of futures that completed with a value,
/// and `E` will be `(AsyncError?, ..., AsyncError?)`, also with *n*
/// fields, which can contain the errors for the futures which completed
/// with an error.
///
/// Waiting for a list or iterable of futures should provide
/// a list of nullable values and errors of the same length.
@Since("3.0")
class ParallelWaitError<V, E> extends Error {
/// Values of successful futures.
///
/// Has the same shape as the original collection of futures,
/// with values for each successful future and `null` values
/// for each failing future.
final V values;
/// Errors of failing futures.
///
/// Has the same shape as the original collection of futures,
/// with errors, typically [AsyncError], for each failing
/// future and `null` values for each successful future.
final E errors;
/// An error which, if present, is included in the [toString] output.
///
/// If the default error has a stack trace, it's also reported by the
/// [stackTrace] getter, instead of where this [ParallelWaitError] was thrown.
final AsyncError? _defaultError;
/// Number of errors, if available.
final int? _errorCount;
/// Creates error with the provided [values] and [errors].
///
/// If [defaultError] is provided, its [AsyncError.error] is used in
/// the [toString] of this parallel error, and its [AsyncError.stackTrace]
/// is returned by [stackTrace].
///
/// If [errorCount] is provided, and it's greater than one,
/// the number is reported in the [toString].
ParallelWaitError(
this.values,
this.errors, {
@Since("3.4") int? errorCount,
@Since("3.4") AsyncError? defaultError,
}) : _defaultError = defaultError,
_errorCount = errorCount;
String toString() {
if (_defaultError == null) {
if (_errorCount == null || _errorCount <= 1) {
return "ParallelWaitError";
}
return "ParallelWaitError($_errorCount errors)";
}
return "ParallelWaitError${_errorCount != null && _errorCount > 1 //
? "($_errorCount errors)" : ""}: ${_defaultError.error}";
}
StackTrace? get stackTrace => _defaultError?.stackTrace ?? super.stackTrace;
}
/// The result of a future, when it has completed.
///
/// Stores a value result in [value] and an error result in [error].
/// Then calls [onReady] with a 0 argument for a value, and 1 for an error.
///
/// The [onReady] callback must be set synchronously,
/// before the future has a chance to complete.
///
/// Abstracted into a class of its own in order to reuse code.
class _FutureResult<T> {
// Consider integrating directly into `_Future` as a `_FutureListener`
// to avoid creating as many function tear-offs.
final Future<T> source;
/// The value or `null`.
///
/// Set when the future completes with a value.
T? valueOrNull;
/// Set when the future completes with an error or value.
AsyncError? errorOrNull;
_FutureResult(this.source);
/// The value.
///
/// Should only be used when the future is known to have completed with
/// a value.
T get value => valueOrNull ?? valueOrNull as T;
void _wait(@pragma('vm:awaiter-link') void Function(int) whenReady) {
source.then(
(T value) {
valueOrNull = value;
whenReady(0);
},
onError: (Object error, StackTrace stack) {
errorOrNull = AsyncError(error, stack);
whenReady(1);
},
);
}
/// Waits for a number of [_FutureResult]s to all have completed.
///
/// List must not be empty.
static void _waitAll(
List<_FutureResult> results,
@pragma('vm:awaiter-link') void Function(int) whenReady,
) {
assert(results.isNotEmpty);
var ready = 0;
var errors = 0;
void onReady(int error) {
errors += error;
if (++ready == results.length) {
whenReady(errors);
}
}
for (var r in results) {
r._wait(onReady);
}
}
static void _noop(_) {}
}