Reland "[dart:io] Add Abort() on HttpClientRequest"

The test was poorly written. The response from Socket can arrive
separately. So the check for content-length header will fail.

This is a reland of 4b96f20a79a6eca074dc17b8b84448f79d4acfc8

Original change's description:
> [dart:io] Add Abort() on HttpClientRequest
>
> The breaking change request for this cl: https://github.com/dart-lang/sdk/issues/41904
>
> Bug: https://github.com/dart-lang/sdk/issues/22265
> Change-Id: I36db64b4db307b78cd188a2f1701ec733f2e73db
> Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/147339
> Commit-Queue: Zichang Guo <zichangguo@google.com>
> Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>

Bug: https://github.com/dart-lang/sdk/issues/22265
Change-Id: Ibfe9565a3f9d5ef84274fba33a68fb57dbbe28c9
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/155581
Reviewed-by: Siva Annamalai <asiva@google.com>
Commit-Queue: Zichang Guo <zichangguo@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 04aa7bd..4aa79ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
 
 ### Core libraries
 
+#### `dart:io`
+
+*   Adds `Abort` method to class `HttpClientRequest`, which allows users
+    to cancel outgoing HTTP requests and stop following IO operations.
+
 #### `dart:typed_data`
 
 *   Class `BytesBuilder` is moved from `dart:io` to `dart:typed_data`.
diff --git a/sdk/lib/_http/http.dart b/sdk/lib/_http/http.dart
index 6db8524..7fc8b40 100644
--- a/sdk/lib/_http/http.dart
+++ b/sdk/lib/_http/http.dart
@@ -2015,6 +2015,34 @@
   ///
   /// Returns `null` if the socket is not available.
   HttpConnectionInfo? get connectionInfo;
