blob: 32698030917bba650bfd54a170621474f3a51786 [file] [log] [blame]
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Regression test for https://dartbug.com/55886: [HttpResponse.addStream]
// should cancel subscription to the stream which is being added if
// [HttpResponse] itself is being closed.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:async_helper/async_helper.dart';
import 'package:expect/expect.dart';
/// Pipes [from] into [to] through an intermediate [StreamController].
///
/// The controller subscribes to [from] only once its own stream is listened
/// to, and forwards pause, resume and cancel signals from its listener to
/// that upstream subscription.
///
/// After the pipe completes, asserts that the controller's `onCancel`
/// callback has run.
Future<void> pipeStream(Stream<List<int>> from, IOSink to) async {
  var cancelled = false;
  StreamSubscription<List<int>>? upstream;

  // Cancels the subscription to [from] (if there is one) and forgets it.
  // The future returned by `cancel()` is intentionally neither awaited nor
  // returned to the caller.
  void dropUpstream() {
    upstream?.cancel();
    upstream = null;
  }

  // Declared `late` so that the callbacks below can refer to the controller
  // they are being passed to.
  late final StreamController<List<int>> relay;
  relay = StreamController<List<int>>(
    onListen: () {
      upstream = from.listen(
        relay.add,
        onError: (Object error, StackTrace stackTrace) {
          relay.addError(error, stackTrace);
          dropUpstream();
        },
        onDone: () {
          relay.close();
          dropUpstream();
        },
      );
    },
    onPause: () => upstream?.pause(),
    onResume: () => upstream?.resume(),
    onCancel: () {
      // Record the cancellation so it can be verified once the pipe is done.
      cancelled = true;
      dropUpstream();
    },
  );

  await relay.stream.pipe(to);
  Expect.isTrue(cancelled);
}
/// Emits the UTF-8 encoding of `item 0` through `item 99`, one chunk at a
/// time, waiting 100 milliseconds after each chunk before continuing.
Stream<List<int>> generateSlowly() async* {
  const itemCount = 100;
  const pause = Duration(milliseconds: 100);

  var index = 0;
  while (index < itemCount) {
    yield utf8.encode("item $index");
    await Future.delayed(pause);
    index++;
  }
}
/// Serves the first request received by [server] and then leaves the loop.
///
/// Output buffering is turned off on the response, and the response body is
/// produced by piping [generateSlowly] into it via [pipeStream].
Future<void> serve(HttpServer server) async {
  await for (final request in server) {
    final response = request.response..bufferOutput = false;
    await pipeStream(generateSlowly(), response);
    // Only a single request is handled.
    break;
  }
}
/// Starts a local server, issues one request against it and cancels the
/// response subscription as soon as the first chunk arrives.
void main() async {
  asyncStart();
  final server = await HttpServer.bind('localhost', 0);
  // The test is marked finished only once the server side has completed.
  unawaited(serve(server).then((_) => asyncEnd()));

  // Send request and then cancel response stream subscription after
  // the first chunk. This should cause server to close the connection
  // and cancel subscription to the stream which is being piped into
  // the response.
  final httpClient = HttpClient();
  final request = await httpClient.get('localhost', server.port, '/');
  final response = await request.close();
  // `late` because the data handler refers to the subscription itself.
  late final StreamSubscription<String> subscription;
  subscription = response.map(utf8.decode).listen((_) {
    subscription.cancel();
  });
}