Merge pull request #1266 from dart-lang/merge-stream_channel-package
Merge `package:stream_channel`
diff --git a/.github/ISSUE_TEMPLATE/stream_channel.md b/.github/ISSUE_TEMPLATE/stream_channel.md
new file mode 100644
index 0000000..76b5994
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/stream_channel.md
@@ -0,0 +1,5 @@
+---
+name: "package:stream_channel"
+about: "Create a bug or file a feature request against package:stream_channel."
+labels: "package:stream_channel"
+---
\ No newline at end of file
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 25efb2a..64585f3 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -116,6 +116,10 @@
- changed-files:
- any-glob-to-any-file: 'pkgs/stack_trace/**'
+'package:stream_channel':
+ - changed-files:
+ - any-glob-to-any-file: 'pkgs/stream_channel/**'
+
'package:stream_transform':
- changed-files:
- any-glob-to-any-file: 'pkgs/stream_transform/**'
diff --git a/.github/workflows/stream_channel.yaml b/.github/workflows/stream_channel.yaml
new file mode 100644
index 0000000..c39424d
--- /dev/null
+++ b/.github/workflows/stream_channel.yaml
@@ -0,0 +1,74 @@
+name: package:stream_channel
+
+on:
+ # Run on PRs and pushes to the default branch.
+ push:
+ branches: [ main ]
+ paths:
+ - '.github/workflows/stream_channel.yaml'
+ - 'pkgs/stream_channel/**'
+ pull_request:
+ branches: [ main ]
+ paths:
+ - '.github/workflows/stream_channel.yaml'
+ - 'pkgs/stream_channel/**'
+ schedule:
+ - cron: "0 0 * * 0"
+
+env:
+ PUB_ENVIRONMENT: bot.github
+
+defaults:
+ run:
+ working-directory: pkgs/stream_channel/
+
+jobs:
+ # Check code formatting and static analysis on a single OS (linux)
+ # against Dart dev.
+ analyze:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ sdk: [dev]
+ steps:
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+ - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+ with:
+ sdk: ${{ matrix.sdk }}
+ - id: install
+ name: Install dependencies
+ run: dart pub get
+ - name: Check formatting
+ run: dart format --output=none --set-exit-if-changed .
+ if: always() && steps.install.outcome == 'success'
+ - name: Analyze code
+ run: dart analyze --fatal-infos
+ if: always() && steps.install.outcome == 'success'
+
+ # Run tests on a matrix consisting of two dimensions:
+ # 1. OS: ubuntu-latest, (macos-latest, windows-latest)
+ # 2. release channel: dev
+ test:
+ needs: analyze
+ runs-on: ${{ matrix.os }}
+ strategy:
+ fail-fast: false
+ matrix:
+ # Add macos-latest and/or windows-latest if relevant for this package.
+ os: [ubuntu-latest]
+ sdk: [3.3, dev]
+ steps:
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+ - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+ with:
+ sdk: ${{ matrix.sdk }}
+ - id: install
+ name: Install dependencies
+ run: dart pub get
+ - name: Run VM tests
+ run: dart test --platform vm
+ if: always() && steps.install.outcome == 'success'
+ - name: Run Chrome tests
+ run: dart test --platform chrome
+ if: always() && steps.install.outcome == 'success'
diff --git a/README.md b/README.md
index 563f90d..e58417b 100644
--- a/README.md
+++ b/README.md
@@ -42,6 +42,7 @@
| [source_span](pkgs/source_span/) | Provides a standard representation for source code locations and spans. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_span) | [](https://pub.dev/packages/source_span) |
| [sse](pkgs/sse/) | Provides client and server functionality for setting up bi-directional communication through Server Sent Events (SSE) and corresponding POST requests. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asse) | [](https://pub.dev/packages/sse) |
| [stack_trace](pkgs/stack_trace/) | A package for manipulating stack traces and printing them readably. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astack_trace) | [](https://pub.dev/packages/stack_trace) |
+| [stream_channel](pkgs/stream_channel/) | An abstraction for two-way communication channels based on the Dart Stream class. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_channel) | [](https://pub.dev/packages/stream_channel) |
| [stream_transform](pkgs/stream_transform/) | A collection of utilities to transform and manipulate streams. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_transform) | [](https://pub.dev/packages/stream_transform) |
| [term_glyph](pkgs/term_glyph/) | Useful Unicode glyphs and ASCII substitutes. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aterm_glyph) | [](https://pub.dev/packages/term_glyph) |
| [test_reflective_loader](pkgs/test_reflective_loader/) | Support for discovering tests and test suites using reflection. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Atest_reflective_loader) | [](https://pub.dev/packages/test_reflective_loader) |
diff --git a/pkgs/stream_channel/.gitignore b/pkgs/stream_channel/.gitignore
new file mode 100644
index 0000000..1447012
--- /dev/null
+++ b/pkgs/stream_channel/.gitignore
@@ -0,0 +1,10 @@
+.buildlog
+.dart_tool/
+.DS_Store
+.idea
+.pub/
+.settings/
+build/
+packages
+.packages
+pubspec.lock
diff --git a/pkgs/stream_channel/AUTHORS b/pkgs/stream_channel/AUTHORS
new file mode 100644
index 0000000..e8063a8
--- /dev/null
+++ b/pkgs/stream_channel/AUTHORS
@@ -0,0 +1,6 @@
+# Below is a list of people and organizations that have contributed
+# to the project. Names should be added to the list like so:
+#
+# Name/Organization <email address>
+
+Google Inc.
diff --git a/pkgs/stream_channel/CHANGELOG.md b/pkgs/stream_channel/CHANGELOG.md
new file mode 100644
index 0000000..30f7d32
--- /dev/null
+++ b/pkgs/stream_channel/CHANGELOG.md
@@ -0,0 +1,158 @@
+## 2.1.3
+
+* Require Dart 3.3
+* Move to `dart-lang/tools` monorepo.
+
+## 2.1.2
+
+* Require Dart 2.19
+* Add an example.
+* Fix a race condition in `IsolateChannel.connectReceive()` where the channel
+ could hang forever if its sink was closed before the connection was established.
+
+## 2.1.1
+
+* Require Dart 2.14
+* Populate the pubspec `repository` field.
+* Handle multichannel messages where the ID element is a `double` at runtime
+ instead of an `int`. When reading an array with `dart2wasm` numbers within the
+ array are parsed as `double`.
+
+## 2.1.0
+
+* Stable release for null safety.
+
+## 2.0.0
+
+**Breaking changes**
+
+* `IsolateChannel` requires a separate import
+ `package:stram_channel/isolate_channel.dart`.
+ `package:stream_channel/stream_channel.dart` will now not trigger any platform
+ concerns due to importing `dart:isolate`.
+* Remove `JsonDocumentTransformer` class. The `jsonDocument` top level is still
+ available.
+* Remove `StreamChannelTransformer.typed`. Use `.cast` on the transformed
+ channel instead.
+* Change `Future<dynamic>` returns to `Future<void>`.
+
+## 1.7.0
+
+* Make `IsolateChannel` available through
+ `package:stream_channel/isolate_channel.dart`. This will be the required
+ import in the next release.
+* Require `2.0.0` or newer SDK.
+* Internal style changes.
+
+## 1.6.8
+
+* Set max SDK version to `<3.0.0`, and adjust other dependencies.
+
+## 1.6.7+1
+
+* Fix Dart 2 runtime types in `IsolateChannel`.
+
+## 1.6.7
+
+* Update SDK version to 2.0.0-dev.17.0.
+* Add a type argument to `MultiChannel`.
+
+## 1.6.6
+
+* Fix a Dart 2 issue with inner stream transformation in `GuaranteeChannel`.
+
+* Fix a Dart 2 issue with `StreamChannelTransformer.fromCodec()`.
+
+## 1.6.5
+
+* Fix an issue with `JsonDocumentTransformer.bind` where it created an internal
+ stream channel which didn't get a properly inferred type for its `sink`.
+
+## 1.6.4
+
+* Fix a race condition in `MultiChannel` where messages from a remote virtual
+ channel could get dropped if the corresponding local channel wasn't registered
+ quickly enough.
+
+## 1.6.3
+
+* Use `pumpEventQueue()` from test.
+
+## 1.6.2
+
+* Declare support for `async` 2.0.0.
+
+## 1.6.1
+
+* Fix the type of `StreamChannel.transform()`. This previously inverted the
+ generic parameters, so it only really worked with transformers where both
+ generic types were identical.
+
+## 1.6.0
+
+* `Disconnector.disconnect()` now returns a future that completes when all the
+ inner `StreamSink.close()` futures have completed.
+
+## 1.5.0
+
+* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
+ that closing the sink causes the stream to close before it emits any more
+ events. This is the only guarantee that isn't automatically preserved when
+ transforming a channel.
+
+* `StreamChannelTransformer`s provided by the `stream_channel` package now
+ properly provide the guarantee that closing the sink causes the stream to
+ close before it emits any more events
+
+## 1.4.0
+
+* Add `StreamChannel.cast()`, which soundly coerces the generic type of a
+ channel.
+
+* Add `StreamChannelTransformer.typed()`, which soundly coerces the generic type
+ of a transformer.
+
+## 1.3.2
+
+* Fix all strong-mode errors and warnings.
+
+## 1.3.1
+
+* Make `IsolateChannel` slightly more efficient.
+
+* Make `MultiChannel` follow the stream channel rules.
+
+## 1.3.0
+
+* Add `Disconnector`, a transformer that allows the caller to disconnect the
+ transformed channel.
+
+## 1.2.0
+
+* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
+ wrapping to ensure that it obeys the stream channel guarantees.
+
+* Add `StreamChannelController`, which can be used to create custom
+ `StreamChannel` objects.
+
+## 1.1.1
+
+* Fix the type annotation for `StreamChannel.transform()`'s parameter.
+
+## 1.1.0
+
+* Add `StreamChannel.transformStream()`, `StreamChannel.transformSink()`,
+ `StreamChannel.changeStream()`, and `StreamChannel.changeSink()` to support
+ changing only the stream or only the sink of a channel.
+
+* Be more explicit about `JsonDocumentTransformer`'s error-handling behavior.
+
+## 1.0.1
+
+* Fix `MultiChannel`'s constructor to take a `StreamChannel`. This is
+ technically a breaking change, but since 1.0.0 was only released an hour ago,
+ we're treating it as a bug fix.
+
+## 1.0.0
+
+* Initial version
diff --git a/pkgs/stream_channel/LICENSE b/pkgs/stream_channel/LICENSE
new file mode 100644
index 0000000..dbd2843
--- /dev/null
+++ b/pkgs/stream_channel/LICENSE
@@ -0,0 +1,27 @@
+Copyright 2015, the Dart project authors.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+ * Neither the name of Google LLC nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/stream_channel/README.md b/pkgs/stream_channel/README.md
new file mode 100644
index 0000000..3677ccf
--- /dev/null
+++ b/pkgs/stream_channel/README.md
@@ -0,0 +1,20 @@
+[](https://github.com/dart-lang/tools/actions/workflows/stream_channel.yaml)
+[](https://pub.dev/packages/stream_channel)
+[](https://pub.dev/packages/stream_channel/publisher)
+
+This package exposes the `StreamChannel` interface, which represents a two-way
+communication channel. Each `StreamChannel` exposes a `Stream` for receiving
+data and a `StreamSink` for sending it.
+
+`StreamChannel` helps abstract communication logic away from the underlying
+protocol. For example, the [`test`][test] package re-uses its test suite
+communication protocol for both WebSocket connections to browser suites and
+Isolate connections to VM tests.
+
+[test]: https://pub.dev/packages/test
+
+This package also contains utilities for dealing with `StreamChannel`s and with
+two-way communications in general. For documentation of these utilities, see
+[the API docs][api].
+
+[api]: https://pub.dev/documentation/stream_channel/latest/
diff --git a/pkgs/stream_channel/analysis_options.yaml b/pkgs/stream_channel/analysis_options.yaml
new file mode 100644
index 0000000..44cda4d
--- /dev/null
+++ b/pkgs/stream_channel/analysis_options.yaml
@@ -0,0 +1,5 @@
+include: package:dart_flutter_team_lints/analysis_options.yaml
+
+analyzer:
+ language:
+ strict-casts: true
diff --git a/pkgs/stream_channel/example/example.dart b/pkgs/stream_channel/example/example.dart
new file mode 100644
index 0000000..b41d8d9
--- /dev/null
+++ b/pkgs/stream_channel/example/example.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+import 'dart:io';
+import 'dart:isolate';
+
+import 'package:stream_channel/isolate_channel.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+Future<void> main() async {
+ // A StreamChannel<T>, is in simplest terms, a wrapper around a Stream<T> and
+ // a StreamSink<T>. For example, you can create a channel that wraps standard
+ // IO:
+ var stdioChannel = StreamChannel(stdin, stdout);
+ stdioChannel.sink.add('Hello!\n'.codeUnits);
+
+ // Like a Stream<T> can be transformed with a StreamTransformer<T>, a
+ // StreamChannel<T> can be transformed with a StreamChannelTransformer<T>.
+ // For example, we can handle standard input as strings:
+ var stringChannel = stdioChannel
+ .transform(StreamChannelTransformer.fromCodec(utf8))
+ .transformStream(const LineSplitter());
+ stringChannel.sink.add('world!\n');
+
+ // You can implement StreamChannel<T> by extending StreamChannelMixin<T>, but
+ // it's much easier to use a StreamChannelController<T>. A controller has two
+ // StreamChannel<T> members: `local` and `foreign`. The creator of a
+ // controller should work with the `local` channel, while the recipient should
+ // work with the `foreign` channel, and usually will not have direct access to
+ // the underlying controller.
+ var ctrl = StreamChannelController<String>();
+ ctrl.local.stream.listen((event) {
+ // Do something useful here...
+ });
+
+ // You can also pipe events from one channel to another.
+ ctrl
+ ..foreign.pipe(stringChannel)
+ ..local.sink.add('Piped!\n');
+ await ctrl.local.sink.close();
+
+ // The StreamChannel<T> interface provides several guarantees, which can be
+ // found here:
+ // https://pub.dev/documentation/stream_channel/latest/stream_channel/StreamChannel-class.html
+ //
+ // By calling `StreamChannel<T>.withGuarantees()`, you can create a
+ // StreamChannel<T> that provides all guarantees.
+ var dummyCtrl0 = StreamChannelController<String>();
+ var guaranteedChannel = StreamChannel.withGuarantees(
+ dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
+
+ // To close a StreamChannel, use `sink.close()`.
+ await guaranteedChannel.sink.close();
+
+ // A MultiChannel<T> multiplexes multiple virtual channels across a single
+ // underlying transport layer. For example, an application listening over
+ // standard I/O can still support multiple clients if it has a mechanism to
+ // separate events from different clients.
+ //
+ // A MultiChannel<T> splits events into numbered channels, which are
+ // instances of VirtualChannel<T>.
+ var dummyCtrl1 = StreamChannelController<String>();
+ var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
+ var channel1 = multiChannel.virtualChannel();
+ await multiChannel.sink.close();
+
+ // The client/peer should also create its own MultiChannel<T>, connected to
+ // the underlying transport, use the corresponding ID's to handle events in
+ // their respective channels. It is up to you how to communicate channel ID's
+ // across different endpoints.
+ var dummyCtrl2 = StreamChannelController<String>();
+ var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
+ var channel2 = multiChannel2.virtualChannel(channel1.id);
+ await channel2.sink.close();
+ await multiChannel2.sink.close();
+
+ // Multiple instances of a Dart application can communicate easily across
+ // `SendPort`/`ReceivePort` pairs by means of the `IsolateChannel<T>` class.
+ // Typically, one endpoint will create a `ReceivePort`, and call the
+ // `IsolateChannel.connectReceive` constructor. The other endpoint will be
+ // given the corresponding `SendPort`, and then call
+ // `IsolateChannel.connectSend`.
+ var recv = ReceivePort();
+ var recvChannel = IsolateChannel<void>.connectReceive(recv);
+ var sendChannel = IsolateChannel<void>.connectSend(recv.sendPort);
+
+ // You must manually close `IsolateChannel<T>` sinks, however.
+ await recvChannel.sink.close();
+ await sendChannel.sink.close();
+
+ // You can use the `Disconnector` transformer to cause a channel to act as
+ // though the remote end of its transport had disconnected.
+ var disconnector = Disconnector<String>();
+ var disconnectable = stringChannel.transform(disconnector);
+ disconnectable.sink.add('Still connected!');
+ await disconnector.disconnect();
+
+ // Additionally:
+ // * The `DelegatingStreamController<T>` class can be extended to build a
+ // basis for wrapping other `StreamChannel<T>` objects.
+ // * The `jsonDocument` transformer converts events to/from JSON, using
+ // the `json` codec from `dart:convert`.
+ // * `package:json_rpc_2` directly builds on top of
+ // `package:stream_channel`, so any compatible transport can be used to
+ // create interactive client/server or peer-to-peer applications (i.e.
+ // language servers, microservices, etc.
+}
diff --git a/pkgs/stream_channel/lib/isolate_channel.dart b/pkgs/stream_channel/lib/isolate_channel.dart
new file mode 100644
index 0000000..5d9f6e1
--- /dev/null
+++ b/pkgs/stream_channel/lib/isolate_channel.dart
@@ -0,0 +1,5 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/isolate_channel.dart' show IsolateChannel;
diff --git a/pkgs/stream_channel/lib/src/close_guarantee_channel.dart b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
new file mode 100644
index 0000000..13432d1
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
@@ -0,0 +1,91 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that specifically enforces the stream channel guarantee
+/// that closing the sink causes the stream to close before it emits any more
+/// events
+///
+/// This is exposed via [StreamChannel.withCloseGuarantee].
+class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
+ @override
+ Stream<T> get stream => _stream;
+ late final _CloseGuaranteeStream<T> _stream;
+
+ @override
+ StreamSink<T> get sink => _sink;
+ late final _CloseGuaranteeSink<T> _sink;
+
+ /// The subscription to the inner stream.
+ StreamSubscription<T>? _subscription;
+
+ /// Whether the sink has closed, causing the underlying channel to disconnect.
+ bool _disconnected = false;
+
+ CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
+ _sink = _CloseGuaranteeSink<T>(innerSink, this);
+ _stream = _CloseGuaranteeStream<T>(innerStream, this);
+ }
+}
+
+/// The stream for [CloseGuaranteeChannel].
+///
+/// This wraps the inner stream to save the subscription on the channel when
+/// [listen] is called.
+class _CloseGuaranteeStream<T> extends Stream<T> {
+ /// The inner stream this is delegating to.
+ final Stream<T> _inner;
+
+ /// The [CloseGuaranteeChannel] this belongs to.
+ final CloseGuaranteeChannel<T> _channel;
+
+ _CloseGuaranteeStream(this._inner, this._channel);
+
+ @override
+ StreamSubscription<T> listen(void Function(T)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+ // If the channel is already disconnected, we shouldn't dispatch anything
+ // but a done event.
+ if (_channel._disconnected) {
+ onData = null;
+ onError = null;
+ }
+
+ var subscription = _inner.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+ if (!_channel._disconnected) {
+ _channel._subscription = subscription;
+ }
+ return subscription;
+ }
+}
+
+/// The sink for [CloseGuaranteeChannel].
+///
+/// This wraps the inner sink to cancel the stream subscription when the sink is
+/// canceled.
+class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> {
+ /// The [CloseGuaranteeChannel] this belongs to.
+ final CloseGuaranteeChannel<T> _channel;
+
+ _CloseGuaranteeSink(super.inner, this._channel);
+
+ @override
+ Future<void> close() {
+ var done = super.close();
+ _channel._disconnected = true;
+ var subscription = _channel._subscription;
+ if (subscription != null) {
+ // Don't dispatch anything but a done event.
+ subscription.onData(null);
+ subscription.onError(null);
+ }
+ return done;
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/delegating_stream_channel.dart b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart
new file mode 100644
index 0000000..4484a59
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart
@@ -0,0 +1,23 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A simple delegating wrapper around [StreamChannel].
+///
+/// Subclasses can override individual methods, or use this to expose only
+/// [StreamChannel] methods.
+class DelegatingStreamChannel<T> extends StreamChannelMixin<T> {
+ /// The inner channel to which methods are forwarded.
+ final StreamChannel<T> _inner;
+
+ @override
+ Stream<T> get stream => _inner.stream;
+ @override
+ StreamSink<T> get sink => _inner.sink;
+
+ DelegatingStreamChannel(this._inner);
+}
diff --git a/pkgs/stream_channel/lib/src/disconnector.dart b/pkgs/stream_channel/lib/src/disconnector.dart
new file mode 100644
index 0000000..3414e9c
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/disconnector.dart
@@ -0,0 +1,153 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// Allows the caller to force a channel to disconnect.
+///
+/// When [disconnect] is called, the channel (or channels) transformed by this
+/// transformer will act as though the remote end had disconnected—the stream
+/// will emit a done event, and the sink will ignore future inputs. The inner
+/// sink will also be closed to notify the remote end of the disconnection.
+///
+/// If a channel is transformed after the [disconnect] has been called, it will
+/// be disconnected immediately.
+class Disconnector<T> implements StreamChannelTransformer<T, T> {
+ /// Whether [disconnect] has been called.
+ bool get isDisconnected => _disconnectMemo.hasRun;
+
+ /// The sinks for transformed channels.
+ ///
+ /// Note that we assume that transformed channels provide the stream channel
+ /// guarantees. This allows us to only track sinks, because we know closing
+ /// the underlying sink will cause the stream to emit a done event.
+ final _sinks = <_DisconnectorSink<T>>[];
+
+ /// Disconnects all channels that have been transformed.
+ ///
+ /// Returns a future that completes when all inner sinks' [StreamSink.close]
+ /// futures have completed. Note that a [StreamController]'s sink won't close
+ /// until the corresponding stream has a listener.
+ Future<void> disconnect() => _disconnectMemo.runOnce(() {
+ var futures = _sinks.map((sink) => sink._disconnect()).toList();
+ _sinks.clear();
+ return Future.wait(futures, eagerError: true);
+ });
+ final _disconnectMemo = AsyncMemoizer<List<void>>();
+
+ @override
+ StreamChannel<T> bind(StreamChannel<T> channel) {
+ return channel.changeSink((innerSink) {
+ var sink = _DisconnectorSink<T>(innerSink);
+
+ if (isDisconnected) {
+ // Ignore errors here, because otherwise there would be no way for the
+ // user to handle them gracefully.
+ sink._disconnect().catchError((_) {});
+ } else {
+ _sinks.add(sink);
+ }
+
+ return sink;
+ });
+ }
+}
+
+/// A sink wrapper that can force a disconnection.
+class _DisconnectorSink<T> implements StreamSink<T> {
+ /// The inner sink.
+ final StreamSink<T> _inner;
+
+ @override
+ Future<void> get done => _inner.done;
+
+ /// Whether [Disconnector.disconnect] has been called.
+ var _isDisconnected = false;
+
+ /// Whether the user has called [close].
+ var _closed = false;
+
+ /// The subscription to the stream passed to [addStream], if a stream is
+ /// currently being added.
+ StreamSubscription<T>? _addStreamSubscription;
+
+ /// The completer for the future returned by [addStream], if a stream is
+ /// currently being added.
+ Completer? _addStreamCompleter;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool get _inAddStream => _addStreamSubscription != null;
+
+ _DisconnectorSink(this._inner);
+
+ @override
+ void add(T data) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_isDisconnected) return;
+
+ _inner.add(data);
+ }
+
+ @override
+ void addError(Object error, [StackTrace? stackTrace]) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_isDisconnected) return;
+
+ _inner.addError(error, stackTrace);
+ }
+
+ @override
+ Future<void> addStream(Stream<T> stream) {
+ if (_closed) throw StateError('Cannot add stream after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add stream while adding stream.');
+ }
+ if (_isDisconnected) return Future.value();
+
+ _addStreamCompleter = Completer.sync();
+ _addStreamSubscription = stream.listen(_inner.add,
+ onError: _inner.addError, onDone: _addStreamCompleter!.complete);
+ return _addStreamCompleter!.future.then((_) {
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ });
+ }
+
+ @override
+ Future<void> close() {
+ if (_inAddStream) {
+ throw StateError('Cannot close sink while adding stream.');
+ }
+
+ _closed = true;
+ return _inner.close();
+ }
+
+ /// Disconnects this sink.
+ ///
+ /// This closes the underlying sink and stops forwarding events. It returns
+ /// the [StreamSink.close] future for the underlying sink.
+ Future<void> _disconnect() {
+ _isDisconnected = true;
+ var future = _inner.close();
+
+ if (_inAddStream) {
+ _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+
+ return future;
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/guarantee_channel.dart b/pkgs/stream_channel/lib/src/guarantee_channel.dart
new file mode 100644
index 0000000..30ebe2e
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/guarantee_channel.dart
@@ -0,0 +1,207 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that enforces the stream channel guarantees.
+///
+/// This is exposed via [StreamChannel.withGuarantees].
+class GuaranteeChannel<T> extends StreamChannelMixin<T> {
+ @override
+ Stream<T> get stream => _streamController.stream;
+
+ @override
+ StreamSink<T> get sink => _sink;
+ late final _GuaranteeSink<T> _sink;
+
+ /// The controller for [stream].
+ ///
+ /// This intermediate controller allows us to continue listening for a done
+ /// event even after the user has canceled their subscription, and to send our
+ /// own done event when the sink is closed.
+ late final StreamController<T> _streamController;
+
+ /// The subscription to the inner stream.
+ StreamSubscription<T>? _subscription;
+
+ /// Whether the sink has closed, causing the underlying channel to disconnect.
+ bool _disconnected = false;
+
+ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
+ {bool allowSinkErrors = true}) {
+ _sink = _GuaranteeSink<T>(innerSink, this, allowErrors: allowSinkErrors);
+
+ // Enforce the single-subscription guarantee by changing a broadcast stream
+ // to single-subscription.
+ if (innerStream.isBroadcast) {
+ innerStream =
+ innerStream.transform(SingleSubscriptionTransformer<T, T>());
+ }
+
+ _streamController = StreamController<T>(
+ onListen: () {
+ // If the sink has disconnected, we've already called
+ // [_streamController.close].
+ if (_disconnected) return;
+
+ _subscription = innerStream.listen(_streamController.add,
+ onError: _streamController.addError, onDone: () {
+ _sink._onStreamDisconnected();
+ _streamController.close();
+ });
+ },
+ sync: true);
+ }
+
+ /// Called by [_GuaranteeSink] when the user closes it.
+ ///
+ /// The sink closing indicates that the connection is closed, so the stream
+ /// should stop emitting events.
+ void _onSinkDisconnected() {
+ _disconnected = true;
+ var subscription = _subscription;
+ if (subscription != null) subscription.cancel();
+ _streamController.close();
+ }
+}
+
+/// The sink for [GuaranteeChannel].
+///
+/// This wraps the inner sink to ignore events and cancel any in-progress
+/// [addStream] calls when the underlying channel closes.
+class _GuaranteeSink<T> implements StreamSink<T> {
+ /// The inner sink being wrapped.
+ final StreamSink<T> _inner;
+
+ /// The [GuaranteeChannel] this belongs to.
+ final GuaranteeChannel<T> _channel;
+
+ @override
+ Future<void> get done => _doneCompleter.future;
+ final _doneCompleter = Completer<void>();
+
+ /// Whether connection is disconnected.
+ ///
+ /// This can happen because the stream has emitted a done event, or because
+ /// the user added an error when [_allowErrors] is `false`.
+ bool _disconnected = false;
+
+ /// Whether the user has called [close].
+ bool _closed = false;
+
+ /// The subscription to the stream passed to [addStream], if a stream is
+ /// currently being added.
+ StreamSubscription<T>? _addStreamSubscription;
+
+ /// The completer for the future returned by [addStream], if a stream is
+ /// currently being added.
+ Completer? _addStreamCompleter;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool get _inAddStream => _addStreamSubscription != null;
+
+ /// Whether errors are passed on to the underlying sink.
+ ///
+ /// If this is `false`, any error passed to the sink is piped to [done] and
+ /// the underlying sink is closed.
+ final bool _allowErrors;
+
+ _GuaranteeSink(this._inner, this._channel, {bool allowErrors = true})
+ : _allowErrors = allowErrors;
+
+ @override
+ void add(T data) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_disconnected) return;
+
+ _inner.add(data);
+ }
+
+ @override
+ void addError(Object error, [StackTrace? stackTrace]) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_disconnected) return;
+
+ _addError(error, stackTrace);
+ }
+
+ /// Like [addError], but doesn't check to ensure that an error can be added.
+ ///
+ /// This is called from [addStream], so it shouldn't fail if a stream is being
+ /// added.
+ void _addError(Object error, [StackTrace? stackTrace]) {
+ if (_allowErrors) {
+ _inner.addError(error, stackTrace);
+ return;
+ }
+
+ _doneCompleter.completeError(error, stackTrace);
+
+ // Treat an error like both the stream and sink disconnecting.
+ _onStreamDisconnected();
+ _channel._onSinkDisconnected();
+
+ // Ignore errors from the inner sink. We're already surfacing one error, and
+ // if the user handles it we don't want them to have another top-level.
+ _inner.close().catchError((_) {});
+ }
+
+ @override
+ Future<void> addStream(Stream<T> stream) {
+ if (_closed) throw StateError('Cannot add stream after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add stream while adding stream.');
+ }
+ if (_disconnected) return Future.value();
+
+ _addStreamCompleter = Completer.sync();
+ _addStreamSubscription = stream.listen(_inner.add,
+ onError: _addError, onDone: _addStreamCompleter!.complete);
+ return _addStreamCompleter!.future.then((_) {
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ });
+ }
+
+ @override
+ Future<void> close() {
+ if (_inAddStream) {
+ throw StateError('Cannot close sink while adding stream.');
+ }
+
+ if (_closed) return done;
+ _closed = true;
+
+ if (!_disconnected) {
+ _channel._onSinkDisconnected();
+ _doneCompleter.complete(_inner.close());
+ }
+
+ return done;
+ }
+
+ /// Called by [GuaranteeChannel] when the stream emits a done event.
+ ///
+ /// The stream being done indicates that the connection is closed, so the
+ /// sink should stop forwarding events.
+ void _onStreamDisconnected() {
+ _disconnected = true;
+ if (!_doneCompleter.isCompleted) _doneCompleter.complete();
+
+ if (!_inAddStream) return;
+ _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart
new file mode 100644
index 0000000..15c68a4
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/isolate_channel.dart
@@ -0,0 +1,115 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
+/// presumably with another isolate.
+///
+/// The remote endpoint doesn't necessarily need to be running an
+/// [IsolateChannel]. This can be used with any two ports, although the
+/// [StreamChannel] semantics mean that this class will treat them as being
+/// paired (for example, closing the [sink] will cause the [stream] to stop
+/// emitting events).
+///
+/// The underlying isolate ports have no notion of closing connections. This
+/// means that [stream] won't close unless [sink] is closed, and that closing
+/// [sink] won't cause the remote endpoint to close. Users should take care to
+/// ensure that they always close the [sink] of every [IsolateChannel] they use
+/// to avoid leaving dangling [ReceivePort]s.
+class IsolateChannel<T> extends StreamChannelMixin<T> {
+ @override
+ final Stream<T> stream;
+ @override
+ final StreamSink<T> sink;
+
+ /// Connects to a remote channel that was created with
+ /// [IsolateChannel.connectSend].
+ ///
+ /// These constructors establish a connection using only a single
+ /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+ /// connect constructors.
+ ///
+ /// The connection protocol is guaranteed to remain compatible across versions
+ /// at least until the next major version release. If the protocol is
+ /// violated, the resulting channel will emit a single value on its stream and
+ /// then close.
+ factory IsolateChannel.connectReceive(ReceivePort receivePort) {
+ // We can't use a [StreamChannelCompleter] here because we need the return
+ // value to be an [IsolateChannel].
+ var isCompleted = false;
+ var streamCompleter = StreamCompleter<T>();
+ var sinkCompleter = StreamSinkCompleter<T>();
+
+ var channel = IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink
+ .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) {
+ if (!isCompleted) {
+ receivePort.close();
+ streamCompleter.setSourceStream(const Stream.empty());
+ sinkCompleter.setDestinationSink(NullStreamSink<T>());
+ }
+ sink.close();
+ })));
+
+ // The first message across the ReceivePort should be a SendPort pointing to
+ // the remote end. If it's not, we'll make the stream emit an error
+ // complaining.
+ late StreamSubscription<dynamic> subscription;
+ subscription = receivePort.listen((message) {
+ isCompleted = true;
+ if (message is SendPort) {
+ var controller =
+ StreamChannelController<T>(allowForeignErrors: false, sync: true);
+ SubscriptionStream(subscription).cast<T>().pipe(controller.local.sink);
+ controller.local.stream
+ .listen((data) => message.send(data), onDone: receivePort.close);
+
+ streamCompleter.setSourceStream(controller.foreign.stream);
+ sinkCompleter.setDestinationSink(controller.foreign.sink);
+ return;
+ }
+
+ streamCompleter.setError(
+ StateError('Unexpected Isolate response "$message".'),
+ StackTrace.current);
+ sinkCompleter.setDestinationSink(NullStreamSink<T>());
+ subscription.cancel();
+ });
+
+ return channel;
+ }
+
+ /// Connects to a remote channel that was created with
+ /// [IsolateChannel.connectReceive].
+ ///
+ /// These constructors establish a connection using only a single
+ /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+ /// connect constructors.
+ ///
+ /// The connection protocol is guaranteed to remain compatible across versions
+ /// at least until the next major version release.
+ factory IsolateChannel.connectSend(SendPort sendPort) {
+ var receivePort = ReceivePort();
+ sendPort.send(receivePort.sendPort);
+ return IsolateChannel(receivePort, sendPort);
+ }
+
+ /// Creates a stream channel that receives messages from [receivePort] and
+ /// sends them over [sendPort].
+ factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
+ var controller =
+ StreamChannelController<T>(allowForeignErrors: false, sync: true);
+ receivePort.cast<T>().pipe(controller.local.sink);
+ controller.local.stream
+ .listen((data) => sendPort.send(data), onDone: receivePort.close);
+ return IsolateChannel._(controller.foreign.stream, controller.foreign.sink);
+ }
+
+ IsolateChannel._(this.stream, this.sink);
+}
diff --git a/pkgs/stream_channel/lib/src/json_document_transformer.dart b/pkgs/stream_channel/lib/src/json_document_transformer.dart
new file mode 100644
index 0000000..3feda43
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/json_document_transformer.dart
@@ -0,0 +1,35 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:convert';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannelTransformer] that transforms JSON documents—strings that
+/// contain individual objects encoded as JSON—into decoded Dart objects.
+///
+/// This decodes JSON that's emitted by the transformed channel's stream, and
+/// encodes objects so that JSON is passed to the transformed channel's sink.
+///
+/// If the transformed channel emits invalid JSON, this emits a
+/// [FormatException]. If an unencodable object is added to the sink, it
+/// synchronously throws a [JsonUnsupportedObjectError].
+final StreamChannelTransformer<Object?, String> jsonDocument =
+ const _JsonDocument();
+
+class _JsonDocument implements StreamChannelTransformer<Object?, String> {
+ const _JsonDocument();
+
+ @override
+ StreamChannel<Object?> bind(StreamChannel<String> channel) {
+ var stream = channel.stream.map(jsonDecode);
+ var sink = StreamSinkTransformer<Object, String>.fromHandlers(
+ handleData: (data, sink) {
+ sink.add(jsonEncode(data));
+ }).bind(channel.sink);
+ return StreamChannel.withCloseGuarantee(stream, sink);
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/multi_channel.dart b/pkgs/stream_channel/lib/src/multi_channel.dart
new file mode 100644
index 0000000..4894239
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/multi_channel.dart
@@ -0,0 +1,274 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A class that multiplexes multiple virtual channels across a single
+/// underlying transport layer.
+///
+/// This should be connected to another [MultiChannel] on the other end of the
+/// underlying channel. It starts with a single default virtual channel,
+/// accessible via [stream] and [sink]. Additional virtual channels can be
+/// created with [virtualChannel].
+///
+/// When a virtual channel is created by one endpoint, the other must connect to
+/// it before messages may be sent through it. The first endpoint passes its
+/// [VirtualChannel.id] to the second, which then creates a channel from that id
+/// also using [virtualChannel]. For example:
+///
+/// ```dart
+/// // First endpoint
+/// var virtual = multiChannel.virtualChannel();
+/// multiChannel.sink.add({
+/// "channel": virtual.id
+/// });
+///
+/// // Second endpoint
+/// multiChannel.stream.listen((message) {
+/// var virtual = multiChannel.virtualChannel(message["channel"]);
+/// // ...
+/// });
+/// ```
+///
+/// Sending errors across a [MultiChannel] is not supported. Any errors from the
+/// underlying stream will be reported only via the default
+/// [MultiChannel.stream].
+///
+/// Each virtual channel may be closed individually. When all of them are
+/// closed, the underlying [StreamSink] is closed automatically.
+abstract class MultiChannel<T> implements StreamChannel<T> {
+ /// The default input stream.
+ ///
+ /// This connects to the remote [sink].
+ @override
+ Stream<T> get stream;
+
+ /// The default output stream.
+ ///
+ /// This connects to the remote [stream]. If this is closed, the remote
+ /// [stream] will close, but other virtual channels will remain open and new
+ /// virtual channels may be opened.
+ @override
+ StreamSink<T> get sink;
+
+ /// Creates a new [MultiChannel] that sends and receives messages over
+ /// [inner].
+ ///
+ /// The inner channel must take JSON-like objects.
+ factory MultiChannel(StreamChannel<dynamic> inner) => _MultiChannel<T>(inner);
+
+ /// Creates a new virtual channel.
+ ///
+ /// If [id] is not passed, this creates a virtual channel from scratch. Before
+ /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
+ /// where [virtualChannel] should be called with that id.
+ ///
+ /// If [id] is passed, this creates a virtual channel corresponding to the
+ /// channel with that id on the remote channel.
+ ///
+ /// Throws an [ArgumentError] if a virtual channel already exists for [id].
+ /// Throws a [StateError] if the underlying channel is closed.
+ VirtualChannel<T> virtualChannel([int? id]);
+}
+
+/// The implementation of [MultiChannel].
+///
+/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
+/// without having to implement all the private members.
+class _MultiChannel<T> extends StreamChannelMixin<T>
+ implements MultiChannel<T> {
+ /// The inner channel over which all communication is conducted.
+ ///
+ /// This will be `null` if the underlying communication channel is closed.
+ StreamChannel<dynamic>? _inner;
+
+ /// The subscription to [_inner].stream.
+ StreamSubscription<dynamic>? _innerStreamSubscription;
+
+ @override
+ Stream<T> get stream => _mainController.foreign.stream;
+ @override
+ StreamSink<T> get sink => _mainController.foreign.sink;
+
+ /// The controller for this channel.
+ final _mainController = StreamChannelController<T>(sync: true);
+
+ /// A map from input IDs to [StreamChannelController]s that should be used to
+ /// communicate over those channels.
+ final _controllers = <int, StreamChannelController<T>>{};
+
+ /// Input IDs of controllers in [_controllers] that we've received messages
+ /// for but that have not yet had a local [virtualChannel] created.
+ final _pendingIds = <int>{};
+
+ /// Input IDs of virtual channels that used to exist but have since been
+ /// closed.
+ final _closedIds = <int>{};
+
+ /// The next id to use for a local virtual channel.
+ ///
+ /// Ids are used to identify virtual channels. Each message is tagged with an
+ /// id; the receiving [MultiChannel] uses this id to look up which
+ /// [VirtualChannel] the message should be dispatched to.
+ ///
+ /// The id scheme for virtual channels is somewhat complicated. This is
+ /// necessary to ensure that there are no conflicts even when both endpoints
+ /// have virtual channels with the same id; since both endpoints can send and
+ /// receive messages across each virtual channel, a naïve scheme would make it
+ /// impossible to tell whether a message was from a channel that originated in
+ /// the remote endpoint or a reply on a channel that originated in the local
+ /// endpoint.
+ ///
+ /// The trick is that each endpoint only uses odd ids for its own channels.
+ /// When sending a message over a channel that was created by the remote
+ /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
+ /// knows that if an incoming message has an odd id, it's coming from a
+ /// channel that was originally created remotely, but if it has an even id,
+ /// it's coming from a channel that was originally created locally.
+ var _nextId = 1;
+
+ _MultiChannel(StreamChannel<dynamic> inner) : _inner = inner {
+ // The default connection is a special case which has id 0 on both ends.
+ // This allows it to begin connected without having to send over an id.
+ _controllers[0] = _mainController;
+ _mainController.local.stream.listen(
+ (message) => _inner!.sink.add(<Object?>[0, message]),
+ onDone: () => _closeChannel(0, 0));
+
+ _innerStreamSubscription = _inner!.stream.cast<List>().listen((message) {
+ var id = (message[0] as num).toInt();
+
+ // If the channel was closed before an incoming message was processed,
+ // ignore that message.
+ if (_closedIds.contains(id)) return;
+
+ var controller = _controllers.putIfAbsent(id, () {
+ // If we receive a message for a controller that doesn't have a local
+ // counterpart yet, create a controller for it to buffer incoming
+ // messages for when a local connection is created.
+ _pendingIds.add(id);
+ return StreamChannelController(sync: true);
+ });
+
+ if (message.length > 1) {
+ controller.local.sink.add(message[1] as T);
+ } else {
+ // A message without data indicates that the channel has been closed. We
+ // can just close the sink here without doing any more cleanup, because
+ // the sink closing will cause the stream to emit a done event which
+ // will trigger more cleanup.
+ controller.local.sink.close();
+ }
+ },
+ onDone: _closeInnerChannel,
+ onError: _mainController.local.sink.addError);
+ }
+
+ @override
+ VirtualChannel<T> virtualChannel([int? id]) {
+ int inputId;
+ int outputId;
+ if (id != null) {
+ // Since the user is passing in an id, we're connected to a remote
+ // VirtualChannel. This means messages they send over this channel will
+ // have the original odd id, but our replies will have an even id.
+ inputId = id;
+ outputId = id + 1;
+ } else {
+ // Since we're generating an id, we originated this VirtualChannel. This
+ // means messages we send over this channel will have the original odd id,
+ // but the remote channel's replies will have an even id.
+ inputId = _nextId + 1;
+ outputId = _nextId;
+ _nextId += 2;
+ }
+
+ // If the inner channel has already closed, create new virtual channels in a
+ // closed state.
+ if (_inner == null) {
+ return VirtualChannel._(
+ this, inputId, const Stream.empty(), NullStreamSink());
+ }
+
+ late StreamChannelController<T> controller;
+ if (_pendingIds.remove(inputId)) {
+ // If we've already received messages for this channel, use the controller
+ // where those messages are buffered.
+ controller = _controllers[inputId]!;
+ } else if (_controllers.containsKey(inputId) ||
+ _closedIds.contains(inputId)) {
+ throw ArgumentError('A virtual channel with id $id already exists.');
+ } else {
+ controller = StreamChannelController(sync: true);
+ _controllers[inputId] = controller;
+ }
+
+ controller.local.stream.listen(
+ (message) => _inner!.sink.add(<Object?>[outputId, message]),
+ onDone: () => _closeChannel(inputId, outputId));
+ return VirtualChannel._(
+ this, outputId, controller.foreign.stream, controller.foreign.sink);
+ }
+
+ /// Closes the virtual channel for which incoming messages have [inputId] and
+ /// outgoing messages have [outputId].
+ void _closeChannel(int inputId, int outputId) {
+ _closedIds.add(inputId);
+ var controller = _controllers.remove(inputId)!;
+ controller.local.sink.close();
+
+ if (_inner == null) return;
+
+ // A message without data indicates that the virtual channel has been
+ // closed.
+ _inner!.sink.add([outputId]);
+ if (_controllers.isEmpty) _closeInnerChannel();
+ }
+
+ /// Closes the underlying communication channel.
+ void _closeInnerChannel() {
+ _inner!.sink.close();
+ _innerStreamSubscription!.cancel();
+ _inner = null;
+
+ // Convert this to a list because the close is dispatched synchronously, and
+ // that could conceivably remove a controller from [_controllers].
+ for (var controller in _controllers.values.toList(growable: false)) {
+ controller.local.sink.close();
+ }
+ _controllers.clear();
+ }
+}
+
+/// A virtual channel created by [MultiChannel].
+///
+/// This implements [MultiChannel] for convenience.
+/// [VirtualChannel.virtualChannel] is semantically identical to the parent's
+/// [MultiChannel.virtualChannel].
+class VirtualChannel<T> extends StreamChannelMixin<T>
+ implements MultiChannel<T> {
+ /// The [MultiChannel] that created this.
+ final MultiChannel<T> _parent;
+
+ /// The identifier for this channel.
+ ///
+ /// This can be sent across the [MultiChannel] to provide the remote endpoint
+ /// a means to connect to this channel. Nothing about this is guaranteed
+ /// except that it will be JSON-serializable.
+ final int id;
+
+ @override
+ final Stream<T> stream;
+ @override
+ final StreamSink<T> sink;
+
+ VirtualChannel._(this._parent, this.id, this.stream, this.sink);
+
+ @override
+ VirtualChannel<T> virtualChannel([int? id]) => _parent.virtualChannel(id);
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_completer.dart b/pkgs/stream_channel/lib/src/stream_channel_completer.dart
new file mode 100644
index 0000000..9d007eb
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_completer.dart
@@ -0,0 +1,74 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [channel] where the source and destination are provided later.
+///
+/// The [channel] is a normal channel that can be listened to and that events
+/// can be added to immediately, but until [setChannel] is called it won't emit
+/// any events and all events added to it will be buffered.
+class StreamChannelCompleter<T> {
+ /// The completer for this channel's stream.
+ final _streamCompleter = StreamCompleter<T>();
+
+ /// The completer for this channel's sink.
+ final _sinkCompleter = StreamSinkCompleter<T>();
+
+ /// The channel for this completer.
+ StreamChannel<T> get channel => _channel;
+ late final StreamChannel<T> _channel;
+
+ /// Whether [setChannel] has been called.
+ bool _set = false;
+
+ /// Convert a `Future<StreamChannel>` to a `StreamChannel`.
+ ///
+ /// This creates a channel using a channel completer, and sets the source
+ /// channel to the result of the future when the future completes.
+ ///
+ /// If the future completes with an error, the returned channel's stream will
+ /// instead contain just that error. The sink will silently discard all
+ /// events.
+ static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
+ var completer = StreamChannelCompleter<void>();
+ channelFuture.then(completer.setChannel, onError: completer.setError);
+ return completer.channel;
+ }
+
+ StreamChannelCompleter() {
+ _channel = StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink);
+ }
+
+ /// Set a channel as the source and destination for [channel].
+ ///
+ /// A channel may be set at most once.
+ ///
+ /// Either [setChannel] or [setError] may be called at most once. Trying to
+ /// call either of them again will fail.
+ void setChannel(StreamChannel<T> channel) {
+ if (_set) throw StateError('The channel has already been set.');
+ _set = true;
+
+ _streamCompleter.setSourceStream(channel.stream);
+ _sinkCompleter.setDestinationSink(channel.sink);
+ }
+
+ /// Indicates that there was an error connecting the channel.
+ ///
+ /// This makes the stream emit [error] and close. It makes the sink discard
+ /// all its events.
+ ///
+ /// Either [setChannel] or [setError] may be called at most once. Trying to
+ /// call either of them again will fail.
+ void setError(Object error, [StackTrace? stackTrace]) {
+ if (_set) throw StateError('The channel has already been set.');
+ _set = true;
+
+ _streamCompleter.setError(error, stackTrace);
+ _sinkCompleter.setDestinationSink(NullStreamSink());
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_controller.dart b/pkgs/stream_channel/lib/src/stream_channel_controller.dart
new file mode 100644
index 0000000..25d5239
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_controller.dart
@@ -0,0 +1,67 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// @docImport 'isolate_channel.dart';
+library;
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A controller for exposing a new [StreamChannel].
+///
+/// This exposes two connected [StreamChannel]s, [local] and [foreign]. The
+/// user's code should use [local] to emit and receive events. Then [foreign]
+/// can be returned for others to use. For example, here's a simplified version
+/// of the implementation of [IsolateChannel.new]:
+///
+/// ```dart
+/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
+/// var controller = new StreamChannelController(allowForeignErrors: false);
+///
+/// // Pipe all events from the receive port into the local sink...
+/// receivePort.pipe(controller.local.sink);
+///
+/// // ...and all events from the local stream into the send port.
+/// controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
+///
+/// // Then return the foreign controller for your users to use.
+/// return controller.foreign;
+/// }
+/// ```
+class StreamChannelController<T> {
+ /// The local channel.
+ ///
+ /// This channel should be used directly by the creator of this
+ /// [StreamChannelController] to send and receive events.
+ StreamChannel<T> get local => _local;
+ late final StreamChannel<T> _local;
+
+ /// The foreign channel.
+ ///
+ /// This channel should be returned to external users so they can communicate
+ /// with [local].
+ StreamChannel<T> get foreign => _foreign;
+ late final StreamChannel<T> _foreign;
+
+ /// Creates a [StreamChannelController].
+ ///
+ /// If [sync] is true, events added to either channel's sink are synchronously
+ /// dispatched to the other channel's stream. This should only be done if the
+ /// source of those events is already asynchronous.
+ ///
+ /// If [allowForeignErrors] is `false`, errors are not allowed to be passed to
+ /// the foreign channel's sink. If any are, the connection will close and the
+ /// error will be forwarded to the foreign channel's [StreamSink.done] future.
+ /// This guarantees that the local stream will never emit errors.
+ StreamChannelController({bool allowForeignErrors = true, bool sync = false}) {
+ var localToForeignController = StreamController<T>(sync: sync);
+ var foreignToLocalController = StreamController<T>(sync: sync);
+ _local = StreamChannel<T>.withGuarantees(
+ foreignToLocalController.stream, localToForeignController.sink);
+ _foreign = StreamChannel<T>.withGuarantees(
+ localToForeignController.stream, foreignToLocalController.sink,
+ allowSinkErrors: allowForeignErrors);
+ }
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
new file mode 100644
index 0000000..cf62c76
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannelTransformer] transforms the events being passed to and
+/// emitted by a [StreamChannel].
+///
+/// This works on the same principle as [StreamTransformer] and
+/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes
+/// in the original [StreamChannel] and returns the transformed version.
+///
+/// Transformers must be able to have [bind] called multiple times. If a
+/// subclass implements [bind] explicitly, it should be sure that the returned
+/// stream follows the second stream channel guarantee: closing the sink causes
+/// the stream to close before it emits any more events. This guarantee is
+/// invalidated when an asynchronous gap is added between the original stream's
+/// event dispatch and the returned stream's, for example by transforming it
+/// with a [StreamTransformer]. The guarantee can be easily preserved using
+/// [StreamChannel.withCloseGuarantee].
+class StreamChannelTransformer<S, T> {
+ /// The transformer to use on the channel's stream.
+ final StreamTransformer<T, S> _streamTransformer;
+
+ /// The transformer to use on the channel's sink.
+ final StreamSinkTransformer<S, T> _sinkTransformer;
+
+ /// Creates a [StreamChannelTransformer] from existing stream and sink
+ /// transformers.
+ const StreamChannelTransformer(
+ this._streamTransformer, this._sinkTransformer);
+
+ /// Creates a [StreamChannelTransformer] from a codec's encoder and decoder.
+ ///
+ /// All input to the inner channel's sink is encoded using [Codec.encoder],
+ /// and all output from its stream is decoded using [Codec.decoder].
+ StreamChannelTransformer.fromCodec(Codec<S, T> codec)
+ : this(codec.decoder,
+ StreamSinkTransformer.fromStreamTransformer(codec.encoder));
+
+ /// Transforms the events sent to and emitted by [channel].
+ ///
+ /// Creates a new channel. When events are passed to the returned channel's
+ /// sink, the transformer will transform them and pass the transformed
+ /// versions to `channel.sink`. When events are emitted from the
+ /// `channel.straem`, the transformer will transform them and pass the
+ /// transformed versions to the returned channel's stream.
+ StreamChannel<S> bind(StreamChannel<T> channel) =>
+ StreamChannel<S>.withCloseGuarantee(
+ channel.stream.transform(_streamTransformer),
+ _sinkTransformer.bind(channel.sink));
+}
diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart
new file mode 100644
index 0000000..85f9a97
--- /dev/null
+++ b/pkgs/stream_channel/lib/stream_channel.dart
@@ -0,0 +1,181 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import 'src/close_guarantee_channel.dart';
+import 'src/guarantee_channel.dart';
+import 'src/stream_channel_transformer.dart';
+
+export 'src/delegating_stream_channel.dart';
+export 'src/disconnector.dart';
+export 'src/json_document_transformer.dart';
+export 'src/multi_channel.dart';
+export 'src/stream_channel_completer.dart';
+export 'src/stream_channel_controller.dart';
+export 'src/stream_channel_transformer.dart';
+
+/// An abstract class representing a two-way communication channel.
+///
+/// Users should consider the [stream] emitting a "done" event to be the
+/// canonical indicator that the channel has closed. If they wish to close the
+/// channel, they should close the [sink]—canceling the stream subscription is
+/// not sufficient. Protocol errors may be emitted through the stream or through
+/// [sink].done, depending on their underlying cause. Note that the sink may
+/// silently drop events if the channel closes before [sink].close is called.
+///
+/// Implementations are strongly encouraged to mix in or extend
+/// [StreamChannelMixin] to get default implementations of the various instance
+/// methods. Adding new methods to this interface will not be considered a
+/// breaking change if implementations are also added to [StreamChannelMixin].
+///
+/// Implementations must provide the following guarantees:
+///
+/// * The stream is single-subscription, and must follow all the guarantees of
+/// single-subscription streams.
+///
+/// * Closing the sink causes the stream to close before it emits any more
+/// events.
+///
+/// * After the stream closes, the sink is automatically closed. If this
+/// happens, sink methods should silently drop their arguments until
+/// [sink].close is called.
+///
+/// * If the stream closes before it has a listener, the sink should silently
+/// drop events if possible.
+///
+/// * Canceling the stream's subscription has no effect on the sink. The channel
+/// must still be able to respond to the other endpoint closing the channel
+/// even after the subscription has been canceled.
+///
+/// * The sink *either* forwards errors to the other endpoint *or* closes as
+/// soon as an error is added and forwards that error to the [sink].done
+/// future.
+///
+/// These guarantees allow users to interact uniformly with all implementations,
+/// and ensure that either endpoint closing the stream produces consistent
+/// behavior.
+abstract class StreamChannel<T> {
+ /// The single-subscription stream that emits values from the other endpoint.
+ Stream<T> get stream;
+
+ /// The sink for sending values to the other endpoint.
+ StreamSink<T> get sink;
+
+ /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+ ///
+ /// Note that this stream/sink pair must provide the guarantees listed in the
+ /// [StreamChannel] documentation. If they don't do so natively,
+ /// [StreamChannel.withGuarantees] should be used instead.
+ factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
+ _StreamChannel<T>(stream, sink);
+
+ /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+ ///
+ /// Unlike [StreamChannel.new], this enforces the guarantees listed in the
+ /// [StreamChannel] documentation. This makes it somewhat less efficient than
+ /// just wrapping a stream and a sink directly, so [StreamChannel.new] should
+ /// be used when the guarantees are provided natively.
+ ///
+ /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
+ /// [sink]. If any are, the connection will close and the error will be
+ /// forwarded to [sink].done.
+ factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
+ {bool allowSinkErrors = true}) =>
+ GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
+
+ /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+ ///
+ /// This specifically enforces the second guarantee: closing the sink causes
+ /// the stream to close before it emits any more events. This guarantee is
+ /// invalidated when an asynchronous gap is added between the original
+ /// stream's event dispatch and the returned stream's, for example by
+ /// transforming it with a [StreamTransformer]. This is a lighter-weight way
+ /// of preserving that guarantee in particular than
+ /// [StreamChannel.withGuarantees].
+ factory StreamChannel.withCloseGuarantee(
+ Stream<T> stream, StreamSink<T> sink) =>
+ CloseGuaranteeChannel(stream, sink);
+
+ /// Connects this to [other], so that any values emitted by either are sent
+ /// directly to the other.
+ void pipe(StreamChannel<T> other);
+
+ /// Transforms this using [transformer].
+ ///
+ /// This is identical to calling `transformer.bind(channel)`.
+ StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
+
+ /// Transforms only the [stream] component of this using [transformer].
+ StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
+
+ /// Transforms only the [sink] component of this using [transformer].
+ StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
+
+ /// Returns a copy of this with [stream] replaced by [change]'s return
+ /// value.
+ StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
+
+ /// Returns a copy of this with [sink] replaced by [change]'s return
+ /// value.
+ StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
+
+ /// Returns a copy of this with the generic type coerced to [S].
+ ///
+ /// If any events emitted by [stream] aren't of type [S], they're converted
+ /// into [TypeError] events (`CastError` on some SDK versions). Similarly, if
+ /// any events are added to [sink] that aren't of type [S], a [TypeError] is
+ /// thrown.
+ StreamChannel<S> cast<S>();
+}
+
+/// An implementation of [StreamChannel] that simply takes a stream and a sink
+/// as parameters.
+///
+/// This is distinct from [StreamChannel] so that it can use
+/// [StreamChannelMixin].
+class _StreamChannel<T> extends StreamChannelMixin<T> {
+ @override
+ final Stream<T> stream;
+ @override
+ final StreamSink<T> sink;
+
+ _StreamChannel(this.stream, this.sink);
+}
+
+/// A mixin that implements the instance methods of [StreamChannel] in terms of
+/// [stream] and [sink].
+abstract class StreamChannelMixin<T> implements StreamChannel<T> {
+ @override
+ void pipe(StreamChannel<T> other) {
+ stream.pipe(other.sink);
+ other.stream.pipe(sink);
+ }
+
+ @override
+ StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) =>
+ transformer.bind(this);
+
+ @override
+ StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
+ changeStream(transformer.bind);
+
+ @override
+ StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
+ changeSink(transformer.bind);
+
+ @override
+ StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change) =>
+ StreamChannel.withCloseGuarantee(change(stream), sink);
+
+ @override
+ StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change) =>
+ StreamChannel.withCloseGuarantee(stream, change(sink));
+
+ @override
+ StreamChannel<S> cast<S>() => StreamChannel(
+ stream.cast(), StreamController(sync: true)..stream.cast<T>().pipe(sink));
+}
diff --git a/pkgs/stream_channel/pubspec.yaml b/pkgs/stream_channel/pubspec.yaml
new file mode 100644
index 0000000..eec8c1b
--- /dev/null
+++ b/pkgs/stream_channel/pubspec.yaml
@@ -0,0 +1,16 @@
+name: stream_channel
+version: 2.1.3
+description: >-
+ An abstraction for two-way communication channels based on the Dart Stream
+ class.
+repository: https://github.com/dart-lang/tools/tree/main/pkgs/stream_channel
+
+environment:
+ sdk: ^3.3.0
+
+dependencies:
+ async: ^2.5.0
+
+dev_dependencies:
+ dart_flutter_team_lints: ^3.0.0
+ test: ^1.16.6
diff --git a/pkgs/stream_channel/test/disconnector_test.dart b/pkgs/stream_channel/test/disconnector_test.dart
new file mode 100644
index 0000000..bbba568
--- /dev/null
+++ b/pkgs/stream_channel/test/disconnector_test.dart
@@ -0,0 +1,152 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamController streamController;
+ late StreamController sinkController;
+ late Disconnector disconnector;
+ late StreamChannel channel;
+ setUp(() {
+ streamController = StreamController<void>();
+ sinkController = StreamController<void>();
+ disconnector = Disconnector();
+ channel = StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink)
+ .transform(disconnector);
+ });
+
+ group('before disconnection', () {
+ test('forwards events from the sink as normal', () {
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ channel.sink.close();
+
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test('forwards events to the stream as normal', () {
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ streamController.close();
+
+ expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("events can't be added when the sink is explicitly closed", () {
+ sinkController.stream.listen(null); // Work around sdk#19095.
+
+ expect(channel.sink.close(), completes);
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ });
+
+ test("events can't be added while a stream is being added", () {
+ var controller = StreamController<void>();
+ channel.sink.addStream(controller.stream);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ expect(() => channel.sink.close(), throwsStateError);
+
+ controller.close();
+ });
+ });
+
+ test('cancels addStream when disconnected', () async {
+ var canceled = false;
+ var controller = StreamController<void>(onCancel: () {
+ canceled = true;
+ });
+ expect(channel.sink.addStream(controller.stream), completes);
+ unawaited(disconnector.disconnect());
+
+ await pumpEventQueue();
+ expect(canceled, isTrue);
+ });
+
+ test('disconnect() returns the close future from the inner sink', () async {
+ var streamController = StreamController<void>();
+ var sinkController = StreamController<void>();
+ var disconnector = Disconnector<void>();
+ var sink = _CloseCompleterSink(sinkController.sink);
+ StreamChannel.withGuarantees(streamController.stream, sink)
+ .transform(disconnector);
+
+ var disconnectFutureFired = false;
+ expect(
+ disconnector.disconnect().then((_) {
+ disconnectFutureFired = true;
+ }),
+ completes);
+
+ // Give the future time to fire early if it's going to.
+ await pumpEventQueue();
+ expect(disconnectFutureFired, isFalse);
+
+ // When the inner sink's close future completes, so should the
+ // disconnector's.
+ sink.completer.complete();
+ await pumpEventQueue();
+ expect(disconnectFutureFired, isTrue);
+ });
+
+ group('after disconnection', () {
+ setUp(() {
+ disconnector.disconnect();
+ });
+
+ test('closes the inner sink and ignores events to the outer sink', () {
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ channel.sink.close();
+
+ expect(sinkController.stream.toList(), completion(isEmpty));
+ });
+
+ test('closes the stream', () {
+ expect(channel.stream.toList(), completion(isEmpty));
+ });
+
+ test('completes done', () {
+ sinkController.stream.listen(null); // Work around sdk#19095.
+ expect(channel.sink.done, completes);
+ });
+
+ test('still emits state errors after explicit close', () {
+ sinkController.stream.listen(null); // Work around sdk#19095.
+ expect(channel.sink.close(), completes);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ });
+ });
+}
+
+/// A [StreamSink] wrapper that adds the ability to manually complete the Future
+/// returned by [close] using [completer].
+class _CloseCompleterSink extends DelegatingStreamSink {
+ /// The completer for the future returned by [close].
+ final completer = Completer<void>();
+
+ _CloseCompleterSink(super.inner);
+
+ @override
+ Future<void> close() {
+ super.close();
+ return completer.future;
+ }
+}
diff --git a/pkgs/stream_channel/test/isolate_channel_test.dart b/pkgs/stream_channel/test/isolate_channel_test.dart
new file mode 100644
index 0000000..3a8b42e
--- /dev/null
+++ b/pkgs/stream_channel/test/isolate_channel_test.dart
@@ -0,0 +1,174 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@TestOn('vm')
+library;
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:stream_channel/isolate_channel.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late ReceivePort receivePort;
+ late SendPort sendPort;
+ late StreamChannel channel;
+ setUp(() {
+ receivePort = ReceivePort();
+ var receivePortForSend = ReceivePort();
+ sendPort = receivePortForSend.sendPort;
+ channel = IsolateChannel(receivePortForSend, receivePort.sendPort);
+ });
+
+ tearDown(() {
+ receivePort.close();
+ channel.sink.close();
+ });
+
+ test('the channel can send messages', () {
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+
+ expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test('the channel can receive messages', () {
+ sendPort.send(1);
+ sendPort.send(2);
+ sendPort.send(3);
+
+ expect(channel.stream.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("events can't be added to an explicitly-closed sink", () {
+ expect(channel.sink.close(), completes);
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ });
+
+ test("events can't be added while a stream is being added", () {
+ var controller = StreamController<void>();
+ channel.sink.addStream(controller.stream);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ expect(() => channel.sink.close(), throwsStateError);
+
+ controller.close();
+ });
+
+ group('stream channel rules', () {
+ test(
+ 'closing the sink causes the stream to close before it emits any more '
+ 'events', () {
+ sendPort.send(1);
+ sendPort.send(2);
+ sendPort.send(3);
+ sendPort.send(4);
+ sendPort.send(5);
+
+ channel.stream.listen(expectAsync1((message) {
+ expect(message, equals(1));
+ channel.sink.close();
+ }, count: 1));
+ });
+
+ test("cancelling the stream's subscription has no effect on the sink",
+ () async {
+ unawaited(channel.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test('the sink closes as soon as an error is added', () async {
+ channel.sink.addError('oh no');
+ channel.sink.add(1);
+ expect(channel.sink.done, throwsA('oh no'));
+
+ // Since the sink is closed, the stream should also be closed.
+ expect(channel.stream.isEmpty, completion(isTrue));
+
+ // The other end shouldn't receive the next event, since the sink was
+ // closed. Pump the event queue to give it a chance to.
+ receivePort.listen(expectAsync1((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test('the sink closes as soon as an error is added via addStream',
+ () async {
+ var canceled = false;
+ var controller = StreamController<void>(onCancel: () {
+ canceled = true;
+ });
+
+ // This future shouldn't get the error, because it's sent to [Sink.done].
+ expect(channel.sink.addStream(controller.stream), completes);
+
+ controller.addError('oh no');
+ expect(channel.sink.done, throwsA('oh no'));
+ await pumpEventQueue();
+ expect(canceled, isTrue);
+
+ // Even though the sink is closed, this shouldn't throw an error because
+ // the user didn't explicitly close it.
+ channel.sink.add(1);
+ });
+ });
+
+ group('connect constructors', () {
+ late ReceivePort connectPort;
+ setUp(() {
+ connectPort = ReceivePort();
+ });
+
+ tearDown(() {
+ connectPort.close();
+ });
+
+ test('create a connected pair of channels', () async {
+ var channel1 = IsolateChannel<int>.connectReceive(connectPort);
+ var channel2 = IsolateChannel<int>.connectSend(connectPort.sendPort);
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ expect(await channel2.stream.take(3).toList(), equals([1, 2, 3]));
+
+ channel2.sink.add(4);
+ channel2.sink.add(5);
+ channel2.sink.add(6);
+ expect(await channel1.stream.take(3).toList(), equals([4, 5, 6]));
+
+ await channel2.sink.close();
+ });
+
+ test('the receiving channel produces an error if it gets the wrong message',
+ () {
+ var connectedChannel = IsolateChannel<int>.connectReceive(connectPort);
+ connectPort.sendPort.send('wrong value');
+
+ expect(connectedChannel.stream.toList(), throwsStateError);
+ expect(connectedChannel.sink.done, completes);
+ });
+
+ test('the receiving channel closes gracefully without a connection',
+ () async {
+ var connectedChannel = IsolateChannel<int>.connectReceive(connectPort);
+ await connectedChannel.sink.close();
+ await expectLater(connectedChannel.stream.toList(), completion(isEmpty));
+ await expectLater(connectedChannel.sink.done, completes);
+ });
+ });
+}
diff --git a/pkgs/stream_channel/test/json_document_transformer_test.dart b/pkgs/stream_channel/test/json_document_transformer_test.dart
new file mode 100644
index 0000000..290c4e2
--- /dev/null
+++ b/pkgs/stream_channel/test/json_document_transformer_test.dart
@@ -0,0 +1,46 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamController<String> streamController;
+ late StreamController<String> sinkController;
+ late StreamChannel<String> channel;
+ setUp(() {
+ streamController = StreamController<String>();
+ sinkController = StreamController<String>();
+ channel =
+ StreamChannel<String>(streamController.stream, sinkController.sink);
+ });
+
+ test('decodes JSON emitted by the channel', () {
+ var transformed = channel.transform(jsonDocument);
+ streamController.add('{"foo": "bar"}');
+ expect(transformed.stream.first, completion(equals({'foo': 'bar'})));
+ });
+
+ test('encodes objects added to the channel', () {
+ var transformed = channel.transform(jsonDocument);
+ transformed.sink.add({'foo': 'bar'});
+ expect(sinkController.stream.first,
+ completion(equals(jsonEncode({'foo': 'bar'}))));
+ });
+
+ test('emits a stream error when incoming JSON is malformed', () {
+ var transformed = channel.transform(jsonDocument);
+ streamController.add('{invalid');
+ expect(transformed.stream.first, throwsFormatException);
+ });
+
+ test('synchronously throws if an unencodable object is added', () {
+ var transformed = channel.transform(jsonDocument);
+ expect(() => transformed.sink.add(Object()),
+ throwsA(const TypeMatcher<JsonUnsupportedObjectError>()));
+ });
+}
diff --git a/pkgs/stream_channel/test/multi_channel_test.dart b/pkgs/stream_channel/test/multi_channel_test.dart
new file mode 100644
index 0000000..ee6f8d2
--- /dev/null
+++ b/pkgs/stream_channel/test/multi_channel_test.dart
@@ -0,0 +1,478 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamChannelController controller;
+ late MultiChannel channel1;
+ late MultiChannel channel2;
+ setUp(() {
+ controller = StreamChannelController();
+ channel1 = MultiChannel<int>(controller.local);
+ channel2 = MultiChannel<int>(controller.foreign);
+ });
+
+ group('the default virtual channel', () {
+ test('begins connected', () {
+ var first = true;
+ channel2.stream.listen(expectAsync1((message) {
+ if (first) {
+ expect(message, equals(1));
+ first = false;
+ } else {
+ expect(message, equals(2));
+ }
+ }, count: 2));
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ });
+
+ test('closes the remote virtual channel when it closes', () {
+ expect(channel2.stream.toList(), completion(isEmpty));
+ expect(channel2.sink.done, completes);
+
+ channel1.sink.close();
+ });
+
+ test('closes the local virtual channel when it closes', () {
+ expect(channel1.stream.toList(), completion(isEmpty));
+ expect(channel1.sink.done, completes);
+
+ channel1.sink.close();
+ });
+
+ test(
+ "doesn't closes the local virtual channel when the stream "
+ 'subscription is canceled', () {
+ channel1.sink.done.then(expectAsync1((_) {}, count: 0));
+
+ channel1.stream.listen((_) {}).cancel();
+
+ // Ensure that there's enough time for the channel to close if it's going
+ // to.
+ return pumpEventQueue();
+ });
+
+ test(
+ 'closes the underlying channel when it closes without any other '
+ 'virtual channels', () {
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
+
+ channel1.sink.close();
+ });
+
+ test(
+ "doesn't close the underlying channel when it closes with other "
+ 'virtual channels', () {
+ controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+
+ // Establish another virtual connection which should keep the underlying
+ // connection open.
+ channel2.virtualChannel(channel1.virtualChannel().id);
+ channel1.sink.close();
+
+ // Ensure that there's enough time for the underlying channel to complete
+ // if it's going to.
+ return pumpEventQueue();
+ });
+ });
+
+ group('a locally-created virtual channel', () {
+ late VirtualChannel virtual1;
+ late VirtualChannel virtual2;
+ setUp(() {
+ virtual1 = channel1.virtualChannel();
+ virtual2 = channel2.virtualChannel(virtual1.id);
+ });
+
+ test('sends messages only to the other virtual channel', () {
+ var first = true;
+ virtual2.stream.listen(expectAsync1((message) {
+ if (first) {
+ expect(message, equals(1));
+ first = false;
+ } else {
+ expect(message, equals(2));
+ }
+ }, count: 2));
+
+ // No other virtual channels should receive the message.
+ for (var i = 0; i < 10; i++) {
+ var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+ virtual.stream.listen(expectAsync1((_) {}, count: 0));
+ }
+ channel2.stream.listen(expectAsync1((_) {}, count: 0));
+
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ });
+
+ test('closes the remote virtual channel when it closes', () {
+ expect(virtual2.stream.toList(), completion(isEmpty));
+ expect(virtual2.sink.done, completes);
+
+ virtual1.sink.close();
+ });
+
+ test('closes the local virtual channel when it closes', () {
+ expect(virtual1.stream.toList(), completion(isEmpty));
+ expect(virtual1.sink.done, completes);
+
+ virtual1.sink.close();
+ });
+
+ test(
+ "doesn't closes the local virtual channel when the stream "
+ 'subscription is canceled', () {
+ virtual1.sink.done.then(expectAsync1((_) {}, count: 0));
+ virtual1.stream.listen((_) {}).cancel();
+
+ // Ensure that there's enough time for the channel to close if it's going
+ // to.
+ return pumpEventQueue();
+ });
+
+ test(
+ 'closes the underlying channel when it closes without any other '
+ 'virtual channels', () async {
+ // First close the default channel so we can test the new channel as the
+ // last living virtual channel.
+ unawaited(channel1.sink.close());
+
+ await channel2.stream.toList();
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
+
+ unawaited(virtual1.sink.close());
+ });
+
+ test(
+ "doesn't close the underlying channel when it closes with other "
+ 'virtual channels', () {
+ controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+
+ virtual1.sink.close();
+
+ // Ensure that there's enough time for the underlying channel to complete
+ // if it's going to.
+ return pumpEventQueue();
+ });
+
+ test("doesn't conflict with a remote virtual channel", () {
+ var virtual3 = channel2.virtualChannel();
+ var virtual4 = channel1.virtualChannel(virtual3.id);
+
+ // This is an implementation detail, but we assert it here to make sure
+ // we're properly testing two channels with the same id.
+ expect(virtual1.id, equals(virtual3.id));
+
+ virtual2.stream
+ .listen(expectAsync1((message) => expect(message, equals(1))));
+ virtual4.stream
+ .listen(expectAsync1((message) => expect(message, equals(2))));
+
+ virtual1.sink.add(1);
+ virtual3.sink.add(2);
+ });
+ });
+
+ group('a remotely-created virtual channel', () {
+ late VirtualChannel virtual1;
+ late VirtualChannel virtual2;
+ setUp(() {
+ virtual1 = channel1.virtualChannel();
+ virtual2 = channel2.virtualChannel(virtual1.id);
+ });
+
+ test('sends messages only to the other virtual channel', () {
+ var first = true;
+ virtual1.stream.listen(expectAsync1((message) {
+ if (first) {
+ expect(message, equals(1));
+ first = false;
+ } else {
+ expect(message, equals(2));
+ }
+ }, count: 2));
+
+ // No other virtual channels should receive the message.
+ for (var i = 0; i < 10; i++) {
+ var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+ virtual.stream.listen(expectAsync1((_) {}, count: 0));
+ }
+ channel1.stream.listen(expectAsync1((_) {}, count: 0));
+
+ virtual2.sink.add(1);
+ virtual2.sink.add(2);
+ });
+
+ test('closes the remote virtual channel when it closes', () {
+ expect(virtual1.stream.toList(), completion(isEmpty));
+ expect(virtual1.sink.done, completes);
+
+ virtual2.sink.close();
+ });
+
+ test('closes the local virtual channel when it closes', () {
+ expect(virtual2.stream.toList(), completion(isEmpty));
+ expect(virtual2.sink.done, completes);
+
+ virtual2.sink.close();
+ });
+
+ test(
+ "doesn't closes the local virtual channel when the stream "
+ 'subscription is canceled', () {
+ virtual2.sink.done.then(expectAsync1((_) {}, count: 0));
+ virtual2.stream.listen((_) {}).cancel();
+
+ // Ensure that there's enough time for the channel to close if it's going
+ // to.
+ return pumpEventQueue();
+ });
+
+ test(
+ 'closes the underlying channel when it closes without any other '
+ 'virtual channels', () async {
+ // First close the default channel so we can test the new channel as the
+ // last living virtual channel.
+ unawaited(channel2.sink.close());
+
+ await channel1.stream.toList();
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
+
+ unawaited(virtual2.sink.close());
+ });
+
+ test(
+ "doesn't close the underlying channel when it closes with other "
+ 'virtual channels', () {
+ controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+
+ virtual2.sink.close();
+
+ // Ensure that there's enough time for the underlying channel to complete
+ // if it's going to.
+ return pumpEventQueue();
+ });
+
+ test("doesn't allow another virtual channel with the same id", () {
+ expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
+ });
+
+ test('dispatches events received before the virtual channel is created',
+ () async {
+ virtual1 = channel1.virtualChannel();
+
+ virtual1.sink.add(1);
+ await pumpEventQueue();
+
+ virtual1.sink.add(2);
+ await pumpEventQueue();
+
+ expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
+ });
+
+ test(
+ 'dispatches close events received before the virtual channel is '
+ 'created', () async {
+ virtual1 = channel1.virtualChannel();
+
+ unawaited(virtual1.sink.close());
+ await pumpEventQueue();
+
+ expect(channel2.virtualChannel(virtual1.id).stream.toList(),
+ completion(isEmpty));
+ });
+ });
+
+ group('when the underlying stream', () {
+ late VirtualChannel virtual1;
+ late VirtualChannel virtual2;
+ setUp(() {
+ virtual1 = channel1.virtualChannel();
+ virtual2 = channel2.virtualChannel(virtual1.id);
+ });
+
+ test('closes, all virtual channels close', () {
+ expect(channel1.stream.toList(), completion(isEmpty));
+ expect(channel1.sink.done, completes);
+ expect(channel2.stream.toList(), completion(isEmpty));
+ expect(channel2.sink.done, completes);
+ expect(virtual1.stream.toList(), completion(isEmpty));
+ expect(virtual1.sink.done, completes);
+ expect(virtual2.stream.toList(), completion(isEmpty));
+ expect(virtual2.sink.done, completes);
+
+ controller.local.sink.close();
+ });
+
+ test('closes, more virtual channels are created closed', () async {
+ unawaited(channel2.sink.close());
+ unawaited(virtual2.sink.close());
+
+ // Wait for the existing channels to emit done events.
+ await channel1.stream.toList();
+ await virtual1.stream.toList();
+
+ var virtual = channel1.virtualChannel();
+ expect(virtual.stream.toList(), completion(isEmpty));
+ expect(virtual.sink.done, completes);
+
+ virtual = channel1.virtualChannel();
+ expect(virtual.stream.toList(), completion(isEmpty));
+ expect(virtual.sink.done, completes);
+ });
+
+ test('emits an error, the error is sent only to the default channel', () {
+ channel1.stream.listen(expectAsync1((_) {}, count: 0),
+ onError: expectAsync1((error) => expect(error, equals('oh no'))));
+ virtual1.stream.listen(expectAsync1((_) {}, count: 0),
+ onError: expectAsync1((_) {}, count: 0));
+
+ controller.foreign.sink.addError('oh no');
+ });
+ });
+
+ group('stream channel rules', () {
+ group('for the main stream:', () {
+ test(
+ 'closing the sink causes the stream to close before it emits any '
+ 'more events', () {
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+
+ channel2.stream.listen(expectAsync1((message) {
+ expect(message, equals(1));
+ channel2.sink.close();
+ }, count: 1));
+ });
+
+ test('after the stream closes, the sink ignores events', () async {
+ unawaited(channel1.sink.close());
+
+ // Wait for the done event to be delivered.
+ await channel2.stream.toList();
+ channel2.sink.add(1);
+ channel2.sink.add(2);
+ channel2.sink.add(3);
+ unawaited(channel2.sink.close());
+
+ // None of our channel.sink additions should make it to the other
+ // endpoint.
+ channel1.stream.listen(expectAsync1((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ unawaited(channel1.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ unawaited(channel1.sink.close());
+ expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ unawaited(channel1.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ unawaited(channel2.sink.close());
+ await pumpEventQueue();
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ unawaited(channel1.sink.close());
+
+ // The sink should be ignoring events because the channel closed.
+ channel2.stream.listen(expectAsync1((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+ });
+
+ group('for a virtual channel:', () {
+ late VirtualChannel virtual1;
+ late VirtualChannel virtual2;
+ setUp(() {
+ virtual1 = channel1.virtualChannel();
+ virtual2 = channel2.virtualChannel(virtual1.id);
+ });
+
+ test(
+ 'closing the sink causes the stream to close before it emits any '
+ 'more events', () {
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+
+ virtual2.stream.listen(expectAsync1((message) {
+ expect(message, equals(1));
+ virtual2.sink.close();
+ }, count: 1));
+ });
+
+ test('after the stream closes, the sink ignores events', () async {
+ unawaited(virtual1.sink.close());
+
+ // Wait for the done event to be delivered.
+ await virtual2.stream.toList();
+ virtual2.sink.add(1);
+ virtual2.sink.add(2);
+ virtual2.sink.add(3);
+ unawaited(virtual2.sink.close());
+
+ // None of our virtual.sink additions should make it to the other
+ // endpoint.
+ virtual1.stream.listen(expectAsync1((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ unawaited(virtual1.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+ unawaited(virtual1.sink.close());
+ expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ unawaited(virtual1.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ unawaited(virtual2.sink.close());
+ await pumpEventQueue();
+
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+ unawaited(virtual1.sink.close());
+
+ // The sink should be ignoring events because the stream closed.
+ virtual2.stream.listen(expectAsync1((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+ });
+ });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_completer_test.dart b/pkgs/stream_channel/test/stream_channel_completer_test.dart
new file mode 100644
index 0000000..c6fddc0
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_completer_test.dart
@@ -0,0 +1,120 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamChannelCompleter completer;
+ late StreamController streamController;
+ late StreamController sinkController;
+ late StreamChannel innerChannel;
+ setUp(() {
+ completer = StreamChannelCompleter();
+ streamController = StreamController<void>();
+ sinkController = StreamController<void>();
+ innerChannel = StreamChannel(streamController.stream, sinkController.sink);
+ });
+
+ group('when a channel is set before accessing', () {
+ test('forwards events through the stream', () {
+ completer.setChannel(innerChannel);
+ expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ streamController.close();
+ });
+
+ test('forwards events through the sink', () {
+ completer.setChannel(innerChannel);
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+
+ completer.channel.sink.add(1);
+ completer.channel.sink.add(2);
+ completer.channel.sink.add(3);
+ completer.channel.sink.close();
+ });
+
+ test('forwards an error through the stream', () {
+ completer.setError('oh no');
+ expect(completer.channel.stream.first, throwsA('oh no'));
+ });
+
+ test('drops sink events', () {
+ completer.setError('oh no');
+ expect(completer.channel.sink.done, completes);
+ completer.channel.sink.add(1);
+ completer.channel.sink.addError('oh no');
+ });
+ });
+
+ group('when a channel is set after accessing', () {
+ test('forwards events through the stream', () async {
+ expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+ await pumpEventQueue();
+
+ completer.setChannel(innerChannel);
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ unawaited(streamController.close());
+ });
+
+ test('forwards events through the sink', () async {
+ completer.channel.sink.add(1);
+ completer.channel.sink.add(2);
+ completer.channel.sink.add(3);
+ unawaited(completer.channel.sink.close());
+ await pumpEventQueue();
+
+ completer.setChannel(innerChannel);
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test('forwards an error through the stream', () async {
+ expect(completer.channel.stream.first, throwsA('oh no'));
+ await pumpEventQueue();
+
+ completer.setError('oh no');
+ });
+
+ test('drops sink events', () async {
+ expect(completer.channel.sink.done, completes);
+ completer.channel.sink.add(1);
+ completer.channel.sink.addError('oh no');
+ await pumpEventQueue();
+
+ completer.setError('oh no');
+ });
+ });
+
+ group('forFuture', () {
+ test('forwards a StreamChannel', () {
+ var channel =
+ StreamChannelCompleter.fromFuture(Future.value(innerChannel));
+ channel.sink.add(1);
+ channel.sink.close();
+ streamController.sink.add(2);
+ streamController.sink.close();
+
+ expect(sinkController.stream.toList(), completion(equals([1])));
+ expect(channel.stream.toList(), completion(equals([2])));
+ });
+
+ test('forwards an error', () {
+ var channel = StreamChannelCompleter.fromFuture(Future.error('oh no'));
+ expect(channel.stream.toList(), throwsA('oh no'));
+ });
+ });
+
+ test("doesn't allow the channel to be set multiple times", () {
+ completer.setChannel(innerChannel);
+ expect(() => completer.setChannel(innerChannel), throwsStateError);
+ expect(() => completer.setChannel(innerChannel), throwsStateError);
+ });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_controller_test.dart b/pkgs/stream_channel/test/stream_channel_controller_test.dart
new file mode 100644
index 0000000..3d661e3
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_controller_test.dart
@@ -0,0 +1,104 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('asynchronously', () {
+ late StreamChannelController controller;
+ setUp(() {
+ controller = StreamChannelController();
+ });
+
+ test('forwards events from the local sink to the foreign stream', () {
+ controller.local.sink
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..close();
+ expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test('forwards events from the foreign sink to the local stream', () {
+ controller.foreign.sink
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..close();
+ expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test(
+ 'with allowForeignErrors: false, shuts down the connection if an '
+ 'error is added to the foreign channel', () {
+ controller = StreamChannelController(allowForeignErrors: false);
+
+ controller.foreign.sink.addError('oh no');
+ expect(controller.foreign.sink.done, throwsA('oh no'));
+ expect(controller.foreign.stream.toList(), completion(isEmpty));
+ expect(controller.local.sink.done, completes);
+ expect(controller.local.stream.toList(), completion(isEmpty));
+ });
+ });
+
+ group('synchronously', () {
+ late StreamChannelController controller;
+ setUp(() {
+ controller = StreamChannelController(sync: true);
+ });
+
+ test(
+ 'synchronously forwards events from the local sink to the foreign '
+ 'stream', () {
+ var receivedEvent = false;
+ var receivedError = false;
+ var receivedDone = false;
+ controller.foreign.stream.listen(expectAsync1((event) {
+ expect(event, equals(1));
+ receivedEvent = true;
+ }), onError: expectAsync1((error) {
+ expect(error, equals('oh no'));
+ receivedError = true;
+ }), onDone: expectAsync0(() {
+ receivedDone = true;
+ }));
+
+ controller.local.sink.add(1);
+ expect(receivedEvent, isTrue);
+
+ controller.local.sink.addError('oh no');
+ expect(receivedError, isTrue);
+
+ controller.local.sink.close();
+ expect(receivedDone, isTrue);
+ });
+
+ test(
+ 'synchronously forwards events from the foreign sink to the local '
+ 'stream', () {
+ var receivedEvent = false;
+ var receivedError = false;
+ var receivedDone = false;
+ controller.local.stream.listen(expectAsync1((event) {
+ expect(event, equals(1));
+ receivedEvent = true;
+ }), onError: expectAsync1((error) {
+ expect(error, equals('oh no'));
+ receivedError = true;
+ }), onDone: expectAsync0(() {
+ receivedDone = true;
+ }));
+
+ controller.foreign.sink.add(1);
+ expect(receivedEvent, isTrue);
+
+ controller.foreign.sink.addError('oh no');
+ expect(receivedError, isTrue);
+
+ controller.foreign.sink.close();
+ expect(receivedDone, isTrue);
+ });
+ });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_test.dart b/pkgs/stream_channel/test/stream_channel_test.dart
new file mode 100644
index 0000000..c44b6ab
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_test.dart
@@ -0,0 +1,138 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test("pipe() pipes data from each channel's stream into the other's sink",
+ () {
+ var otherStreamController = StreamController<int>();
+ var otherSinkController = StreamController<int>();
+ var otherChannel =
+ StreamChannel(otherStreamController.stream, otherSinkController.sink);
+
+ var streamController = StreamController<int>();
+ var sinkController = StreamController<int>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ channel.pipe(otherChannel);
+
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ streamController.close();
+ expect(otherSinkController.stream.toList(), completion(equals([1, 2, 3])));
+
+ otherStreamController.add(4);
+ otherStreamController.add(5);
+ otherStreamController.add(6);
+ otherStreamController.close();
+ expect(sinkController.stream.toList(), completion(equals([4, 5, 6])));
+ });
+
+ test('transform() transforms the channel', () async {
+ var streamController = StreamController<List<int>>();
+ var sinkController = StreamController<List<int>>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ var transformed = channel
+ .cast<List<int>>()
+ .transform(StreamChannelTransformer.fromCodec(utf8));
+
+ streamController.add([102, 111, 111, 98, 97, 114]);
+ unawaited(streamController.close());
+ expect(await transformed.stream.toList(), equals(['foobar']));
+
+ transformed.sink.add('fblthp');
+ unawaited(transformed.sink.close());
+ expect(
+ sinkController.stream.toList(),
+ completion(equals([
+ [102, 98, 108, 116, 104, 112]
+ ])));
+ });
+
+ test('transformStream() transforms only the stream', () async {
+ var streamController = StreamController<String>();
+ var sinkController = StreamController<String>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ var transformed =
+ channel.cast<String>().transformStream(const LineSplitter());
+
+ streamController.add('hello world');
+ streamController.add(' what\nis');
+ streamController.add('\nup');
+ unawaited(streamController.close());
+ expect(await transformed.stream.toList(),
+ equals(['hello world what', 'is', 'up']));
+
+ transformed.sink.add('fbl\nthp');
+ unawaited(transformed.sink.close());
+ expect(sinkController.stream.toList(), completion(equals(['fbl\nthp'])));
+ });
+
+ test('transformSink() transforms only the sink', () async {
+ var streamController = StreamController<String>();
+ var sinkController = StreamController<String>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ var transformed = channel.cast<String>().transformSink(
+ const StreamSinkTransformer.fromStreamTransformer(LineSplitter()));
+
+ streamController.add('fbl\nthp');
+ unawaited(streamController.close());
+ expect(await transformed.stream.toList(), equals(['fbl\nthp']));
+
+ transformed.sink.add('hello world');
+ transformed.sink.add(' what\nis');
+ transformed.sink.add('\nup');
+ unawaited(transformed.sink.close());
+ expect(sinkController.stream.toList(),
+ completion(equals(['hello world what', 'is', 'up'])));
+ });
+
+ test('changeStream() changes the stream', () {
+ var streamController = StreamController<int>();
+ var sinkController = StreamController<int>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ var newController = StreamController<int>();
+ var changed = channel.changeStream((stream) {
+ expect(stream, equals(channel.stream));
+ return newController.stream;
+ });
+
+ newController.add(10);
+ newController.close();
+
+ streamController.add(20);
+ streamController.close();
+
+ expect(changed.stream.toList(), completion(equals([10])));
+ });
+
+ test('changeSink() changes the sink', () {
+ var streamController = StreamController<int>();
+ var sinkController = StreamController<int>();
+ var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+ var newController = StreamController<int>();
+ var changed = channel.changeSink((sink) {
+ expect(sink, equals(channel.sink));
+ return newController.sink;
+ });
+
+ expect(newController.stream.toList(), completion(equals([10])));
+ streamController.stream.listen(expectAsync1((_) {}, count: 0));
+
+ changed.sink.add(10);
+ changed.sink.close();
+ });
+}
diff --git a/pkgs/stream_channel/test/with_close_guarantee_test.dart b/pkgs/stream_channel/test/with_close_guarantee_test.dart
new file mode 100644
index 0000000..9c0b729
--- /dev/null
+++ b/pkgs/stream_channel/test/with_close_guarantee_test.dart
@@ -0,0 +1,69 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+final _delayTransformer = StreamTransformer.fromHandlers(
+ handleData: (data, sink) => Future.microtask(() => sink.add(data)),
+ handleDone: (sink) => Future.microtask(() => sink.close()));
+
+final _delaySinkTransformer =
+ StreamSinkTransformer.fromStreamTransformer(_delayTransformer);
+
+void main() {
+ late StreamChannelController controller;
+ late StreamChannel channel;
+ setUp(() {
+ controller = StreamChannelController();
+
+ // Add a bunch of layers of asynchronous dispatch between the channel and
+ // the underlying controllers.
+ var stream = controller.foreign.stream;
+ var sink = controller.foreign.sink;
+ for (var i = 0; i < 10; i++) {
+ stream = stream.transform(_delayTransformer);
+ sink = _delaySinkTransformer.bind(sink);
+ }
+
+ channel = StreamChannel.withCloseGuarantee(stream, sink);
+ });
+
+ test(
+ 'closing the event sink causes the stream to close before it emits any '
+ 'more events', () async {
+ controller.local.sink.add(1);
+ controller.local.sink.add(2);
+ controller.local.sink.add(3);
+
+ expect(
+ channel.stream
+ .listen(expectAsync1((event) {
+ if (event == 2) channel.sink.close();
+ }, count: 2))
+ .asFuture<void>(),
+ completes);
+
+ await pumpEventQueue();
+ });
+
+ test(
+ 'closing the event sink before events are emitted causes the stream to '
+ 'close immediately', () async {
+ unawaited(channel.sink.close());
+ channel.stream.listen(expectAsync1((_) {}, count: 0),
+ onError: expectAsync2((_, __) {}, count: 0),
+ onDone: expectAsync0(() {}));
+
+ controller.local.sink.add(1);
+ controller.local.sink.add(2);
+ controller.local.sink.add(3);
+ unawaited(controller.local.sink.close());
+
+ await pumpEventQueue();
+ });
+}
diff --git a/pkgs/stream_channel/test/with_guarantees_test.dart b/pkgs/stream_channel/test/with_guarantees_test.dart
new file mode 100644
index 0000000..f026079
--- /dev/null
+++ b/pkgs/stream_channel/test/with_guarantees_test.dart
@@ -0,0 +1,200 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamController streamController;
+ late StreamController sinkController;
+ late StreamChannel channel;
+ setUp(() {
+ streamController = StreamController<void>();
+ sinkController = StreamController<void>();
+ channel = StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink);
+ });
+
+ group('with a broadcast stream', () {
+ setUp(() {
+ streamController = StreamController.broadcast();
+ channel = StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink);
+ });
+
+ test('buffers events', () async {
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ await pumpEventQueue();
+
+ expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+ unawaited(streamController.close());
+ });
+
+ test('only allows a single subscription', () {
+ channel.stream.listen(null);
+ expect(() => channel.stream.listen(null), throwsStateError);
+ });
+ });
+
+ test(
+ 'closing the event sink causes the stream to close before it emits any '
+ 'more events', () {
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+
+ expect(
+ channel.stream
+ .listen(expectAsync1((event) {
+ if (event == 2) channel.sink.close();
+ }, count: 2))
+ .asFuture<void>(),
+ completes);
+ });
+
+ test('after the stream closes, the sink ignores events', () async {
+ unawaited(streamController.close());
+
+ // Wait for the done event to be delivered.
+ await channel.stream.toList();
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ unawaited(channel.sink.close());
+
+ // None of our channel.sink additions should make it to the other endpoint.
+ sinkController.stream.listen(expectAsync1((_) {}, count: 0),
+ onDone: expectAsync0(() {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ unawaited(channel.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ unawaited(channel.sink.close());
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ unawaited(channel.stream.listen(null).cancel());
+ await pumpEventQueue();
+
+ unawaited(streamController.close());
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ unawaited(channel.sink.close());
+
+ // The sink should be ignoring events because the stream closed.
+ sinkController.stream.listen(expectAsync1((_) {}, count: 0),
+ onDone: expectAsync0(() {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test('forwards errors to the other endpoint', () {
+ channel.sink.addError('error');
+ expect(sinkController.stream.first, throwsA('error'));
+ });
+
+ test('Sink.done completes once the stream is done', () {
+ channel.stream.listen(null);
+ expect(channel.sink.done, completes);
+ streamController.close();
+ });
+
+ test("events can't be added to an explicitly-closed sink", () {
+ sinkController.stream.listen(null); // Work around sdk#19095.
+
+ expect(channel.sink.close(), completes);
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ });
+
+ test("events can't be added while a stream is being added", () {
+ var controller = StreamController<void>();
+ channel.sink.addStream(controller.stream);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError('oh no'), throwsStateError);
+ expect(() => channel.sink.addStream(Stream.fromIterable([])),
+ throwsStateError);
+ expect(() => channel.sink.close(), throwsStateError);
+
+ controller.close();
+ });
+
+ group('with allowSinkErrors: false', () {
+ setUp(() {
+ streamController = StreamController<void>();
+ sinkController = StreamController<void>();
+ channel = StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink,
+ allowSinkErrors: false);
+ });
+
+ test('forwards errors to Sink.done but not the stream', () {
+ channel.sink.addError('oh no');
+ expect(channel.sink.done, throwsA('oh no'));
+ sinkController.stream
+ .listen(null, onError: expectAsync1((dynamic _) {}, count: 0));
+ });
+
+ test('adding an error causes the stream to emit a done event', () {
+ expect(channel.sink.done, throwsA('oh no'));
+
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+
+ expect(
+ channel.stream
+ .listen(expectAsync1((event) {
+ if (event == 2) channel.sink.addError('oh no');
+ }, count: 2))
+ .asFuture<void>(),
+ completes);
+ });
+
+ test('adding an error closes the inner sink', () {
+ channel.sink.addError('oh no');
+ expect(channel.sink.done, throwsA('oh no'));
+ expect(sinkController.stream.toList(), completion(isEmpty));
+ });
+
+ test(
+ 'adding an error via via addStream causes the stream to emit a done '
+ 'event', () async {
+ var canceled = false;
+ var controller = StreamController<void>(onCancel: () {
+ canceled = true;
+ });
+
+ // This future shouldn't get the error, because it's sent to [Sink.done].
+ expect(channel.sink.addStream(controller.stream), completes);
+
+ controller.addError('oh no');
+ expect(channel.sink.done, throwsA('oh no'));
+ await pumpEventQueue();
+ expect(canceled, isTrue);
+
+ // Even though the sink is closed, this shouldn't throw an error because
+ // the user didn't explicitly close it.
+ channel.sink.add(1);
+ });
+ });
+}