+
+  /// Aborts the client connection.
+  ///
+  /// If the connection has not yet completed, the request is aborted and the
+  /// [done] future (also returned by [close]) is completed with the provided
+  /// [exception] and [stackTrace].
+  /// If [exception] is omitted, it defaults to an [HttpException], and if
+  /// [stackTrace] is omitted, it defaults to [StackTrace.empty].
+  ///
+  /// If the [done] future has already completed, aborting has no effect.
+  ///
+  /// Using the [IOSink] methods (e.g., [write] and [add]) has no effect after
+  /// the request has been aborted
+  ///
+  /// ```dart
+  /// HttpClientRequst request = ...
+  /// request.write();
+  /// Timer(Duration(seconds: 1), () {
+  ///   request.abort();
+  /// });
+  /// request.close().then((response) {
+  ///   // If response comes back before abort, this callback will be called.
+  /// }, onError: (e) {
+  ///   // If abort() called before response is available, onError will fire.
+  /// });
+  /// ```
+  @Since("2.10")
+  void abort([Object? exception, StackTrace? stackTrace]);
 }
 
 /**
diff --git a/sdk/lib/_http/http_impl.dart b/sdk/lib/_http/http_impl.dart
index bc43046..97087ed 100644
--- a/sdk/lib/_http/http_impl.dart
+++ b/sdk/lib/_http/http_impl.dart
@@ -1078,6 +1078,8 @@
 
   List<RedirectInfo> _responseRedirects = [];
 
+  bool _aborted = false;
+
   _HttpClientRequest(_HttpOutgoing outgoing, Uri uri, this.method, this._proxy,
       this._httpClient, this._httpClientConnection, this._timeline)
       : uri = uri,
@@ -1141,7 +1143,10 @@
           .then((list) => list[0]);
 
   Future<HttpClientResponse> close() {
-    super.close();
+    if (!_aborted) {
+      // It will send out the request.
+      super.close();
+    }
     return done;
   }
 
@@ -1161,6 +1166,9 @@
       _httpClientConnection.connectionInfo;
 
   void _onIncoming(_HttpIncoming incoming) {
+    if (_aborted) {
+      return;
+    }
     var response = new _HttpClientResponse(incoming, this, _httpClient);
     Future<HttpClientResponse> future;
     if (followRedirects && response.isRedirect) {
@@ -1183,12 +1191,21 @@
     } else {
       future = new Future<HttpClientResponse>.value(response);
     }
-    future.then((v) => _responseCompleter.complete(v),
-        onError: _responseCompleter.completeError);
+    future.then((v) {
+      if (!_responseCompleter.isCompleted) {
+        _responseCompleter.complete(v);
+      }
+    }, onError: (e, s) {
+      if (!_responseCompleter.isCompleted) {
+        _responseCompleter.completeError(e, s);
+      }
+    });
   }
 
   void _onError(error, StackTrace stackTrace) {
-    _responseCompleter.completeError(error, stackTrace);
+    if (!_responseCompleter.isCompleted) {
+      _responseCompleter.completeError(error, stackTrace);
+    }
   }
 
   // Generate the request URI based on the method and proxy.
@@ -1221,7 +1238,21 @@
     }
   }
 
+  void add(List<int> data) {
+    if (data.length == 0 || _aborted) return;
+    super.add(data);
+  }
+
+  void write(Object? obj) {
+    if (_aborted) return;
+    super.write(obj);
+  }
+
   void _writeHeader() {
+    if (_aborted) {
+      _outgoing.setHeader(Uint8List(0), 0);
+      return;
+    }
     BytesBuilder buffer = new _CopyingBytesBuilder(_OUTGOING_BUFFER_SIZE);
 
     // Write the request method.
@@ -1254,6 +1285,15 @@
     Uint8List headerBytes = buffer.takeBytes();
     _outgoing.setHeader(headerBytes, headerBytes.length);
   }
+
+  void abort([Object? exception, StackTrace? stackTrace]) {
+    _aborted = true;
+    if (!_responseCompleter.isCompleted) {
+      exception ??= HttpException("Request has been aborted");
+      _responseCompleter.completeError(exception, stackTrace);
+      _httpClientConnection.destroy();
+    }
+  }
 }
 
 // Used by _HttpOutgoing as a target of a chunked converter for gzip
diff --git a/tests/standalone/io/http_client_connect_test.dart b/tests/standalone/io/http_client_connect_test.dart
index b82812b..4eb0fff 100644
--- a/tests/standalone/io/http_client_connect_test.dart
+++ b/tests/standalone/io/http_client_connect_test.dart
@@ -308,7 +308,127 @@
   }
 }
 
-void main() {
+Future<void> testHttpAbort() async {
+  // Test that abort() is called after request is sent.
+  asyncStart();
+  final completer = Completer<void>();
+  final server = await HttpServer.bind("127.0.0.1", 0);
+  server.listen((request) {
+    completer.complete();
+    request.response.close();
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  request.write('somedata');
+  completer.future.then((_) {
+    request.abort();
+    asyncStart();
+    Future.delayed(Duration(milliseconds: 500), () {
+      server.close();
+      asyncEnd();
+    });
+  });
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<HttpException>(e);
+    Expect.isTrue(e.toString().contains('abort'));
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortBeforeWrite() async {
+  // Test that abort() is called before write(). No message should be sent from
+  // HttpClientRequest.
+  asyncStart();
+  final completer = Completer<Socket>();
+  final server = await ServerSocket.bind("127.0.0.1", 0);
+  server.listen((s) async {
+    s.listen((data) {
+      Expect.fail('No message should be received');
+    });
+    await Future.delayed(Duration(milliseconds: 500));
+    completer.complete(s);
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  // This HttpException will go to onError callback.
+  request.abort(HttpException('Error'));
+  asyncStart();
+  request.write('somedata');
+  completer.future.then((socket) {
+    socket.destroy();
+    server.close();
+    asyncEnd();
+  });
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<HttpException>(e);
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortBeforeClose() async {
+  // Test that abort() is called after write(). Some messages added prior to
+  // abort() are sent.
+  final completer = new Completer<void>();
+  asyncStart();
+  final server = await ServerSocket.bind("127.0.0.1", 0);
+  server.listen((s) {
+    StringBuffer buffer = StringBuffer();
+    s.listen((data) {
+      buffer.write(utf8.decode(data));
+      if (buffer.toString().contains("content-length: 8")) {
+        completer.complete();
+        s.destroy();
+        server.close();
+      }
+    });
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  // Add an additional header field for server to verify.
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  request.write('somedata');
+  await completer.future;
+  final string = 'abort message';
+  request.abort(string);
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<String>(e);
+    Expect.equals(string, e);
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortAfterClose() async {
+  // Test that abort() is called after response is received. It should not
+  // affect HttpClientResponse.
+  asyncStart();
+  final value = 'someRandomData';
+  final server = await HttpServer.bind("127.0.0.1", 0);
+  server.listen((request) {
+    request.response.write(value);
+    request.response.close();
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.close().then((response) {
+    request.abort();
+    response.listen((data) {
+      Expect.equals(utf8.decode(data), value);
+    }, onDone: () {
+      asyncEnd();
+      server.close();
+    });
+  });
+}
+
+void main() async {
   testGetEmptyRequest();
   testGetDataRequest();
   testGetInvalidHost();
@@ -324,4 +444,8 @@
   testMaxConnectionsPerHost(5, 10);
   testMaxConnectionsPerHost(10, 50);
   testMaxConnectionsWithFailure();
+  await testHttpAbort();
+  await testHttpAbortBeforeWrite();
+  await testHttpAbortBeforeClose();
+  await testHttpAbortAfterClose();
 }
diff --git a/tests/standalone_2/io/http_client_connect_test.dart b/tests/standalone_2/io/http_client_connect_test.dart
index 6de515b..46380f1 100644
--- a/tests/standalone_2/io/http_client_connect_test.dart
+++ b/tests/standalone_2/io/http_client_connect_test.dart
@@ -306,7 +306,127 @@
   }
 }
 
-void main() {
+Future<void> testHttpAbort() async {
+  // Test that abort() is called after request is sent.
+  asyncStart();
+  final completer = Completer<void>();
+  final server = await HttpServer.bind("127.0.0.1", 0);
+  server.listen((request) {
+    completer.complete();
+    request.response.close();
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  request.write('somedata');
+  completer.future.then((_) {
+    request.abort();
+    asyncStart();
+    Future.delayed(Duration(milliseconds: 500), () {
+      server.close();
+      asyncEnd();
+    });
+  });
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<HttpException>(e);
+    Expect.isTrue(e.toString().contains('abort'));
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortBeforeWrite() async {
+  // Test that abort() is called before write(). No message should be sent from
+  // HttpClientRequest.
+  asyncStart();
+  final completer = Completer<Socket>();
+  final server = await ServerSocket.bind("127.0.0.1", 0);
+  server.listen((s) async {
+    s.listen((data) {
+      Expect.fail('No message should be received');
+    });
+    await Future.delayed(Duration(milliseconds: 500));
+    completer.complete(s);
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  // This HttpException will go to onError callback.
+  request.abort(HttpException('Error'));
+  asyncStart();
+  request.write('somedata');
+  completer.future.then((socket) {
+    socket.destroy();
+    server.close();
+    asyncEnd();
+  });
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<HttpException>(e);
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortBeforeClose() async {
+  // Test that abort() is called after write(). Some messages added prior to
+  // abort() are sent.
+  final completer = new Completer<void>();
+  asyncStart();
+  final server = await ServerSocket.bind("127.0.0.1", 0);
+  server.listen((s) {
+    StringBuffer buffer = StringBuffer();
+    s.listen((data) {
+      buffer.write(utf8.decode(data));
+      if (buffer.toString().contains("content-length: 8")) {
+        completer.complete();
+        s.destroy();
+        server.close();
+      }
+    });
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  // Add an additional header field for server to verify.
+  request.headers.add(HttpHeaders.contentLengthHeader, "8");
+  request.write('somedata');
+  await completer.future;
+  final string = 'abort message';
+  request.abort(string);
+  request.close().then((response) {
+    Expect.fail('abort() prevents a response being returned');
+  }, onError: (e) {
+    Expect.type<String>(e);
+    Expect.equals(string, e);
+    asyncEnd();
+  });
+}
+
+Future<void> testHttpAbortAfterClose() async {
+  // Test that abort() is called after response is received. It should not
+  // affect HttpClientResponse.
+  asyncStart();
+  final value = 'someRandomData';
+  final server = await HttpServer.bind("127.0.0.1", 0);
+  server.listen((request) {
+    request.response.write(value);
+    request.response.close();
+  });
+
+  final request = await HttpClient().get("127.0.0.1", server.port, "/");
+  request.close().then((response) {
+    request.abort();
+    response.listen((data) {
+      Expect.equals(utf8.decode(data), value);
+    }, onDone: () {
+      asyncEnd();
+      server.close();
+    });
+  });
+}
+
+void main() async {
   testGetEmptyRequest();
   testGetDataRequest();
   testGetInvalidHost();
@@ -322,4 +442,8 @@
   testMaxConnectionsPerHost(5, 10);
   testMaxConnectionsPerHost(10, 50);
   testMaxConnectionsWithFailure();
+  await testHttpAbort();
+  await testHttpAbortBeforeWrite();
+  await testHttpAbortBeforeClose();
+  await testHttpAbortAfterClose();
 }