Merge pull request #1266 from dart-lang/merge-stream_channel-package

Merge `package:stream_channel`
diff --git a/.github/ISSUE_TEMPLATE/stream_channel.md b/.github/ISSUE_TEMPLATE/stream_channel.md
new file mode 100644
index 0000000..76b5994
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/stream_channel.md
@@ -0,0 +1,5 @@
+---
+name: "package:stream_channel"
+about: "Create a bug or file a feature request against package:stream_channel."
+labels: "package:stream_channel"
+---
\ No newline at end of file
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 25efb2a..64585f3 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -116,6 +116,10 @@
   - changed-files:
     - any-glob-to-any-file: 'pkgs/stack_trace/**'
 
+'package:stream_channel':
+  - changed-files:
+    - any-glob-to-any-file: 'pkgs/stream_channel/**'
+
 'package:stream_transform':
   - changed-files:
     - any-glob-to-any-file: 'pkgs/stream_transform/**'
diff --git a/.github/workflows/stream_channel.yaml b/.github/workflows/stream_channel.yaml
new file mode 100644
index 0000000..c39424d
--- /dev/null
+++ b/.github/workflows/stream_channel.yaml
@@ -0,0 +1,74 @@
+name: package:stream_channel
+
+on:
+  # Run on PRs and pushes to the default branch.
+  push:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/stream_channel.yaml'
+      - 'pkgs/stream_channel/**'
+  pull_request:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/stream_channel.yaml'
+      - 'pkgs/stream_channel/**'
+  schedule:
+    - cron: "0 0 * * 0"
+
+env:
+  PUB_ENVIRONMENT: bot.github
+
+defaults:
+  run:
+    working-directory: pkgs/stream_channel/
+
+jobs:
+  # Check code formatting and static analysis on a single OS (linux) against
+  # Dart dev; analysis still runs when the formatting check fails.
+  analyze:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        sdk: [dev]
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Check formatting
+        run: dart format --output=none --set-exit-if-changed .
+        if: always() && steps.install.outcome == 'success'
+      - name: Analyze code
+        run: dart analyze --fatal-infos
+        if: always() && steps.install.outcome == 'success'
+
+  # Run tests on a matrix consisting of two dimensions:
+  # 1. OS: ubuntu-latest, (macos-latest, windows-latest)
+  # 2. SDK: 3.3 (the minimum supported version), dev
+  test:
+    needs: analyze
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        # Add macos-latest and/or windows-latest if relevant for this package.
+        os: [ubuntu-latest]
+        sdk: ['3.3', dev]  # quoted: a bare 3.10 would be read as the float 3.1
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Run VM tests
+        run: dart test --platform vm
+        if: always() && steps.install.outcome == 'success'
+      - name: Run Chrome tests
+        run: dart test --platform chrome
+        if: always() && steps.install.outcome == 'success'
diff --git a/README.md b/README.md
index 563f90d..e58417b 100644
--- a/README.md
+++ b/README.md
@@ -42,6 +42,7 @@
 | [source_span](pkgs/source_span/) | Provides a standard representation for source code locations and spans. | [![package issues](https://img.shields.io/badge/package:source_span-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_span) | [![pub package](https://img.shields.io/pub/v/source_span.svg)](https://pub.dev/packages/source_span) |
 | [sse](pkgs/sse/) | Provides client and server functionality for setting up bi-directional communication through Server Sent Events (SSE) and corresponding POST requests. | [![package issues](https://img.shields.io/badge/package:sse-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asse) | [![pub package](https://img.shields.io/pub/v/sse.svg)](https://pub.dev/packages/sse) |
 | [stack_trace](pkgs/stack_trace/) | A package for manipulating stack traces and printing them readably. | [![package issues](https://img.shields.io/badge/package:stack_trace-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astack_trace) | [![pub package](https://img.shields.io/pub/v/stack_trace.svg)](https://pub.dev/packages/stack_trace) |
+| [stream_channel](pkgs/stream_channel/) | An abstraction for two-way communication channels based on the Dart Stream class. | [![package issues](https://img.shields.io/badge/package:stream_channel-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_channel) | [![pub package](https://img.shields.io/pub/v/stream_channel.svg)](https://pub.dev/packages/stream_channel) |
 | [stream_transform](pkgs/stream_transform/) | A collection of utilities to transform and manipulate streams. | [![package issues](https://img.shields.io/badge/package:stream_transform-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_transform) | [![pub package](https://img.shields.io/pub/v/stream_transform.svg)](https://pub.dev/packages/stream_transform) |
 | [term_glyph](pkgs/term_glyph/) | Useful Unicode glyphs and ASCII substitutes. | [![package issues](https://img.shields.io/badge/package:term_glyph-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aterm_glyph) | [![pub package](https://img.shields.io/pub/v/term_glyph.svg)](https://pub.dev/packages/term_glyph) |
 | [test_reflective_loader](pkgs/test_reflective_loader/) | Support for discovering tests and test suites using reflection. | [![package issues](https://img.shields.io/badge/package:test_reflective_loader-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Atest_reflective_loader) | [![pub package](https://img.shields.io/pub/v/test_reflective_loader.svg)](https://pub.dev/packages/test_reflective_loader) |
diff --git a/pkgs/stream_channel/.gitignore b/pkgs/stream_channel/.gitignore
new file mode 100644
index 0000000..1447012
--- /dev/null
+++ b/pkgs/stream_channel/.gitignore
@@ -0,0 +1,10 @@
+.buildlog
+.dart_tool/
+.DS_Store
+.idea
+.pub/
+.settings/
+build/
+packages
+.packages
+pubspec.lock
diff --git a/pkgs/stream_channel/AUTHORS b/pkgs/stream_channel/AUTHORS
new file mode 100644
index 0000000..e8063a8
--- /dev/null
+++ b/pkgs/stream_channel/AUTHORS
@@ -0,0 +1,6 @@
+# Below is a list of people and organizations that have contributed
+# to the project. Names should be added to the list like so:
+#
+#   Name/Organization <email address>
+
+Google Inc.
diff --git a/pkgs/stream_channel/CHANGELOG.md b/pkgs/stream_channel/CHANGELOG.md
new file mode 100644
index 0000000..30f7d32
--- /dev/null
+++ b/pkgs/stream_channel/CHANGELOG.md
@@ -0,0 +1,158 @@
+## 2.1.3
+
+* Require Dart 3.3
+* Move to `dart-lang/tools` monorepo.
+
+## 2.1.2
+
+* Require Dart 2.19
+* Add an example.
+* Fix a race condition in `IsolateChannel.connectReceive()` where the channel
+  could hang forever if its sink was closed before the connection was established.
+
+## 2.1.1
+
+* Require Dart 2.14
+* Populate the pubspec `repository` field.
+* Handle multichannel messages where the ID element is a `double` at runtime
+  instead of an `int`. When reading an array with `dart2wasm`, numbers within
+  the array are parsed as `double`.
+
+## 2.1.0
+
+* Stable release for null safety.
+
+## 2.0.0
+
+**Breaking changes**
+
+* `IsolateChannel` requires a separate import
+  `package:stream_channel/isolate_channel.dart`.
+  `package:stream_channel/stream_channel.dart` will now not trigger any platform
+  concerns due to importing `dart:isolate`.
+* Remove `JsonDocumentTransformer` class. The `jsonDocument` top level is still
+  available.
+* Remove `StreamChannelTransformer.typed`. Use `.cast` on the transformed
+  channel instead.
+* Change `Future<dynamic>` returns to `Future<void>`.
+
+## 1.7.0
+
+* Make `IsolateChannel` available through
+  `package:stream_channel/isolate_channel.dart`. This will be the required
+  import in the next release.
+* Require `2.0.0` or newer SDK.
+* Internal style changes.
+
+## 1.6.8
+
+* Set max SDK version to `<3.0.0`, and adjust other dependencies.
+
+## 1.6.7+1
+
+* Fix Dart 2 runtime types in `IsolateChannel`.
+
+## 1.6.7
+
+* Update SDK version to 2.0.0-dev.17.0.
+* Add a type argument to `MultiChannel`.
+
+## 1.6.6
+
+* Fix a Dart 2 issue with inner stream transformation in `GuaranteeChannel`.
+
+* Fix a Dart 2 issue with `StreamChannelTransformer.fromCodec()`.
+
+## 1.6.5
+
+* Fix an issue with `JsonDocumentTransformer.bind` where it created an internal
+  stream channel which didn't get a properly inferred type for its `sink`.
+
+## 1.6.4
+
+* Fix a race condition in `MultiChannel` where messages from a remote virtual
+  channel could get dropped if the corresponding local channel wasn't registered
+  quickly enough.
+
+## 1.6.3
+
+* Use `pumpEventQueue()` from test.
+
+## 1.6.2
+
+* Declare support for `async` 2.0.0.
+
+## 1.6.1
+
+* Fix the type of `StreamChannel.transform()`. This previously inverted the
+  generic parameters, so it only really worked with transformers where both
+  generic types were identical.
+
+## 1.6.0
+
+* `Disconnector.disconnect()` now returns a future that completes when all the
+  inner `StreamSink.close()` futures have completed.
+
+## 1.5.0
+
+* Add `new StreamChannel.withCloseGuarantee()` to provide the specific guarantee
+  that closing the sink causes the stream to close before it emits any more
+  events. This is the only guarantee that isn't automatically preserved when
+  transforming a channel.
+
+* `StreamChannelTransformer`s provided by the `stream_channel` package now
+  properly provide the guarantee that closing the sink causes the stream to
+  close before it emits any more events.
+
+## 1.4.0
+
+* Add `StreamChannel.cast()`, which soundly coerces the generic type of a
+  channel.
+
+* Add `StreamChannelTransformer.typed()`, which soundly coerces the generic type
+  of a transformer.
+
+## 1.3.2
+
+* Fix all strong-mode errors and warnings.
+
+## 1.3.1
+
+* Make `IsolateChannel` slightly more efficient.
+
+* Make `MultiChannel` follow the stream channel rules.
+
+## 1.3.0
+
+* Add `Disconnector`, a transformer that allows the caller to disconnect the
+  transformed channel.
+
+## 1.2.0
+
+* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
+  wrapping to ensure that it obeys the stream channel guarantees.
+
+* Add `StreamChannelController`, which can be used to create custom
+  `StreamChannel` objects.
+
+## 1.1.1
+
+* Fix the type annotation for `StreamChannel.transform()`'s parameter.
+
+## 1.1.0
+
+* Add `StreamChannel.transformStream()`, `StreamChannel.transformSink()`,
+  `StreamChannel.changeStream()`, and `StreamChannel.changeSink()` to support
+  changing only the stream or only the sink of a channel.
+
+* Be more explicit about `JsonDocumentTransformer`'s error-handling behavior.
+
+## 1.0.1
+
+* Fix `MultiChannel`'s constructor to take a `StreamChannel`. This is
+  technically a breaking change, but since 1.0.0 was only released an hour ago,
+  we're treating it as a bug fix.
+
+## 1.0.0
+
+* Initial version
diff --git a/pkgs/stream_channel/LICENSE b/pkgs/stream_channel/LICENSE
new file mode 100644
index 0000000..dbd2843
--- /dev/null
+++ b/pkgs/stream_channel/LICENSE
@@ -0,0 +1,27 @@
+Copyright 2015, the Dart project authors. 
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials provided
+      with the distribution.
+    * Neither the name of Google LLC nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/stream_channel/README.md b/pkgs/stream_channel/README.md
new file mode 100644
index 0000000..3677ccf
--- /dev/null
+++ b/pkgs/stream_channel/README.md
@@ -0,0 +1,20 @@
+[![Build Status](https://github.com/dart-lang/tools/actions/workflows/stream_channel.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/stream_channel.yaml)
+[![pub package](https://img.shields.io/pub/v/stream_channel.svg)](https://pub.dev/packages/stream_channel)
+[![package publisher](https://img.shields.io/pub/publisher/stream_channel.svg)](https://pub.dev/packages/stream_channel/publisher)
+
+This package exposes the `StreamChannel` interface, which represents a two-way
+communication channel. Each `StreamChannel` exposes a `Stream` for receiving
+data and a `StreamSink` for sending it. 
+
+`StreamChannel` helps abstract communication logic away from the underlying
+protocol. For example, the [`test`][test] package re-uses its test suite
+communication protocol for both WebSocket connections to browser suites and
+Isolate connections to VM tests.
+
+[test]: https://pub.dev/packages/test
+
+This package also contains utilities for dealing with `StreamChannel`s and with
+two-way communications in general. For documentation of these utilities, see
+[the API docs][api].
+
+[api]: https://pub.dev/documentation/stream_channel/latest/
diff --git a/pkgs/stream_channel/analysis_options.yaml b/pkgs/stream_channel/analysis_options.yaml
new file mode 100644
index 0000000..44cda4d
--- /dev/null
+++ b/pkgs/stream_channel/analysis_options.yaml
@@ -0,0 +1,5 @@
+include: package:dart_flutter_team_lints/analysis_options.yaml
+
+analyzer:
+  language:
+    strict-casts: true
diff --git a/pkgs/stream_channel/example/example.dart b/pkgs/stream_channel/example/example.dart
new file mode 100644
index 0000000..b41d8d9
--- /dev/null
+++ b/pkgs/stream_channel/example/example.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2023, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+import 'dart:io';
+import 'dart:isolate';
+
+import 'package:stream_channel/isolate_channel.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+Future<void> main() async {
+  // A StreamChannel<T> is, in simplest terms, a wrapper around a Stream<T> and
+  // a StreamSink<T>. For example, you can create a channel that wraps standard
+  // IO:
+  var stdioChannel = StreamChannel(stdin, stdout);
+  stdioChannel.sink.add('Hello!\n'.codeUnits);
+
+  // Just as a Stream<T> can be transformed with a StreamTransformer<T>, a
+  // StreamChannel<T> can be transformed with a StreamChannelTransformer<T>.
+  // For example, we can handle standard input as strings:
+  var stringChannel = stdioChannel
+      .transform(StreamChannelTransformer.fromCodec(utf8))
+      .transformStream(const LineSplitter());
+  stringChannel.sink.add('world!\n');
+
+  // You can implement StreamChannel<T> by extending StreamChannelMixin<T>, but
+  // it's much easier to use a StreamChannelController<T>. A controller has two
+  // StreamChannel<T> members: `local` and `foreign`. The creator of a
+  // controller should work with the `local` channel, while the recipient should
+  // work with the `foreign` channel, and usually will not have direct access to
+  // the underlying controller.
+  var ctrl = StreamChannelController<String>();
+  ctrl.local.stream.listen((event) {
+    // Do something useful here...
+  });
+
+  // You can also pipe events from one channel to another.
+  ctrl
+    ..foreign.pipe(stringChannel)
+    ..local.sink.add('Piped!\n');
+  await ctrl.local.sink.close();
+
+  // The StreamChannel<T> interface provides several guarantees, which can be
+  // found here:
+  // https://pub.dev/documentation/stream_channel/latest/stream_channel/StreamChannel-class.html
+  //
+  // By calling `StreamChannel<T>.withGuarantees()`, you can create a
+  // StreamChannel<T> that provides all guarantees.
+  var dummyCtrl0 = StreamChannelController<String>();
+  var guaranteedChannel = StreamChannel.withGuarantees(
+      dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
+
+  // To close a StreamChannel, use `sink.close()`.
+  await guaranteedChannel.sink.close();
+
+  // A MultiChannel<T> multiplexes multiple virtual channels across a single
+  // underlying transport layer. For example, an application listening over
+  // standard I/O can still support multiple clients if it has a mechanism to
+  // separate events from different clients.
+  //
+  // A MultiChannel<T> splits events into numbered channels, which are
+  // instances of VirtualChannel<T>.
+  var dummyCtrl1 = StreamChannelController<String>();
+  var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
+  var channel1 = multiChannel.virtualChannel();
+  await multiChannel.sink.close();
+
+  // The client/peer should also create its own MultiChannel<T>, connected to
+  // the underlying transport, and use the corresponding IDs to handle events
+  // in their respective channels. It is up to you how to communicate channel
+  // IDs across different endpoints.
+  var dummyCtrl2 = StreamChannelController<String>();
+  var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
+  var channel2 = multiChannel2.virtualChannel(channel1.id);
+  await channel2.sink.close();
+  await multiChannel2.sink.close();
+
+  // Multiple instances of a Dart application can communicate easily across
+  // `SendPort`/`ReceivePort` pairs by means of the `IsolateChannel<T>` class.
+  // Typically, one endpoint will create a `ReceivePort`, and call the
+  // `IsolateChannel.connectReceive` constructor. The other endpoint will be
+  // given the corresponding `SendPort`, and then call
+  // `IsolateChannel.connectSend`.
+  var recv = ReceivePort();
+  var recvChannel = IsolateChannel<void>.connectReceive(recv);
+  var sendChannel = IsolateChannel<void>.connectSend(recv.sendPort);
+
+  // You must manually close `IsolateChannel<T>` sinks, however.
+  await recvChannel.sink.close();
+  await sendChannel.sink.close();
+
+  // You can use the `Disconnector` transformer to cause a channel to act as
+  // though the remote end of its transport had disconnected.
+  var disconnector = Disconnector<String>();
+  var disconnectable = stringChannel.transform(disconnector);
+  disconnectable.sink.add('Still connected!');
+  await disconnector.disconnect();
+
+  // Additionally:
+  //   * The `DelegatingStreamChannel<T>` class can be extended to build a
+  //     basis for wrapping other `StreamChannel<T>` objects.
+  //   * The `jsonDocument` transformer converts events to/from JSON, using
+  //     the `json` codec from `dart:convert`.
+  //   * `package:json_rpc_2` directly builds on top of
+  //     `package:stream_channel`, so any compatible transport can be used to
+  //     create interactive client/server or peer-to-peer applications (e.g.
+  //     language servers, microservices, etc.).
+}
diff --git a/pkgs/stream_channel/lib/isolate_channel.dart b/pkgs/stream_channel/lib/isolate_channel.dart
new file mode 100644
index 0000000..5d9f6e1
--- /dev/null
+++ b/pkgs/stream_channel/lib/isolate_channel.dart
@@ -0,0 +1,5 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/isolate_channel.dart' show IsolateChannel;
diff --git a/pkgs/stream_channel/lib/src/close_guarantee_channel.dart b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
new file mode 100644
index 0000000..13432d1
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/close_guarantee_channel.dart
@@ -0,0 +1,91 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that specifically enforces the stream channel guarantee
+/// that closing the sink causes the stream to close before it emits any more
+/// events.
+///
+/// This is exposed via [StreamChannel.withCloseGuarantee].
+class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
+  @override
+  Stream<T> get stream => _stream;
+  late final _CloseGuaranteeStream<T> _stream;
+
+  @override
+  StreamSink<T> get sink => _sink;
+  late final _CloseGuaranteeSink<T> _sink;
+
+  /// The subscription to the inner stream, set when [stream] is listened to.
+  StreamSubscription<T>? _subscription;
+
+  /// Whether the sink has closed, causing the underlying channel to disconnect.
+  bool _disconnected = false;
+
+  CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
+    _sink = _CloseGuaranteeSink<T>(innerSink, this);
+    _stream = _CloseGuaranteeStream<T>(innerStream, this);
+  }
+}
+
+/// The stream for [CloseGuaranteeChannel].
+///
+/// This wraps the inner stream to save the subscription on the channel when
+/// [listen] is called.
+class _CloseGuaranteeStream<T> extends Stream<T> {
+  /// The inner stream this is delegating to.
+  final Stream<T> _inner;
+
+  /// The [CloseGuaranteeChannel] this belongs to.
+  final CloseGuaranteeChannel<T> _channel;
+
+  _CloseGuaranteeStream(this._inner, this._channel);
+
+  @override
+  StreamSubscription<T> listen(void Function(T)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+    // If the channel is already disconnected, drop the data and error handlers
+    // so that only a done event is dispatched.
+    if (_channel._disconnected) {
+      onData = null;
+      onError = null;
+    }
+
+    var subscription = _inner.listen(onData,
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+    if (!_channel._disconnected) {
+      _channel._subscription = subscription;
+    }
+    return subscription;
+  }
+}
+
+/// The sink for [CloseGuaranteeChannel].
+///
+/// This wraps the inner sink so that closing it silences the data and error
+/// handlers of the channel's stream subscription.
+class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> {
+  /// The [CloseGuaranteeChannel] this belongs to.
+  final CloseGuaranteeChannel<T> _channel;
+
+  _CloseGuaranteeSink(super.inner, this._channel);
+
+  @override
+  Future<void> close() {
+    var done = super.close();
+    _channel._disconnected = true;
+    var subscription = _channel._subscription;
+    if (subscription != null) {
+      // Don't dispatch anything but a done event.
+      subscription.onData(null);
+      subscription.onError(null);
+    }
+    return done;
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/delegating_stream_channel.dart b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart
new file mode 100644
index 0000000..4484a59
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/delegating_stream_channel.dart
@@ -0,0 +1,23 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A simple delegating wrapper around [StreamChannel].
+///
+/// Subclasses can override individual methods, or use this to expose only
+/// [StreamChannel] methods.
+class DelegatingStreamChannel<T> extends StreamChannelMixin<T> {
+  /// The inner channel to which [stream] and [sink] are forwarded.
+  final StreamChannel<T> _inner;
+
+  @override
+  Stream<T> get stream => _inner.stream;
+  @override
+  StreamSink<T> get sink => _inner.sink;
+
+  DelegatingStreamChannel(this._inner);
+}
diff --git a/pkgs/stream_channel/lib/src/disconnector.dart b/pkgs/stream_channel/lib/src/disconnector.dart
new file mode 100644
index 0000000..3414e9c
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/disconnector.dart
@@ -0,0 +1,153 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// Allows the caller to force a channel to disconnect.
+///
+/// When [disconnect] is called, the channel (or channels) transformed by this
+/// transformer will act as though the remote end had disconnected—the stream
+/// will emit a done event, and the sink will ignore future inputs. The inner
+/// sink will also be closed to notify the remote end of the disconnection.
+///
+/// If a channel is transformed after [disconnect] has been called, it will be
+/// disconnected immediately.
+class Disconnector<T> implements StreamChannelTransformer<T, T> {
+  /// Whether [disconnect] has been called.
+  bool get isDisconnected => _disconnectMemo.hasRun;
+
+  /// The sinks for transformed channels.
+  ///
+  /// Note that we assume that transformed channels provide the stream channel
+  /// guarantees. This allows us to only track sinks, because we know closing
+  /// the underlying sink will cause the stream to emit a done event.
+  final _sinks = <_DisconnectorSink<T>>[];
+
+  /// Disconnects all channels that have been transformed.
+  ///
+  /// Returns a future that completes when all inner sinks' [StreamSink.close]
+  /// futures have completed. Note that a [StreamController]'s sink won't close
+  /// until the corresponding stream has a listener.
+  Future<void> disconnect() => _disconnectMemo.runOnce(() {
+        var futures = _sinks.map((sink) => sink._disconnect()).toList();
+        _sinks.clear();
+        return Future.wait(futures, eagerError: true);
+      });
+  final _disconnectMemo = AsyncMemoizer<List<void>>();
+
+  @override
+  StreamChannel<T> bind(StreamChannel<T> channel) {
+    return channel.changeSink((innerSink) {
+      var sink = _DisconnectorSink<T>(innerSink);
+
+      if (isDisconnected) {
+        // Ignore errors here, because otherwise there would be no way for the
+        // user to handle them gracefully.
+        sink._disconnect().catchError((_) {});
+      } else {
+        _sinks.add(sink);
+      }
+
+      return sink;
+    });
+  }
+}
+
+/// A sink wrapper that can force a disconnection.
+class _DisconnectorSink<T> implements StreamSink<T> {
+  /// The inner sink.
+  final StreamSink<T> _inner;
+
+  @override
+  Future<void> get done => _inner.done;
+
+  /// Whether [Disconnector.disconnect] has been called.
+  var _isDisconnected = false;
+
+  /// Whether the user has called [close].
+  var _closed = false;
+
+  /// The subscription to the stream passed to [addStream], if a stream is
+  /// currently being added.
+  StreamSubscription<T>? _addStreamSubscription;
+
+  /// The completer for the future returned by [addStream], if a stream is
+  /// currently being added.
+  Completer? _addStreamCompleter;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool get _inAddStream => _addStreamSubscription != null;
+
+  _DisconnectorSink(this._inner);
+
+  @override
+  void add(T data) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_isDisconnected) return;
+
+    _inner.add(data);
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_isDisconnected) return;
+
+    _inner.addError(error, stackTrace);
+  }
+
+  @override
+  Future<void> addStream(Stream<T> stream) {
+    if (_closed) throw StateError('Cannot add stream after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add stream while adding stream.');
+    }
+    if (_isDisconnected) return Future.value();
+
+    _addStreamCompleter = Completer.sync();
+    _addStreamSubscription = stream.listen(_inner.add,
+        onError: _inner.addError, onDone: _addStreamCompleter!.complete);
+    return _addStreamCompleter!.future.then((_) {
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    });
+  }
+
+  @override
+  Future<void> close() {
+    if (_inAddStream) {
+      throw StateError('Cannot close sink while adding stream.');
+    }
+
+    _closed = true;
+    return _inner.close();
+  }
+
+  /// Disconnects this sink.
+  ///
+  /// This closes the underlying sink, stops forwarding events, and cancels any
+  /// in-progress [addStream]. Returns the inner [StreamSink.close] future.
+  Future<void> _disconnect() {
+    _isDisconnected = true;
+    var future = _inner.close();
+
+    if (_inAddStream) {
+      _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    }
+
+    return future;
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/guarantee_channel.dart b/pkgs/stream_channel/lib/src/guarantee_channel.dart
new file mode 100644
index 0000000..30ebe2e
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/guarantee_channel.dart
@@ -0,0 +1,207 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that enforces the stream channel guarantees.
+///
+/// This is exposed via [StreamChannel.withGuarantees].
+class GuaranteeChannel<T> extends StreamChannelMixin<T> {
+  @override
+  Stream<T> get stream => _streamController.stream;
+
+  @override
+  StreamSink<T> get sink => _sink;
+  late final _GuaranteeSink<T> _sink;
+
+  /// The controller for [stream].
+  ///
+  /// This intermediate controller allows us to continue listening for a done
+  /// event even after the user has canceled their subscription, and to send our
+  /// own done event when the sink is closed.
+  late final StreamController<T> _streamController;
+
+  /// The subscription to the inner stream, if one has been started.
+  StreamSubscription<T>? _subscription;
+
+  /// Whether the sink has closed, causing the underlying channel to disconnect.
+  bool _disconnected = false;
+
+  GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
+      {bool allowSinkErrors = true}) {
+    _sink = _GuaranteeSink<T>(innerSink, this, allowErrors: allowSinkErrors);
+
+    // Enforce the single-subscription guarantee by changing a broadcast stream
+    // to single-subscription.
+    if (innerStream.isBroadcast) {
+      innerStream =
+          innerStream.transform(SingleSubscriptionTransformer<T, T>());
+    }
+    // Subscribe to the inner stream lazily, only once [stream] is listened to.
+    _streamController = StreamController<T>(
+        onListen: () {
+          // If the sink has disconnected, we've already called
+          // [_streamController.close].
+          if (_disconnected) return;
+          // Relay inner events; an inner done event also disconnects the sink.
+          _subscription = innerStream.listen(_streamController.add,
+              onError: _streamController.addError, onDone: () {
+            _sink._onStreamDisconnected();
+            _streamController.close();
+          });
+        },
+        sync: true);
+  }
+
+  /// Called by [_GuaranteeSink] when it closes or rejects a forbidden error.
+  ///
+  /// The sink closing indicates that the connection is closed, so the stream
+  /// should stop emitting events.
+  void _onSinkDisconnected() {
+    _disconnected = true;
+    var subscription = _subscription;
+    if (subscription != null) subscription.cancel();
+    _streamController.close();
+  }
+}
+
+/// The sink for [GuaranteeChannel].
+///
+/// This wraps the inner sink to ignore events and cancel any in-progress
+/// [addStream] calls when the underlying channel closes.
+class _GuaranteeSink<T> implements StreamSink<T> {
+  /// The inner sink being wrapped.
+  final StreamSink<T> _inner;
+
+  /// The [GuaranteeChannel] this belongs to.
+  final GuaranteeChannel<T> _channel;
+
+  @override
+  Future<void> get done => _doneCompleter.future;
+  final _doneCompleter = Completer<void>();
+
+  /// Whether the connection is disconnected.
+  ///
+  /// This can happen because the stream has emitted a done event, or because
+  /// the user added an error when [_allowErrors] is `false`.
+  bool _disconnected = false;
+
+  /// Whether the user has called [close].
+  bool _closed = false;
+
+  /// The subscription to the stream passed to [addStream], if a stream is
+  /// currently being added.
+  StreamSubscription<T>? _addStreamSubscription;
+
+  /// The completer for the future returned by [addStream], if a stream is
+  /// currently being added.
+  Completer? _addStreamCompleter;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool get _inAddStream => _addStreamSubscription != null;
+
+  /// Whether errors are passed on to the underlying sink.
+  ///
+  /// If this is `false`, any error passed to the sink is piped to [done] and
+  /// the underlying sink is closed.
+  final bool _allowErrors;
+
+  _GuaranteeSink(this._inner, this._channel, {bool allowErrors = true})
+      : _allowErrors = allowErrors;
+
+  @override
+  void add(T data) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_disconnected) return;
+
+    _inner.add(data);
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_disconnected) return;
+
+    _addError(error, stackTrace);
+  }
+
+  /// Like [addError], but doesn't check to ensure that an error can be added.
+  ///
+  /// This is called from [addStream], so it shouldn't fail if a stream is being
+  /// added.
+  void _addError(Object error, [StackTrace? stackTrace]) {
+    if (_allowErrors) {
+      _inner.addError(error, stackTrace);
+      return;
+    }
+    // Errors are forbidden: report this one through [done] and disconnect.
+    _doneCompleter.completeError(error, stackTrace);
+
+    // Treat an error like both the stream and sink disconnecting.
+    _onStreamDisconnected();
+    _channel._onSinkDisconnected();
+
+    // Ignore errors from the inner sink. We're already surfacing one error, and
+    // if the user handles it they shouldn't get another top-level error.
+    _inner.close().catchError((_) {});
+  }
+
+  @override
+  Future<void> addStream(Stream<T> stream) {
+    if (_closed) throw StateError('Cannot add stream after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add stream while adding stream.');
+    }
+    if (_disconnected) return Future.value();
+    // Tracked in fields so [_onStreamDisconnected] can cancel this call.
+    _addStreamCompleter = Completer.sync();
+    _addStreamSubscription = stream.listen(_inner.add,
+        onError: _addError, onDone: _addStreamCompleter!.complete);
+    return _addStreamCompleter!.future.then((_) {
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    });
+  }
+
+  @override
+  Future<void> close() {
+    if (_inAddStream) {
+      throw StateError('Cannot close sink while adding stream.');
+    }
+    // Closing twice is allowed; later calls just return [done].
+    if (_closed) return done;
+    _closed = true;
+    // If already disconnected, [done] was completed at that point.
+    if (!_disconnected) {
+      _channel._onSinkDisconnected();
+      _doneCompleter.complete(_inner.close());
+    }
+
+    return done;
+  }
+
+  /// Called by [GuaranteeChannel] when the stream emits a done event.
+  ///
+  /// The stream being done indicates that the connection is closed, so the
+  /// sink should stop forwarding events.
+  void _onStreamDisconnected() {
+    _disconnected = true;
+    if (!_doneCompleter.isCompleted) _doneCompleter.complete();
+    // Cancel any in-progress [addStream] and complete its future.
+    if (!_inAddStream) return;
+    _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+    _addStreamCompleter = null;
+    _addStreamSubscription = null;
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart
new file mode 100644
index 0000000..15c68a4
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/isolate_channel.dart
@@ -0,0 +1,115 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
+/// presumably with another isolate.
+///
+/// The remote endpoint doesn't necessarily need to be running an
+/// [IsolateChannel]. This can be used with any two ports, although the
+/// [StreamChannel] semantics mean that this class will treat them as being
+/// paired (for example, closing the [sink] will cause the [stream] to stop
+/// emitting events).
+///
+/// The underlying isolate ports have no notion of closing connections. This
+/// means that [stream] won't close unless [sink] is closed, and that closing
+/// [sink] won't cause the remote endpoint to close. Users should take care to
+/// ensure that they always close the [sink] of every [IsolateChannel] they use
+/// to avoid leaving dangling [ReceivePort]s.
+class IsolateChannel<T> extends StreamChannelMixin<T> {
+  @override
+  final Stream<T> stream;
+  @override
+  final StreamSink<T> sink;
+
+  /// Connects to a remote channel that was created with
+  /// [IsolateChannel.connectSend].
+  ///
+  /// These constructors establish a connection using only a single
+  /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+  /// connect constructors.
+  ///
+  /// The connection protocol is guaranteed to remain compatible across versions
+  /// at least until the next major version release. If the protocol is
+  /// violated, the resulting channel will emit a single error on its stream and
+  /// then close.
+  factory IsolateChannel.connectReceive(ReceivePort receivePort) {
+    // We can't use a [StreamChannelCompleter] here because we need the return
+    // value to be an [IsolateChannel].
+    var isCompleted = false;
+    var streamCompleter = StreamCompleter<T>();
+    var sinkCompleter = StreamSinkCompleter<T>();
+    // If the sink is closed before any message arrives, release the port.
+    var channel = IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink
+        .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) {
+      if (!isCompleted) {
+        receivePort.close();
+        streamCompleter.setSourceStream(const Stream.empty());
+        sinkCompleter.setDestinationSink(NullStreamSink<T>());
+      }
+      sink.close();
+    })));
+
+    // The first message across the ReceivePort should be a SendPort pointing to
+    // the remote end. If it's not, we'll make the stream emit an error
+    // complaining.
+    late StreamSubscription<dynamic> subscription;
+    subscription = receivePort.listen((message) {
+      isCompleted = true;
+      if (message is SendPort) {
+        var controller =
+            StreamChannelController<T>(allowForeignErrors: false, sync: true);
+        SubscriptionStream(subscription).cast<T>().pipe(controller.local.sink);
+        controller.local.stream
+            .listen((data) => message.send(data), onDone: receivePort.close);
+
+        streamCompleter.setSourceStream(controller.foreign.stream);
+        sinkCompleter.setDestinationSink(controller.foreign.sink);
+        return;
+      }
+      // Any other first message violates the protocol: fail the stream.
+      streamCompleter.setError(
+          StateError('Unexpected Isolate response "$message".'),
+          StackTrace.current);
+      sinkCompleter.setDestinationSink(NullStreamSink<T>());
+      subscription.cancel();
+    });
+
+    return channel;
+  }
+
+  /// Connects to a remote channel that was created with
+  /// [IsolateChannel.connectReceive].
+  ///
+  /// These constructors establish a connection using only a single
+  /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+  /// connect constructors.
+  ///
+  /// The connection protocol is guaranteed to remain compatible across versions
+  /// at least until the next major version release.
+  factory IsolateChannel.connectSend(SendPort sendPort) {
+    var receivePort = ReceivePort();
+    sendPort.send(receivePort.sendPort);
+    return IsolateChannel(receivePort, sendPort);
+  }
+
+  /// Creates a stream channel that receives messages from [receivePort] and
+  /// sends them over [sendPort].
+  factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
+    var controller =
+        StreamChannelController<T>(allowForeignErrors: false, sync: true);
+    receivePort.cast<T>().pipe(controller.local.sink);
+    controller.local.stream
+        .listen((data) => sendPort.send(data), onDone: receivePort.close);
+    return IsolateChannel._(controller.foreign.stream, controller.foreign.sink);
+  }
+
+  IsolateChannel._(this.stream, this.sink);
+}
diff --git a/pkgs/stream_channel/lib/src/json_document_transformer.dart b/pkgs/stream_channel/lib/src/json_document_transformer.dart
new file mode 100644
index 0000000..3feda43
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/json_document_transformer.dart
@@ -0,0 +1,35 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:convert';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannelTransformer] that transforms JSON documents—strings that
+/// contain individual objects encoded as JSON—into decoded Dart objects.
+///
+/// Strings emitted by the inner channel's stream are decoded with [jsonDecode]
+/// and objects added to the sink are encoded with [jsonEncode].
+///
+/// If the inner channel emits invalid JSON, the stream emits a
+/// [FormatException]. If an unencodable object is added to the sink, it
+/// synchronously throws a [JsonUnsupportedObjectError].
+final StreamChannelTransformer<Object?, String> jsonDocument =
+    const _JsonDocument();
+
+class _JsonDocument implements StreamChannelTransformer<Object?, String> {
+  const _JsonDocument();
+
+  @override
+  StreamChannel<Object?> bind(StreamChannel<String> channel) {
+    var stream = channel.stream.map(jsonDecode);
+    var sink = StreamSinkTransformer<Object?, String>.fromHandlers(
+        handleData: (data, sink) {
+      sink.add(jsonEncode(data)); // `null` is valid JSON, hence `Object?`.
+    }).bind(channel.sink);
+    return StreamChannel.withCloseGuarantee(stream, sink);
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/multi_channel.dart b/pkgs/stream_channel/lib/src/multi_channel.dart
new file mode 100644
index 0000000..4894239
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/multi_channel.dart
@@ -0,0 +1,274 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A class that multiplexes multiple virtual channels across a single
+/// underlying transport layer.
+///
+/// This should be connected to another [MultiChannel] on the other end of the
+/// underlying channel. It starts with a single default virtual channel,
+/// accessible via [stream] and [sink]. Additional virtual channels can be
+/// created with [virtualChannel].
+///
+/// When a virtual channel is created by one endpoint, the other must connect to
+/// it before messages may be sent through it. The first endpoint passes its
+/// [VirtualChannel.id] to the second, which then creates a channel from that id
+/// also using [virtualChannel]. For example:
+///
+/// ```dart
+/// // First endpoint
+/// var virtual = multiChannel.virtualChannel();
+/// multiChannel.sink.add({
+///   "channel": virtual.id
+/// });
+///
+/// // Second endpoint
+/// multiChannel.stream.listen((message) {
+///   var virtual = multiChannel.virtualChannel(message["channel"]);
+///   // ...
+/// });
+/// ```
+///
+/// Sending errors across a [MultiChannel] is not supported. Any errors from the
+/// underlying stream will be reported only via the default
+/// [MultiChannel.stream].
+///
+/// Each virtual channel may be closed individually. When all of them are
+/// closed, the underlying [StreamSink] is closed automatically.
+abstract class MultiChannel<T> implements StreamChannel<T> {
+  /// The default input stream.
+  ///
+  /// This connects to the remote [sink].
+  @override
+  Stream<T> get stream;
+
+  /// The default output sink.
+  ///
+  /// This connects to the remote [stream]. If this is closed, the remote
+  /// [stream] will close, but other virtual channels will remain open and new
+  /// virtual channels may be opened.
+  @override
+  StreamSink<T> get sink;
+
+  /// Creates a new [MultiChannel] that sends and receives messages over
+  /// [inner].
+  ///
+  /// The inner channel must take JSON-like objects.
+  factory MultiChannel(StreamChannel<dynamic> inner) => _MultiChannel<T>(inner);
+
+  /// Creates a new virtual channel.
+  ///
+  /// If [id] is not passed, this creates a virtual channel from scratch. Before
+  /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
+  /// where [virtualChannel] should be called with that id.
+  ///
+  /// If [id] is passed, this creates a virtual channel corresponding to the
+  /// channel with that id on the remote channel.
+  ///
+  /// Throws an [ArgumentError] if a virtual channel already exists for [id].
+  /// Throws a [StateError] if the underlying channel is closed.
+  VirtualChannel<T> virtualChannel([int? id]);
+}
+
+/// The implementation of [MultiChannel].
+///
+/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
+/// without having to implement all the private members.
+class _MultiChannel<T> extends StreamChannelMixin<T>
+    implements MultiChannel<T> {
+  /// The inner channel over which all communication is conducted.
+  ///
+  /// This will be `null` if the underlying communication channel is closed.
+  StreamChannel<dynamic>? _inner;
+
+  /// The subscription to [_inner].stream.
+  StreamSubscription<dynamic>? _innerStreamSubscription;
+
+  @override
+  Stream<T> get stream => _mainController.foreign.stream;
+  @override
+  StreamSink<T> get sink => _mainController.foreign.sink;
+
+  /// The controller for this channel.
+  final _mainController = StreamChannelController<T>(sync: true);
+
+  /// A map from input IDs to [StreamChannelController]s that should be used to
+  /// communicate over those channels.
+  final _controllers = <int, StreamChannelController<T>>{};
+
+  /// Input IDs of controllers in [_controllers] that we've received messages
+  /// for but that have not yet had a local [virtualChannel] created.
+  final _pendingIds = <int>{};
+
+  /// Input IDs of virtual channels that used to exist but have since been
+  /// closed.
+  final _closedIds = <int>{};
+
+  /// The next id to use for a local virtual channel.
+  ///
+  /// Ids are used to identify virtual channels. Each message is tagged with an
+  /// id; the receiving [MultiChannel] uses this id to look up which
+  /// [VirtualChannel] the message should be dispatched to.
+  ///
+  /// The id scheme for virtual channels is somewhat complicated. This is
+  /// necessary to ensure that there are no conflicts even when both endpoints
+  /// have virtual channels with the same id; since both endpoints can send and
+  /// receive messages across each virtual channel, a naïve scheme would make it
+  /// impossible to tell whether a message was from a channel that originated in
+  /// the remote endpoint or a reply on a channel that originated in the local
+  /// endpoint.
+  ///
+  /// The trick is that each endpoint only uses odd ids for its own channels.
+  /// When sending a message over a channel that was created by the remote
+  /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
+  /// knows that if an incoming message has an odd id, it's coming from a
+  /// channel that was originally created remotely, but if it has an even id,
+  /// it's coming from a channel that was originally created locally.
+  var _nextId = 1;
+
+  _MultiChannel(StreamChannel<dynamic> inner) : _inner = inner {
+    // The default connection is a special case which has id 0 on both ends.
+    // This allows it to begin connected without having to send over an id.
+    _controllers[0] = _mainController;
+    _mainController.local.stream.listen(
+        (message) => _inner!.sink.add(<Object?>[0, message]),
+        onDone: () => _closeChannel(0, 0));
+    // Dispatch each incoming `[id, data]` message to the right controller.
+    _innerStreamSubscription = _inner!.stream.cast<List>().listen((message) {
+      var id = (message[0] as num).toInt();
+
+      // If the channel was closed before an incoming message was processed,
+      // ignore that message.
+      if (_closedIds.contains(id)) return;
+
+      var controller = _controllers.putIfAbsent(id, () {
+        // If we receive a message for a controller that doesn't have a local
+        // counterpart yet, create a controller for it to buffer incoming
+        // messages for when a local connection is created.
+        _pendingIds.add(id);
+        return StreamChannelController(sync: true);
+      });
+
+      if (message.length > 1) {
+        controller.local.sink.add(message[1] as T);
+      } else {
+        // A message without data indicates that the channel has been closed. We
+        // can just close the sink here without doing any more cleanup, because
+        // the sink closing will cause the stream to emit a done event which
+        // will trigger more cleanup.
+        controller.local.sink.close();
+      }
+    },
+        onDone: _closeInnerChannel,
+        onError: _mainController.local.sink.addError);
+  }
+
+  @override
+  VirtualChannel<T> virtualChannel([int? id]) {
+    int inputId;
+    int outputId;
+    if (id != null) {
+      // Since the user is passing in an id, we're connected to a remote
+      // VirtualChannel. This means messages they send over this channel will
+      // have the original odd id, but our replies will have an even id.
+      inputId = id;
+      outputId = id + 1;
+    } else {
+      // Since we're generating an id, we originated this VirtualChannel. This
+      // means messages we send over this channel will have the original odd id,
+      // but the remote channel's replies will have an even id.
+      inputId = _nextId + 1;
+      outputId = _nextId;
+      _nextId += 2;
+    }
+
+    // If the inner channel has already closed, create new virtual channels in a
+    // closed state. NOTE(review): uses inputId as id, unlike the open path.
+    if (_inner == null) {
+      return VirtualChannel._(
+          this, inputId, const Stream.empty(), NullStreamSink());
+    }
+
+    late StreamChannelController<T> controller;
+    if (_pendingIds.remove(inputId)) {
+      // If we've already received messages for this channel, use the controller
+      // where those messages are buffered.
+      controller = _controllers[inputId]!;
+    } else if (_controllers.containsKey(inputId) ||
+        _closedIds.contains(inputId)) {
+      throw ArgumentError('A virtual channel with id $id already exists.');
+    } else {
+      controller = StreamChannelController(sync: true);
+      _controllers[inputId] = controller;
+    }
+
+    controller.local.stream.listen(
+        (message) => _inner!.sink.add(<Object?>[outputId, message]),
+        onDone: () => _closeChannel(inputId, outputId));
+    return VirtualChannel._(
+        this, outputId, controller.foreign.stream, controller.foreign.sink);
+  }
+
+  /// Closes the virtual channel for which incoming messages have [inputId] and
+  /// outgoing messages have [outputId].
+  void _closeChannel(int inputId, int outputId) {
+    _closedIds.add(inputId);
+    var controller = _controllers.remove(inputId)!;
+    controller.local.sink.close();
+    // With the inner channel gone there's no one left to notify.
+    if (_inner == null) return;
+
+    // A message without data indicates that the virtual channel has been
+    // closed.
+    _inner!.sink.add([outputId]);
+    if (_controllers.isEmpty) _closeInnerChannel();
+  }
+
+  /// Closes the underlying communication channel and every virtual channel.
+  void _closeInnerChannel() {
+    _inner!.sink.close();
+    _innerStreamSubscription!.cancel();
+    _inner = null;
+
+    // Convert this to a list because the close is dispatched synchronously, and
+    // that could conceivably remove a controller from [_controllers].
+    for (var controller in _controllers.values.toList(growable: false)) {
+      controller.local.sink.close();
+    }
+    _controllers.clear();
+  }
+}
+
+/// One of the virtual channels multiplexed over a [MultiChannel].
+///
+/// For convenience this is itself a [MultiChannel]: calling
+/// [VirtualChannel.virtualChannel] simply defers to the parent's
+/// [MultiChannel.virtualChannel].
+class VirtualChannel<T> extends StreamChannelMixin<T>
+    implements MultiChannel<T> {
+  /// The [MultiChannel] that created this.
+  final MultiChannel<T> _parent;
+
+  /// The identifier for this channel.
+  ///
+  /// Send this across the [MultiChannel] so that the remote endpoint can
+  /// connect to this channel. The only guarantee made about its value is that
+  /// it is JSON-serializable.
+  final int id;
+
+  @override
+  final Stream<T> stream;
+  @override
+  final StreamSink<T> sink;
+
+  VirtualChannel._(this._parent, this.id, this.stream, this.sink);
+
+  @override
+  VirtualChannel<T> virtualChannel([int? id]) => _parent.virtualChannel(id);
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_completer.dart b/pkgs/stream_channel/lib/src/stream_channel_completer.dart
new file mode 100644
index 0000000..9d007eb
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_completer.dart
@@ -0,0 +1,74 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [channel] whose source and destination are supplied at a later time.
+///
+/// The [channel] can be listened to and have events added to it right away,
+/// but until [setChannel] is called it emits no events and buffers every
+/// event added to it.
+class StreamChannelCompleter<T> {
+  /// Completes the stream half of [channel].
+  final _streamCompleter = StreamCompleter<T>();
+
+  /// Completes the sink half of [channel].
+  final _sinkCompleter = StreamSinkCompleter<T>();
+
+  /// The channel for this completer.
+  StreamChannel<T> get channel => _channel;
+  late final StreamChannel<T> _channel;
+
+  /// Whether [setChannel] or [setError] has been called.
+  bool _set = false;
+
+  /// Convert a `Future<StreamChannel>` to a `StreamChannel`.
+  ///
+  /// The returned channel is backed by a completer whose source channel is
+  /// set to the future's result once the future completes.
+  ///
+  /// If the future completes with an error, the returned channel's stream
+  /// emits just that error and then closes, while its sink silently discards
+  /// all events.
+  static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
+    var pending = StreamChannelCompleter<void>();
+    channelFuture.then(pending.setChannel, onError: pending.setError);
+    return pending.channel;
+  }
+
+  StreamChannelCompleter() {
+    _channel = StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink);
+  }
+
+  /// Set a channel as the source and destination for [channel].
+  ///
+  /// Either [setChannel] or [setError] may be called at most once. Trying to
+  /// call either of them again will fail.
+  void setChannel(StreamChannel<T> channel) {
+    _markSet();
+    _streamCompleter.setSourceStream(channel.stream);
+    _sinkCompleter.setDestinationSink(channel.sink);
+  }
+
+  /// Indicates that there was an error connecting the channel.
+  ///
+  /// This makes the stream emit [error] and close. It makes the sink discard
+  /// all its events.
+  ///
+  /// Either [setChannel] or [setError] may be called at most once. Trying to
+  /// call either of them again will fail.
+  void setError(Object error, [StackTrace? stackTrace]) {
+    _markSet();
+    _streamCompleter.setError(error, stackTrace);
+    _sinkCompleter.setDestinationSink(NullStreamSink());
+  }
+
+  /// Marks this completer as set, throwing a [StateError] if it already was.
+  void _markSet() {
+    if (_set) throw StateError('The channel has already been set.');
+    _set = true;
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_controller.dart b/pkgs/stream_channel/lib/src/stream_channel_controller.dart
new file mode 100644
index 0000000..25d5239
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_controller.dart
@@ -0,0 +1,67 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// @docImport 'isolate_channel.dart';
+library;
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A controller for exposing a new [StreamChannel].
+///
+/// It owns two connected [StreamChannel]s: [local], which the code creating
+/// the controller uses to emit and receive events, and [foreign], which can be
+/// handed out for others to use. For example, here's a simplified version of
+/// the implementation of [IsolateChannel.new]:
+///
+/// ```dart
+/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
+///   var controller = StreamChannelController(allowForeignErrors: false);
+///
+///   // Pipe all events from the receive port into the local sink...
+///   receivePort.pipe(controller.local.sink);
+///
+///   // ...and all events from the local stream into the send port.
+///   controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
+///
+///   // Then return the foreign controller for your users to use.
+///   return controller.foreign;
+/// }
+/// ```
+class StreamChannelController<T> {
+  /// The local channel.
+  ///
+  /// The creator of this [StreamChannelController] should use this channel
+  /// directly to send and receive events.
+  StreamChannel<T> get local => _local;
+  late final StreamChannel<T> _local;
+
+  /// The foreign channel.
+  ///
+  /// External users should be given this channel so that they can communicate
+  /// with [local].
+  StreamChannel<T> get foreign => _foreign;
+  late final StreamChannel<T> _foreign;
+
+  /// Creates a [StreamChannelController].
+  ///
+  /// If [sync] is true, events added to either channel's sink are synchronously
+  /// dispatched to the other channel's stream. This should only be done if the
+  /// source of those events is already asynchronous.
+  ///
+  /// If [allowForeignErrors] is `false`, errors are not allowed to be passed to
+  /// the foreign channel's sink. If any are, the connection will close and the
+  /// error will be forwarded to the foreign channel's [StreamSink.done] future.
+  /// This guarantees that the local stream will never emit errors.
+  StreamChannelController({bool allowForeignErrors = true, bool sync = false}) {
+    // One controller per direction; each channel reads one, writes the other.
+    var outgoing = StreamController<T>(sync: sync);
+    var incoming = StreamController<T>(sync: sync);
+    _local = StreamChannel<T>.withGuarantees(incoming.stream, outgoing.sink);
+    // Only the foreign sink can be configured to reject errors.
+    _foreign = StreamChannel<T>.withGuarantees(outgoing.stream, incoming.sink,
+        allowSinkErrors: allowForeignErrors);
+  }
+}
diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
new file mode 100644
index 0000000..cf62c76
--- /dev/null
+++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannelTransformer] transforms the events being passed to and
+/// emitted by a [StreamChannel].
+///
+/// This works on the same principle as [StreamTransformer] and
+/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes
+/// in the original [StreamChannel] and returns the transformed version.
+///
+/// Transformers must be able to have [bind] called multiple times. If a
+/// subclass implements [bind] explicitly, it should be sure that the returned
+/// stream follows the second stream channel guarantee: closing the sink causes
+/// the stream to close before it emits any more events. This guarantee is
+/// invalidated when an asynchronous gap is added between the original stream's
+/// event dispatch and the returned stream's, for example by transforming it
+/// with a [StreamTransformer]. The guarantee can be easily preserved using
+/// [StreamChannel.withCloseGuarantee].
+class StreamChannelTransformer<S, T> {
+  /// The transformer to use on the channel's stream.
+  final StreamTransformer<T, S> _streamTransformer;
+
+  /// The transformer to use on the channel's sink.
+  final StreamSinkTransformer<S, T> _sinkTransformer;
+
+  /// Creates a [StreamChannelTransformer] from existing stream and sink
+  /// transformers.
+  const StreamChannelTransformer(
+      this._streamTransformer, this._sinkTransformer);
+
+  /// Creates a [StreamChannelTransformer] from a codec's encoder and decoder.
+  ///
+  /// All input to the inner channel's sink is encoded using [Codec.encoder],
+  /// and all output from its stream is decoded using [Codec.decoder].
+  StreamChannelTransformer.fromCodec(Codec<S, T> codec)
+      : this(codec.decoder,
+            StreamSinkTransformer.fromStreamTransformer(codec.encoder));
+
+  /// Transforms the events sent to and emitted by [channel].
+  ///
+  /// Creates a new channel. When events are passed to the returned channel's
+  /// sink, the transformer will transform them and pass the transformed
+  /// versions to `channel.sink`. When events are emitted from
+  /// `channel.stream`, the transformer will transform them and pass the
+  /// transformed versions to the returned channel's stream.
+  StreamChannel<S> bind(StreamChannel<T> channel) =>
+      StreamChannel<S>.withCloseGuarantee(
+          channel.stream.transform(_streamTransformer),
+          _sinkTransformer.bind(channel.sink));
+}
diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart
new file mode 100644
index 0000000..85f9a97
--- /dev/null
+++ b/pkgs/stream_channel/lib/stream_channel.dart
@@ -0,0 +1,181 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import 'src/close_guarantee_channel.dart';
+import 'src/guarantee_channel.dart';
+import 'src/stream_channel_transformer.dart';
+
+export 'src/delegating_stream_channel.dart';
+export 'src/disconnector.dart';
+export 'src/json_document_transformer.dart';
+export 'src/multi_channel.dart';
+export 'src/stream_channel_completer.dart';
+export 'src/stream_channel_controller.dart';
+export 'src/stream_channel_transformer.dart';
+
+/// An abstract class representing a two-way communication channel.
+///
+/// Users should consider the [stream] emitting a "done" event to be the
+/// canonical indicator that the channel has closed. If they wish to close the
+/// channel, they should close the [sink]—canceling the stream subscription is
+/// not sufficient. Protocol errors may be emitted through the stream or through
+/// [sink].done, depending on their underlying cause. Note that the sink may
+/// silently drop events if the channel closes before [sink].close is called.
+///
+/// Implementations are strongly encouraged to mix in or extend
+/// [StreamChannelMixin] to get default implementations of the various instance
+/// methods. Adding new methods to this interface will not be considered a
+/// breaking change if implementations are also added to [StreamChannelMixin].
+///
+/// Implementations must provide the following guarantees:
+///
+/// * The stream is single-subscription, and must follow all the guarantees of
+///   single-subscription streams.
+///
+/// * Closing the sink causes the stream to close before it emits any more
+///   events.
+///
+/// * After the stream closes, the sink is automatically closed. If this
+///   happens, sink methods should silently drop their arguments until
+///   [sink].close is called.
+///
+/// * If the stream closes before it has a listener, the sink should silently
+///   drop events if possible.
+///
+/// * Canceling the stream's subscription has no effect on the sink. The channel
+///   must still be able to respond to the other endpoint closing the channel
+///   even after the subscription has been canceled.
+///
+/// * The sink *either* forwards errors to the other endpoint *or* closes as
+///   soon as an error is added and forwards that error to the [sink].done
+///   future.
+///
+/// These guarantees allow users to interact uniformly with all implementations,
+/// and ensure that either endpoint closing the stream produces consistent
+/// behavior.
+abstract class StreamChannel<T> {
+  /// The single-subscription stream that emits values from the other endpoint.
+  Stream<T> get stream;
+
+  /// The sink for sending values to the other endpoint.
+  StreamSink<T> get sink;
+
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  ///
+  /// Note that this stream/sink pair must provide the guarantees listed in the
+  /// [StreamChannel] documentation. If they don't do so natively,
+  /// [StreamChannel.withGuarantees] should be used instead.
+  factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
+      _StreamChannel<T>(stream, sink);
+
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  ///
+  /// Unlike [StreamChannel.new], this enforces the guarantees listed in the
+  /// [StreamChannel] documentation. This makes it somewhat less efficient than
+  /// just wrapping a stream and a sink directly, so [StreamChannel.new] should
+  /// be used when the guarantees are provided natively.
+  ///
+  /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
+  /// [sink]. If any are, the connection will close and the error will be
+  /// forwarded to [sink].done.
+  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
+          {bool allowSinkErrors = true}) =>
+      GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
+
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  ///
+  /// This specifically enforces the second guarantee: closing the sink causes
+  /// the stream to close before it emits any more events. This guarantee is
+  /// invalidated when an asynchronous gap is added between the original
+  /// stream's event dispatch and the returned stream's, for example by
+  /// transforming it with a [StreamTransformer]. This is a lighter-weight way
+  /// of preserving that guarantee in particular than
+  /// [StreamChannel.withGuarantees].
+  factory StreamChannel.withCloseGuarantee(
+          Stream<T> stream, StreamSink<T> sink) =>
+      CloseGuaranteeChannel(stream, sink);
+
+  /// Connects this to [other], so that any values emitted by either are sent
+  /// directly to the other.
+  void pipe(StreamChannel<T> other);
+
+  /// Transforms this using [transformer].
+  ///
+  /// This is identical to calling `transformer.bind(this)`.
+  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
+
+  /// Transforms only the [stream] component of this using [transformer].
+  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
+
+  /// Transforms only the [sink] component of this using [transformer].
+  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
+
+  /// Returns a copy of this with [stream] replaced by [change]'s return
+  /// value.
+  StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
+
+  /// Returns a copy of this with [sink] replaced by [change]'s return
+  /// value.
+  StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
+
+  /// Returns a copy of this with the generic type coerced to [S].
+  ///
+  /// If any events emitted by [stream] aren't of type [S], they're converted
+  /// into [TypeError] events (`CastError` on some SDK versions). Similarly, if
+  /// any events are added to [sink] that aren't of type [S], a [TypeError] is
+  /// thrown.
+  StreamChannel<S> cast<S>();
+}
+
+/// The [StreamChannel] implementation returned by [StreamChannel.new]; it just
+/// exposes the stream and sink that were passed to its constructor.
+///
+/// This is a separate class from [StreamChannel] so that it can extend
+/// [StreamChannelMixin] and inherit its instance-method implementations.
+class _StreamChannel<T> extends StreamChannelMixin<T> {
+  @override
+  final Stream<T> stream;
+  @override
+  final StreamSink<T> sink;
+
+  _StreamChannel(this.stream, this.sink);
+}
+
+/// A mixin that implements the instance methods of [StreamChannel] in terms of
+/// [stream] and [sink], which are the only members a subclass must provide.
+abstract class StreamChannelMixin<T> implements StreamChannel<T> {
+  @override
+  void pipe(StreamChannel<T> other) {
+    stream.pipe(other.sink);
+    other.stream.pipe(sink);
+  }
+
+  @override
+  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) =>
+      transformer.bind(this);
+
+  @override
+  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
+      changeStream(transformer.bind);
+
+  @override
+  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
+      changeSink(transformer.bind);
+
+  @override
+  StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change) =>
+      StreamChannel.withCloseGuarantee(change(stream), sink);
+
+  @override
+  StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change) =>
+      StreamChannel.withCloseGuarantee(stream, change(sink));
+
+  @override
+  StreamChannel<S> cast<S>() => StreamChannel(
+      stream.cast(), StreamController(sync: true)..stream.cast<T>().pipe(sink));
+}
diff --git a/pkgs/stream_channel/pubspec.yaml b/pkgs/stream_channel/pubspec.yaml
new file mode 100644
index 0000000..eec8c1b
--- /dev/null
+++ b/pkgs/stream_channel/pubspec.yaml
@@ -0,0 +1,16 @@
+name: stream_channel
+version: 2.1.3
+description: >-
+  An abstraction for two-way communication channels based on the Dart Stream
+  class.
+repository: https://github.com/dart-lang/tools/tree/main/pkgs/stream_channel
+
+environment:
+  sdk: ^3.3.0
+
+dependencies:
+  async: ^2.5.0
+
+dev_dependencies:
+  dart_flutter_team_lints: ^3.0.0
+  test: ^1.16.6
diff --git a/pkgs/stream_channel/test/disconnector_test.dart b/pkgs/stream_channel/test/disconnector_test.dart
new file mode 100644
index 0000000..bbba568
--- /dev/null
+++ b/pkgs/stream_channel/test/disconnector_test.dart
@@ -0,0 +1,152 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamController streamController;
+  late StreamController sinkController;
+  late Disconnector disconnector;
+  late StreamChannel channel;
+  setUp(() {
+    streamController = StreamController<void>();
+    sinkController = StreamController<void>();
+    disconnector = Disconnector();
+    channel = StreamChannel.withGuarantees(
+            streamController.stream, sinkController.sink)
+        .transform(disconnector);
+  });
+
+  group('before disconnection', () {
+    test('forwards events from the sink as normal', () {
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      channel.sink.close();
+
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test('forwards events to the stream as normal', () {
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      streamController.close();
+
+      expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("events can't be added when the sink is explicitly closed", () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+
+      expect(channel.sink.close(), completes);
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError('oh no'), throwsStateError);
+      expect(() => channel.sink.addStream(Stream.fromIterable([])),
+          throwsStateError);
+    });
+
+    test("events can't be added while a stream is being added", () {
+      var controller = StreamController<void>();
+      channel.sink.addStream(controller.stream);
+
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError('oh no'), throwsStateError);
+      expect(() => channel.sink.addStream(Stream.fromIterable([])),
+          throwsStateError);
+      expect(() => channel.sink.close(), throwsStateError);
+
+      controller.close();
+    });
+  });
+
+  test('cancels addStream when disconnected', () async {
+    var canceled = false;
+    var controller = StreamController<void>(onCancel: () {
+      canceled = true;
+    });
+    expect(channel.sink.addStream(controller.stream), completes);
+    unawaited(disconnector.disconnect());
+
+    await pumpEventQueue();
+    expect(canceled, isTrue);
+  });
+
+  test('disconnect() returns the close future from the inner sink', () async {
+    var streamController = StreamController<void>();
+    var sinkController = StreamController<void>();
+    var disconnector = Disconnector<void>();
+    var sink = _CloseCompleterSink(sinkController.sink);
+    StreamChannel.withGuarantees(streamController.stream, sink)
+        .transform(disconnector);
+
+    var disconnectFutureFired = false;
+    expect(
+        disconnector.disconnect().then((_) {
+          disconnectFutureFired = true;
+        }),
+        completes);
+
+    // Pump the event queue so a prematurely-completing future would be caught.
+    await pumpEventQueue();
+    expect(disconnectFutureFired, isFalse);
+
+    // Completing the inner sink's close future must, in turn, complete the
+    // future returned by disconnect().
+    sink.completer.complete();
+    await pumpEventQueue();
+    expect(disconnectFutureFired, isTrue);
+  });
+
+  group('after disconnection', () {
+    setUp(() {
+      disconnector.disconnect();
+    });
+
+    test('closes the inner sink and ignores events to the outer sink', () {
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      channel.sink.close();
+
+      expect(sinkController.stream.toList(), completion(isEmpty));
+    });
+
+    test('closes the stream', () {
+      expect(channel.stream.toList(), completion(isEmpty));
+    });
+
+    test('completes done', () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+      expect(channel.sink.done, completes);
+    });
+
+    test('still emits state errors after explicit close', () {
+      sinkController.stream.listen(null); // Work around sdk#19095.
+      expect(channel.sink.close(), completes);
+
+      expect(() => channel.sink.add(1), throwsStateError);
+      expect(() => channel.sink.addError('oh no'), throwsStateError);
+    });
+  });
+}
+
+/// A [StreamSink] wrapper whose [close] future only completes once the test
+/// manually completes [completer].
+class _CloseCompleterSink extends DelegatingStreamSink {
+  /// Controls when the future returned by [close] completes.
+  final completer = Completer<void>();
+
+  _CloseCompleterSink(super.inner);
+
+  @override
+  Future<void> close() {
+    super.close();
+    return completer.future;
+  }
+}
diff --git a/pkgs/stream_channel/test/isolate_channel_test.dart b/pkgs/stream_channel/test/isolate_channel_test.dart
new file mode 100644
index 0000000..3a8b42e
--- /dev/null
+++ b/pkgs/stream_channel/test/isolate_channel_test.dart
@@ -0,0 +1,174 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@TestOn('vm')
+library;
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:stream_channel/isolate_channel.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late ReceivePort receivePort;
+  late SendPort sendPort;
+  late StreamChannel channel;
+  setUp(() {
+    receivePort = ReceivePort();
+    var receivePortForSend = ReceivePort();
+    sendPort = receivePortForSend.sendPort;
+    channel = IsolateChannel(receivePortForSend, receivePort.sendPort);
+  });
+
+  tearDown(() {
+    receivePort.close();
+    channel.sink.close();
+  });
+
+  test('the channel can send messages', () {
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+
+    expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+  });
+
+  test('the channel can receive messages', () {
+    sendPort.send(1);
+    sendPort.send(2);
+    sendPort.send(3);
+
+    expect(channel.stream.take(3).toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("events can't be added to an explicitly-closed sink", () {
+    expect(channel.sink.close(), completes);
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError('oh no'), throwsStateError);
+    expect(() => channel.sink.addStream(Stream.fromIterable([])),
+        throwsStateError);
+  });
+
+  test("events can't be added while a stream is being added", () {
+    var controller = StreamController<void>();
+    channel.sink.addStream(controller.stream);
+
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError('oh no'), throwsStateError);
+    expect(() => channel.sink.addStream(Stream.fromIterable([])),
+        throwsStateError);
+    expect(() => channel.sink.close(), throwsStateError);
+
+    controller.close();
+  });
+
+  group('stream channel rules', () {
+    test(
+        'closing the sink causes the stream to close before it emits any more '
+        'events', () {
+      sendPort.send(1);
+      sendPort.send(2);
+      sendPort.send(3);
+      sendPort.send(4);
+      sendPort.send(5);
+
+      channel.stream.listen(expectAsync1((message) {
+        expect(message, equals(1));
+        channel.sink.close();
+      }, count: 1));
+    });
+
+    test("cancelling the stream's subscription has no effect on the sink",
+        () async {
+      unawaited(channel.stream.listen(null).cancel());
+      await pumpEventQueue();
+
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+    });
+
+    test('the sink closes as soon as an error is added', () async {
+      channel.sink.addError('oh no');
+      channel.sink.add(1);
+      expect(channel.sink.done, throwsA('oh no'));
+
+      // Since the sink is closed, the stream should also be closed.
+      expect(channel.stream.isEmpty, completion(isTrue));
+
+      // The `1` added after the error must not reach the other end because the
+      // sink has closed. Pump the event queue to give it a chance to arrive.
+      receivePort.listen(expectAsync1((_) {}, count: 0));
+      await pumpEventQueue();
+    });
+
+    test('the sink closes as soon as an error is added via addStream',
+        () async {
+      var canceled = false;
+      var controller = StreamController<void>(onCancel: () {
+        canceled = true;
+      });
+
+      // This future shouldn't get the error; it's reported via `sink.done`.
+      expect(channel.sink.addStream(controller.stream), completes);
+
+      controller.addError('oh no');
+      expect(channel.sink.done, throwsA('oh no'));
+      await pumpEventQueue();
+      expect(canceled, isTrue);
+
+      // Even though the sink is closed, this shouldn't throw an error because
+      // the user didn't explicitly close it.
+      channel.sink.add(1);
+    });
+  });
+
+  group('connect constructors', () {
+    late ReceivePort connectPort;
+    setUp(() {
+      connectPort = ReceivePort();
+    });
+
+    tearDown(() {
+      connectPort.close();
+    });
+
+    test('create a connected pair of channels', () async {
+      var channel1 = IsolateChannel<int>.connectReceive(connectPort);
+      var channel2 = IsolateChannel<int>.connectSend(connectPort.sendPort);
+
+      channel1.sink.add(1);
+      channel1.sink.add(2);
+      channel1.sink.add(3);
+      expect(await channel2.stream.take(3).toList(), equals([1, 2, 3]));
+
+      channel2.sink.add(4);
+      channel2.sink.add(5);
+      channel2.sink.add(6);
+      expect(await channel1.stream.take(3).toList(), equals([4, 5, 6]));
+
+      await channel2.sink.close();
+    });
+
+    test('the receiving channel produces an error if it gets the wrong message',
+        () {
+      var connectedChannel = IsolateChannel<int>.connectReceive(connectPort);
+      connectPort.sendPort.send('wrong value');
+
+      expect(connectedChannel.stream.toList(), throwsStateError);
+      expect(connectedChannel.sink.done, completes);
+    });
+
+    test('the receiving channel closes gracefully without a connection',
+        () async {
+      var connectedChannel = IsolateChannel<int>.connectReceive(connectPort);
+      await connectedChannel.sink.close();
+      await expectLater(connectedChannel.stream.toList(), completion(isEmpty));
+      await expectLater(connectedChannel.sink.done, completes);
+    });
+  });
+}
diff --git a/pkgs/stream_channel/test/json_document_transformer_test.dart b/pkgs/stream_channel/test/json_document_transformer_test.dart
new file mode 100644
index 0000000..290c4e2
--- /dev/null
+++ b/pkgs/stream_channel/test/json_document_transformer_test.dart
@@ -0,0 +1,46 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamController<String> incoming;
+  late StreamController<String> outgoing;
+  late StreamChannel<String> rawChannel;
+  setUp(() {
+    // Back an untransformed string channel with two plain controllers.
+    incoming = StreamController<String>();
+    outgoing = StreamController<String>();
+    rawChannel = StreamChannel<String>(incoming.stream, outgoing.sink);
+  });
+
+  test('decodes JSON emitted by the channel', () {
+    var jsonChannel = rawChannel.transform(jsonDocument);
+    incoming.add('{"foo": "bar"}');
+    expect(jsonChannel.stream.first, completion(equals({'foo': 'bar'})));
+  });
+
+  test('encodes objects added to the channel', () {
+    var jsonChannel = rawChannel.transform(jsonDocument);
+    jsonChannel.sink.add({'foo': 'bar'});
+    expect(outgoing.stream.first,
+        completion(equals(jsonEncode({'foo': 'bar'}))));
+  });
+
+  test('emits a stream error when incoming JSON is malformed', () {
+    var jsonChannel = rawChannel.transform(jsonDocument);
+    incoming.add('{invalid');
+    expect(jsonChannel.stream.first, throwsFormatException);
+  });
+
+  test('synchronously throws if an unencodable object is added', () {
+    var jsonChannel = rawChannel.transform(jsonDocument);
+    expect(() => jsonChannel.sink.add(Object()),
+        throwsA(isA<JsonUnsupportedObjectError>()));
+  });
+}
diff --git a/pkgs/stream_channel/test/multi_channel_test.dart b/pkgs/stream_channel/test/multi_channel_test.dart
new file mode 100644
index 0000000..ee6f8d2
--- /dev/null
+++ b/pkgs/stream_channel/test/multi_channel_test.dart
@@ -0,0 +1,478 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamChannelController controller;
+  late MultiChannel channel1;
+  late MultiChannel channel2;
+  setUp(() {
+    controller = StreamChannelController();
+    channel1 = MultiChannel<int>(controller.local);
+    channel2 = MultiChannel<int>(controller.foreign);
+  });
+
+  group('the default virtual channel', () {
+    test('begins connected', () {
+      var first = true;
+      channel2.stream.listen(expectAsync1((message) {
+        if (first) {
+          expect(message, equals(1));
+          first = false;
+        } else {
+          expect(message, equals(2));
+        }
+      }, count: 2));
+
+      channel1.sink.add(1);
+      channel1.sink.add(2);
+    });
+
+    test('closes the remote virtual channel when it closes', () {
+      expect(channel2.stream.toList(), completion(isEmpty));
+      expect(channel2.sink.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test('closes the local virtual channel when it closes', () {
+      expect(channel1.stream.toList(), completion(isEmpty));
+      expect(channel1.sink.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test(
+        "doesn't closes the local virtual channel when the stream "
+        'subscription is canceled', () {
+      channel1.sink.done.then(expectAsync1((_) {}, count: 0));
+
+      channel1.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test(
+        'closes the underlying channel when it closes without any other '
+        'virtual channels', () {
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test(
+        "doesn't close the underlying channel when it closes with other "
+        'virtual channels', () {
+      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+
+      // Establish another virtual connection which should keep the underlying
+      // connection open.
+      channel2.virtualChannel(channel1.virtualChannel().id);
+      channel1.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+  });
+
+  group('a locally-created virtual channel', () {
+    late VirtualChannel virtual1;
+    late VirtualChannel virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test('sends messages only to the other virtual channel', () {
+      var first = true;
+      virtual2.stream.listen(expectAsync1((message) {
+        if (first) {
+          expect(message, equals(1));
+          first = false;
+        } else {
+          expect(message, equals(2));
+        }
+      }, count: 2));
+
+      // No other virtual channels should receive the message.
+      for (var i = 0; i < 10; i++) {
+        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+        virtual.stream.listen(expectAsync1((_) {}, count: 0));
+      }
+      channel2.stream.listen(expectAsync1((_) {}, count: 0));
+
+      virtual1.sink.add(1);
+      virtual1.sink.add(2);
+    });
+
+    test('closes the remote virtual channel when it closes', () {
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+
+      virtual1.sink.close();
+    });
+
+    test('closes the local virtual channel when it closes', () {
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+
+      virtual1.sink.close();
+    });
+
+    test(
+        "doesn't closes the local virtual channel when the stream "
+        'subscription is canceled', () {
+      virtual1.sink.done.then(expectAsync1((_) {}, count: 0));
+      virtual1.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test(
+        'closes the underlying channel when it closes without any other '
+        'virtual channels', () async {
+      // First close the default channel so we can test the new channel as the
+      // last living virtual channel.
+      unawaited(channel1.sink.close());
+
+      await channel2.stream.toList();
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
+
+      unawaited(virtual1.sink.close());
+    });
+
+    test(
+        "doesn't close the underlying channel when it closes with other "
+        'virtual channels', () {
+      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+
+      virtual1.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+
+    test("doesn't conflict with a remote virtual channel", () {
+      var virtual3 = channel2.virtualChannel();
+      var virtual4 = channel1.virtualChannel(virtual3.id);
+
+      // This is an implementation detail, but we assert it here to make sure
+      // we're properly testing two channels with the same id.
+      expect(virtual1.id, equals(virtual3.id));
+
+      virtual2.stream
+          .listen(expectAsync1((message) => expect(message, equals(1))));
+      virtual4.stream
+          .listen(expectAsync1((message) => expect(message, equals(2))));
+
+      virtual1.sink.add(1);
+      virtual3.sink.add(2);
+    });
+  });
+
+  group('a remotely-created virtual channel', () {
+    late VirtualChannel virtual1;
+    late VirtualChannel virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test('sends messages only to the other virtual channel', () {
+      var first = true;
+      virtual1.stream.listen(expectAsync1((message) {
+        if (first) {
+          expect(message, equals(1));
+          first = false;
+        } else {
+          expect(message, equals(2));
+        }
+      }, count: 2));
+      // No other virtual channels should receive the message: a callback
+      // wrapped with `count: 0` fails the test if it is ever invoked.
+      for (var i = 0; i < 10; i++) {
+        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+        virtual.stream.listen(expectAsync1((_) {}, count: 0));
+      }
+      channel1.stream.listen(expectAsync1((_) {}, count: 0));
+      // Send from virtual2; only virtual1 is allowed to hear these.
+      virtual2.sink.add(1);
+      virtual2.sink.add(2);
+    });
+
+    test('closes the remote virtual channel when it closes', () {
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+
+      virtual2.sink.close();
+    });
+
+    test('closes the local virtual channel when it closes', () {
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+
+      virtual2.sink.close();
+    });
+
+    test(
+        "doesn't close the local virtual channel when the stream "
+        'subscription is canceled', () {
+      virtual2.sink.done.then(expectAsync1((_) {}, count: 0));
+      virtual2.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test(
+        'closes the underlying channel when it closes without any other '
+        'virtual channels', () async {
+      // First close the default channel so we can test the new channel as the
+      // last living virtual channel.
+      unawaited(channel2.sink.close());
+
+      await channel1.stream.toList();
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
+      // virtual2 is the last one open, so closing it should finish both sinks.
+      unawaited(virtual2.sink.close());
+    });
+
+    test(
+        "doesn't close the underlying channel when it closes with other "
+        'virtual channels', () {
+      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
+      // The default channel is still open, so the underlying one must stay up.
+      virtual2.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+
+    test("doesn't allow another virtual channel with the same id", () {
+      expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
+    });
+
+    test('dispatches events received before the virtual channel is created',
+        () async {
+      virtual1 = channel1.virtualChannel();
+
+      virtual1.sink.add(1);
+      await pumpEventQueue();
+
+      virtual1.sink.add(2);
+      await pumpEventQueue();
+      // The remote end is only created now, after both events were sent.
+      expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
+    });
+
+    test(
+        'dispatches close events received before the virtual channel is '
+        'created', () async {
+      virtual1 = channel1.virtualChannel();
+
+      unawaited(virtual1.sink.close());
+      await pumpEventQueue();
+      // The remote end is only created now, after the close was sent.
+      expect(channel2.virtualChannel(virtual1.id).stream.toList(),
+          completion(isEmpty));
+    });
+  });
+
+  group('when the underlying stream', () {
+    late VirtualChannel virtual1;
+    late VirtualChannel virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test('closes, all virtual channels close', () {
+      expect(channel1.stream.toList(), completion(isEmpty));
+      expect(channel1.sink.done, completes);
+      expect(channel2.stream.toList(), completion(isEmpty));
+      expect(channel2.sink.done, completes);
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+      // Closing one end of the controller should trigger all of the above.
+      controller.local.sink.close();
+    });
+
+    test('closes, more virtual channels are created closed', () async {
+      unawaited(channel2.sink.close());
+      unawaited(virtual2.sink.close());
+
+      // Wait for the existing channels to emit done events.
+      await channel1.stream.toList();
+      await virtual1.stream.toList();
+      // A channel requested after shutdown should already be closed.
+      var virtual = channel1.virtualChannel();
+      expect(virtual.stream.toList(), completion(isEmpty));
+      expect(virtual.sink.done, completes);
+      // A second request should behave the same way.
+      virtual = channel1.virtualChannel();
+      expect(virtual.stream.toList(), completion(isEmpty));
+      expect(virtual.sink.done, completes);
+    });
+
+    test('emits an error, the error is sent only to the default channel', () {
+      channel1.stream.listen(expectAsync1((_) {}, count: 0),
+          onError: expectAsync1((error) => expect(error, equals('oh no'))));
+      virtual1.stream.listen(expectAsync1((_) {}, count: 0),
+          onError: expectAsync1((_) {}, count: 0));
+      // The error is injected at the foreign end of the controller.
+      controller.foreign.sink.addError('oh no');
+    });
+  });
+
+  group('stream channel rules', () {
+    group('for the main stream:', () {
+      test(
+          'closing the sink causes the stream to close before it emits any '
+          'more events', () {
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+        // `count: 1`: once the sink is closed, 2 and 3 must not be delivered.
+        channel2.stream.listen(expectAsync1((message) {
+          expect(message, equals(1));
+          channel2.sink.close();
+        }, count: 1));
+      });
+
+      test('after the stream closes, the sink ignores events', () async {
+        unawaited(channel1.sink.close());
+
+        // Wait for the done event to be delivered.
+        await channel2.stream.toList();
+        channel2.sink.add(1);
+        channel2.sink.add(2);
+        channel2.sink.add(3);
+        unawaited(channel2.sink.close());
+
+        // None of our channel.sink additions should make it to the other
+        // endpoint.
+        channel1.stream.listen(expectAsync1((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+
+      test("canceling the stream's subscription has no effect on the sink",
+          () async {
+        unawaited(channel1.stream.listen(null).cancel());
+        await pumpEventQueue();
+        // The sink must keep working after the subscription is canceled.
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+        unawaited(channel1.sink.close());
+        expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
+      });
+
+      test("canceling the stream's subscription doesn't stop a done event",
+          () async {
+        unawaited(channel1.stream.listen(null).cancel());
+        await pumpEventQueue();
+        // Close the other side after the subscription is already canceled.
+        unawaited(channel2.sink.close());
+        await pumpEventQueue();
+
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+        unawaited(channel1.sink.close());
+
+        // The sink should be ignoring events because the channel closed.
+        channel2.stream.listen(expectAsync1((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+    });
+
+    group('for a virtual channel:', () {
+      late VirtualChannel virtual1;
+      late VirtualChannel virtual2;
+      setUp(() {
+        virtual1 = channel1.virtualChannel();
+        virtual2 = channel2.virtualChannel(virtual1.id);
+      });
+
+      test(
+          'closing the sink causes the stream to close before it emits any '
+          'more events', () {
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+        // `count: 1`: once the sink is closed, 2 and 3 must not be delivered.
+        virtual2.stream.listen(expectAsync1((message) {
+          expect(message, equals(1));
+          virtual2.sink.close();
+        }, count: 1));
+      });
+
+      test('after the stream closes, the sink ignores events', () async {
+        unawaited(virtual1.sink.close());
+
+        // Wait for the done event to be delivered.
+        await virtual2.stream.toList();
+        virtual2.sink.add(1);
+        virtual2.sink.add(2);
+        virtual2.sink.add(3);
+        unawaited(virtual2.sink.close());
+
+        // None of our virtual.sink additions should make it to the other
+        // endpoint.
+        virtual1.stream.listen(expectAsync1((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+
+      test("canceling the stream's subscription has no effect on the sink",
+          () async {
+        unawaited(virtual1.stream.listen(null).cancel());
+        await pumpEventQueue();
+        // The sink must keep working after the subscription is canceled.
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+        unawaited(virtual1.sink.close());
+        expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
+      });
+
+      test("canceling the stream's subscription doesn't stop a done event",
+          () async {
+        unawaited(virtual1.stream.listen(null).cancel());
+        await pumpEventQueue();
+        // Close the other side after the subscription is already canceled.
+        unawaited(virtual2.sink.close());
+        await pumpEventQueue();
+
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+        unawaited(virtual1.sink.close());
+
+        // The sink should be ignoring events because the stream closed.
+        virtual2.stream.listen(expectAsync1((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+    });
+  });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_completer_test.dart b/pkgs/stream_channel/test/stream_channel_completer_test.dart
new file mode 100644
index 0000000..c6fddc0
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_completer_test.dart
@@ -0,0 +1,120 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamChannelCompleter completer;
+  late StreamController streamController;
+  late StreamController sinkController;
+  late StreamChannel innerChannel;
+  setUp(() {
+    completer = StreamChannelCompleter();
+    streamController = StreamController<void>();
+    sinkController = StreamController<void>();
+    innerChannel = StreamChannel(streamController.stream, sinkController.sink);
+  });
+
+  group('when a channel is set before accessing', () {
+    test('forwards events through the stream', () {
+      completer.setChannel(innerChannel);
+      expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      streamController.close();
+    });
+
+    test('forwards events through the sink', () {
+      completer.setChannel(innerChannel);
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+
+      completer.channel.sink.add(1);
+      completer.channel.sink.add(2);
+      completer.channel.sink.add(3);
+      completer.channel.sink.close();
+    });
+    // The next two tests complete the completer with an error instead.
+    test('forwards an error through the stream', () {
+      completer.setError('oh no');
+      expect(completer.channel.stream.first, throwsA('oh no'));
+    });
+
+    test('drops sink events', () {
+      completer.setError('oh no');
+      expect(completer.channel.sink.done, completes);
+      completer.channel.sink.add(1);
+      completer.channel.sink.addError('oh no');
+    });
+  });
+
+  group('when a channel is set after accessing', () {
+    test('forwards events through the stream', () async {
+      expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+      await pumpEventQueue();
+      // The inner channel is supplied only after the stream has a listener.
+      completer.setChannel(innerChannel);
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      unawaited(streamController.close());
+    });
+
+    test('forwards events through the sink', () async {
+      completer.channel.sink.add(1);
+      completer.channel.sink.add(2);
+      completer.channel.sink.add(3);
+      unawaited(completer.channel.sink.close());
+      await pumpEventQueue();
+      // Everything added above should be forwarded once the channel is set.
+      completer.setChannel(innerChannel);
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test('forwards an error through the stream', () async {
+      expect(completer.channel.stream.first, throwsA('oh no'));
+      await pumpEventQueue();
+
+      completer.setError('oh no');
+    });
+
+    test('drops sink events', () async {
+      expect(completer.channel.sink.done, completes);
+      completer.channel.sink.add(1);
+      completer.channel.sink.addError('oh no');
+      await pumpEventQueue();
+
+      completer.setError('oh no');
+    });
+  });
+
+  group('fromFuture', () {
+    test('forwards a StreamChannel', () {
+      var channel =
+          StreamChannelCompleter.fromFuture(Future.value(innerChannel));
+      channel.sink.add(1);
+      channel.sink.close();
+      streamController.sink.add(2);
+      streamController.sink.close();
+      // Each direction should carry only what was written to its own end.
+      expect(sinkController.stream.toList(), completion(equals([1])));
+      expect(channel.stream.toList(), completion(equals([2])));
+    });
+
+    test('forwards an error', () {
+      var channel = StreamChannelCompleter.fromFuture(Future.error('oh no'));
+      expect(channel.stream.toList(), throwsA('oh no'));
+    });
+  });
+
+  test("doesn't allow the channel to be set multiple times", () {
+    completer.setChannel(innerChannel);
+    expect(() => completer.setChannel(innerChannel), throwsStateError);
+    expect(() => completer.setChannel(innerChannel), throwsStateError);
+  });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_controller_test.dart b/pkgs/stream_channel/test/stream_channel_controller_test.dart
new file mode 100644
index 0000000..3d661e3
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_controller_test.dart
@@ -0,0 +1,104 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('asynchronously', () {
+    late StreamChannelController pair;
+    setUp(() {
+      pair = StreamChannelController();
+    });
+
+    test('forwards events from the local sink to the foreign stream', () {
+      pair.local.sink
+        ..add(1)
+        ..add(2)
+        ..add(3)
+        ..close();
+      expect(pair.foreign.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test('forwards events from the foreign sink to the local stream', () {
+      pair.foreign.sink
+        ..add(1)
+        ..add(2)
+        ..add(3)
+        ..close();
+      expect(pair.local.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test(
+        'with allowForeignErrors: false, shuts down the connection if an '
+        'error is added to the foreign channel', () {
+      pair = StreamChannelController(allowForeignErrors: false);
+      // The error should surface on `done` of the foreign sink only.
+      pair.foreign.sink.addError('oh no');
+      expect(pair.foreign.sink.done, throwsA('oh no'));
+      expect(pair.foreign.stream.toList(), completion(isEmpty));
+      expect(pair.local.sink.done, completes);
+      expect(pair.local.stream.toList(), completion(isEmpty));
+    });
+  });
+
+  group('synchronously', () {
+    late StreamChannelController pair;
+    setUp(() {
+      pair = StreamChannelController(sync: true);
+    });
+
+    test(
+        'synchronously forwards events from the local sink to the foreign '
+        'stream', () {
+      var sawData = false;
+      var sawError = false;
+      var sawDone = false;
+      pair.foreign.stream.listen(expectAsync1((value) {
+        expect(value, equals(1));
+        sawData = true;
+      }), onError: expectAsync1((failure) {
+        expect(failure, equals('oh no'));
+        sawError = true;
+      }), onDone: expectAsync0(() {
+        sawDone = true;
+      }));
+      // Each flag is checked right after the call that should have set it.
+      pair.local.sink.add(1);
+      expect(sawData, isTrue);
+
+      pair.local.sink.addError('oh no');
+      expect(sawError, isTrue);
+
+      pair.local.sink.close();
+      expect(sawDone, isTrue);
+    });
+
+    test(
+        'synchronously forwards events from the foreign sink to the local '
+        'stream', () {
+      var sawData = false;
+      var sawError = false;
+      var sawDone = false;
+      pair.local.stream.listen(expectAsync1((value) {
+        expect(value, equals(1));
+        sawData = true;
+      }), onError: expectAsync1((failure) {
+        expect(failure, equals('oh no'));
+        sawError = true;
+      }), onDone: expectAsync0(() {
+        sawDone = true;
+      }));
+      // Each flag is checked right after the call that should have set it.
+      pair.foreign.sink.add(1);
+      expect(sawData, isTrue);
+
+      pair.foreign.sink.addError('oh no');
+      expect(sawError, isTrue);
+
+      pair.foreign.sink.close();
+      expect(sawDone, isTrue);
+    });
+  });
+}
diff --git a/pkgs/stream_channel/test/stream_channel_test.dart b/pkgs/stream_channel/test/stream_channel_test.dart
new file mode 100644
index 0000000..c44b6ab
--- /dev/null
+++ b/pkgs/stream_channel/test/stream_channel_test.dart
@@ -0,0 +1,138 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  test("pipe() pipes data from each channel's stream into the other's sink",
+      () {
+    var otherStreamController = StreamController<int>();
+    var otherSinkController = StreamController<int>();
+    var otherChannel =
+        StreamChannel(otherStreamController.stream, otherSinkController.sink);
+
+    var streamController = StreamController<int>();
+    var sinkController = StreamController<int>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    channel.pipe(otherChannel);
+    // Events from `channel`'s stream should reach `otherChannel`'s sink.
+    streamController.add(1);
+    streamController.add(2);
+    streamController.add(3);
+    streamController.close();
+    expect(otherSinkController.stream.toList(), completion(equals([1, 2, 3])));
+    // Events from `otherChannel`'s stream should reach `channel`'s sink.
+    otherStreamController.add(4);
+    otherStreamController.add(5);
+    otherStreamController.add(6);
+    otherStreamController.close();
+    expect(sinkController.stream.toList(), completion(equals([4, 5, 6])));
+  });
+
+  test('transform() transforms the channel', () async {
+    var streamController = StreamController<List<int>>();
+    var sinkController = StreamController<List<int>>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    var transformed = channel
+        .cast<List<int>>()
+        .transform(StreamChannelTransformer.fromCodec(utf8));
+    // The bytes spell 'foobar'; the stream side should decode them.
+    streamController.add([102, 111, 111, 98, 97, 114]);
+    unawaited(streamController.close());
+    expect(await transformed.stream.toList(), equals(['foobar']));
+    // The sink side should encode the string back into bytes.
+    transformed.sink.add('fblthp');
+    unawaited(transformed.sink.close());
+    expect(
+        sinkController.stream.toList(),
+        completion(equals([
+          [102, 98, 108, 116, 104, 112]
+        ])));
+  });
+
+  test('transformStream() transforms only the stream', () async {
+    var streamController = StreamController<String>();
+    var sinkController = StreamController<String>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    var transformed =
+        channel.cast<String>().transformStream(const LineSplitter());
+    // The chunk boundaries below do not line up with the line breaks.
+    streamController.add('hello world');
+    streamController.add(' what\nis');
+    streamController.add('\nup');
+    unawaited(streamController.close());
+    expect(await transformed.stream.toList(),
+        equals(['hello world what', 'is', 'up']));
+    // The sink is not transformed, so the embedded newline passes through.
+    transformed.sink.add('fbl\nthp');
+    unawaited(transformed.sink.close());
+    expect(sinkController.stream.toList(), completion(equals(['fbl\nthp'])));
+  });
+
+  test('transformSink() transforms only the sink', () async {
+    var streamController = StreamController<String>();
+    var sinkController = StreamController<String>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    var transformed = channel.cast<String>().transformSink(
+        const StreamSinkTransformer.fromStreamTransformer(LineSplitter()));
+    // The stream is not transformed, so the embedded newline is preserved.
+    streamController.add('fbl\nthp');
+    unawaited(streamController.close());
+    expect(await transformed.stream.toList(), equals(['fbl\nthp']));
+    // Only data written to the sink goes through the line splitter.
+    transformed.sink.add('hello world');
+    transformed.sink.add(' what\nis');
+    transformed.sink.add('\nup');
+    unawaited(transformed.sink.close());
+    expect(sinkController.stream.toList(),
+        completion(equals(['hello world what', 'is', 'up'])));
+  });
+
+  test('changeStream() changes the stream', () {
+    var streamController = StreamController<int>();
+    var sinkController = StreamController<int>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    var newController = StreamController<int>();
+    var changed = channel.changeStream((stream) {
+      expect(stream, equals(channel.stream));
+      return newController.stream;
+    });
+
+    newController.add(10);
+    newController.close();
+    // Events on the original stream must not show up in the result.
+    streamController.add(20);
+    streamController.close();
+
+    expect(changed.stream.toList(), completion(equals([10])));
+  });
+
+  test('changeSink() changes the sink', () {
+    var streamController = StreamController<int>();
+    var sinkController = StreamController<int>();
+    var channel = StreamChannel(streamController.stream, sinkController.sink);
+
+    var newController = StreamController<int>();
+    var changed = channel.changeSink((sink) {
+      expect(sink, equals(channel.sink));
+      return newController.sink;
+    });
+    // `count: 0` fails the test if the listener below is ever invoked.
+    expect(newController.stream.toList(), completion(equals([10])));
+    streamController.stream.listen(expectAsync1((_) {}, count: 0));
+
+    changed.sink.add(10);
+    changed.sink.close();
+  });
+}
diff --git a/pkgs/stream_channel/test/with_close_guarantee_test.dart b/pkgs/stream_channel/test/with_close_guarantee_test.dart
new file mode 100644
index 0000000..9c0b729
--- /dev/null
+++ b/pkgs/stream_channel/test/with_close_guarantee_test.dart
@@ -0,0 +1,69 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+final _delayTransformer = StreamTransformer.fromHandlers(
+    handleData: (data, sink) => Future.microtask(() => sink.add(data)),
+    handleDone: (sink) => Future.microtask(() => sink.close()));
+// The same microtask delay, packaged so that it can wrap a sink instead.
+final _delaySinkTransformer =
+    StreamSinkTransformer.fromStreamTransformer(_delayTransformer);
+
+void main() {
+  late StreamChannelController controller;
+  late StreamChannel channel;
+  setUp(() {
+    controller = StreamChannelController();
+
+    // Add a bunch of layers of asynchronous dispatch between the channel and
+    // the underlying controllers.
+    var stream = controller.foreign.stream;
+    var sink = controller.foreign.sink;
+    for (var i = 0; i < 10; i++) {
+      stream = stream.transform(_delayTransformer);
+      sink = _delaySinkTransformer.bind(sink);
+    }
+    // The channel under test sits on top of the delayed stream and sink.
+    channel = StreamChannel.withCloseGuarantee(stream, sink);
+  });
+
+  test(
+      'closing the event sink causes the stream to close before it emits any '
+      'more events', () async {
+    controller.local.sink.add(1);
+    controller.local.sink.add(2);
+    controller.local.sink.add(3);
+    // The sink is closed on event 2, so event 3 must not arrive (`count: 2`).
+    expect(
+        channel.stream
+            .listen(expectAsync1((event) {
+              if (event == 2) channel.sink.close();
+            }, count: 2))
+            .asFuture<void>(),
+        completes);
+
+    await pumpEventQueue();
+  });
+
+  test(
+      'closing the event sink before events are emitted causes the stream to '
+      'close immediately', () async {
+    unawaited(channel.sink.close());
+    channel.stream.listen(expectAsync1((_) {}, count: 0),
+        onError: expectAsync2((_, __) {}, count: 0),
+        onDone: expectAsync0(() {}));
+    // These are sent after the sink was closed and must never be seen.
+    controller.local.sink.add(1);
+    controller.local.sink.add(2);
+    controller.local.sink.add(3);
+    unawaited(controller.local.sink.close());
+
+    await pumpEventQueue();
+  });
+}
diff --git a/pkgs/stream_channel/test/with_guarantees_test.dart b/pkgs/stream_channel/test/with_guarantees_test.dart
new file mode 100644
index 0000000..f026079
--- /dev/null
+++ b/pkgs/stream_channel/test/with_guarantees_test.dart
@@ -0,0 +1,200 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamController streamController;
+  late StreamController sinkController;
+  late StreamChannel channel;
+  setUp(() {
+    streamController = StreamController<void>();
+    sinkController = StreamController<void>();
+    channel = StreamChannel.withGuarantees(
+        streamController.stream, sinkController.sink);
+  });
+
+  group('with a broadcast stream', () {
+    setUp(() {
+      streamController = StreamController.broadcast();
+      channel = StreamChannel.withGuarantees(
+          streamController.stream, sinkController.sink);
+    });
+
+    test('buffers events', () async {
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      await pumpEventQueue();
+      // The test listens only now, yet all three events must still arrive.
+      expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+      unawaited(streamController.close());
+    });
+
+    test('only allows a single subscription', () {
+      channel.stream.listen(null);
+      expect(() => channel.stream.listen(null), throwsStateError);
+    });
+  });
+
+  test(
+      'closing the event sink causes the stream to close before it emits any '
+      'more events', () {
+    streamController.add(1);
+    streamController.add(2);
+    streamController.add(3);
+    // The sink is closed on event 2, so event 3 must not arrive (`count: 2`).
+    expect(
+        channel.stream
+            .listen(expectAsync1((event) {
+              if (event == 2) channel.sink.close();
+            }, count: 2))
+            .asFuture<void>(),
+        completes);
+  });
+
+  test('after the stream closes, the sink ignores events', () async {
+    unawaited(streamController.close());
+
+    // Wait for the done event to be delivered.
+    await channel.stream.toList();
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    unawaited(channel.sink.close());
+
+    // None of our channel.sink additions should make it to the other endpoint.
+    sinkController.stream.listen(expectAsync1((_) {}, count: 0),
+        onDone: expectAsync0(() {}, count: 0));
+    await pumpEventQueue();
+  });
+
+  test("canceling the stream's subscription has no effect on the sink",
+      () async {
+    unawaited(channel.stream.listen(null).cancel());
+    await pumpEventQueue();
+    // The sink must keep working after the subscription is canceled.
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    unawaited(channel.sink.close());
+    expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("canceling the stream's subscription doesn't stop a done event",
+      () async {
+    unawaited(channel.stream.listen(null).cancel());
+    await pumpEventQueue();
+    // Close the inner stream after the subscription is already canceled.
+    unawaited(streamController.close());
+    await pumpEventQueue();
+
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+    unawaited(channel.sink.close());
+
+    // The sink should be ignoring events because the stream closed.
+    sinkController.stream.listen(expectAsync1((_) {}, count: 0),
+        onDone: expectAsync0(() {}, count: 0));
+    await pumpEventQueue();
+  });
+
+  test('forwards errors to the other endpoint', () {
+    channel.sink.addError('error');
+    expect(sinkController.stream.first, throwsA('error'));
+  });
+
+  test('Sink.done completes once the stream is done', () {
+    channel.stream.listen(null);
+    expect(channel.sink.done, completes);
+    streamController.close();
+  });
+
+  test("events can't be added to an explicitly-closed sink", () {
+    sinkController.stream.listen(null); // Work around sdk#19095.
+
+    expect(channel.sink.close(), completes);
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError('oh no'), throwsStateError);
+    expect(() => channel.sink.addStream(Stream.fromIterable([])),
+        throwsStateError);
+  });
+
+  test("events can't be added while a stream is being added", () {
+    var controller = StreamController<void>();
+    channel.sink.addStream(controller.stream);
+    // While addStream is in progress, every other sink operation must throw.
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError('oh no'), throwsStateError);
+    expect(() => channel.sink.addStream(Stream.fromIterable([])),
+        throwsStateError);
+    expect(() => channel.sink.close(), throwsStateError);
+    // Close the source stream that was handed to addStream above.
+    controller.close();
+  });
+
+  group('with allowSinkErrors: false', () {
+    setUp(() {
+      streamController = StreamController<void>();
+      sinkController = StreamController<void>();
+      channel = StreamChannel.withGuarantees(
+          streamController.stream, sinkController.sink,
+          allowSinkErrors: false);
+    });
+
+    test('forwards errors to Sink.done but not the stream', () {
+      channel.sink.addError('oh no');
+      expect(channel.sink.done, throwsA('oh no'));
+      sinkController.stream
+          .listen(null, onError: expectAsync1((dynamic _) {}, count: 0));
+    });
+
+    test('adding an error causes the stream to emit a done event', () {
+      expect(channel.sink.done, throwsA('oh no'));
+
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      // The error is added on event 2, so event 3 must not arrive.
+      expect(
+          channel.stream
+              .listen(expectAsync1((event) {
+                if (event == 2) channel.sink.addError('oh no');
+              }, count: 2))
+              .asFuture<void>(),
+          completes);
+    });
+
+    test('adding an error closes the inner sink', () {
+      channel.sink.addError('oh no');
+      expect(channel.sink.done, throwsA('oh no'));
+      expect(sinkController.stream.toList(), completion(isEmpty));
+    });
+
+    test(
+        'adding an error via addStream causes the stream to emit a done '
+        'event', () async {
+      var canceled = false;
+      var controller = StreamController<void>(onCancel: () {
+        canceled = true;
+      });
+
+      // This future shouldn't get the error, because it's sent to [Sink.done].
+      expect(channel.sink.addStream(controller.stream), completes);
+
+      controller.addError('oh no');
+      expect(channel.sink.done, throwsA('oh no'));
+      await pumpEventQueue();
+      expect(canceled, isTrue);
+
+      // Even though the sink is closed, this shouldn't throw an error because
+      // the user didn't explicitly close it.
+      channel.sink.add(1);
+    });
+  });
+}