Use a BytesBuilder to group length and message, enable tcpNoDelay.

This makes the socket benchmark (and real world code also), about 100x faster.

Change-Id: Iaa130b7bf69e4c89d3c3a221680e5b62ffe10583
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/296641
Auto-Submit: Jake Macdonald <jakemac@google.com>
Commit-Queue: Jake Macdonald <jakemac@google.com>
Reviewed-by: Bob Nystrom <rnystrom@google.com>
diff --git a/pkg/_fe_analyzer_shared/benchmark/macros/serialization_benchmark.dart b/pkg/_fe_analyzer_shared/benchmark/macros/serialization_benchmark.dart
index 7478b26..6b6d63a 100644
--- a/pkg/_fe_analyzer_shared/benchmark/macros/serialization_benchmark.dart
+++ b/pkg/_fe_analyzer_shared/benchmark/macros/serialization_benchmark.dart
@@ -159,8 +159,10 @@
       responseCompleter = Completer();
       var result = serialize();
       if (result is List<int>) {
-        _writeLength(result, process.stdin);
-        process.stdin.add(result);
+        final bytesBuilder = BytesBuilder(copy: false);
+        _writeLength(result, bytesBuilder);
+        bytesBuilder.add(result);
+        process.stdin.add(bytesBuilder.takeBytes());
       } else {
         process.stdin.writeln(jsonEncode(result));
       }
@@ -172,8 +174,10 @@
       responseCompleter = Completer();
       var result = serialize();
       if (result is List<int>) {
-        _writeLength(result, process.stdin);
-        process.stdin.add(result);
+        final bytesBuilder = BytesBuilder(copy: false);
+        _writeLength(result, bytesBuilder);
+        bytesBuilder.add(result);
+        process.stdin.add(bytesBuilder.takeBytes());
       } else {
         process.stdin.writeln(jsonEncode(result));
       }
@@ -218,6 +222,8 @@
       serverSocket.port.toString(),
     ]);
     var client = await clientCompleter.future;
+    // Nagle's algorithm slows us down >100x, disable it.
+    client.setOption(SocketOption.tcpNoDelay, true);
 
     var listeners = <StreamSubscription>[
       (serializationMode == SerializationMode.jsonServer ||
@@ -240,8 +246,10 @@
       responseCompleter = Completer();
       var result = serialize();
       if (result is List<int>) {
-        _writeLength(result, client);
-        client.add(result);
+        final bytesBuilder = BytesBuilder(copy: false);
+        _writeLength(result, bytesBuilder);
+        bytesBuilder.add(result);
+        client.add(bytesBuilder.takeBytes());
       } else {
         client.write(jsonEncode(result));
       }
@@ -253,8 +261,10 @@
       responseCompleter = Completer();
       var result = serialize();
       if (result is List<int>) {
-        _writeLength(result, client);
-        client.add(result);
+        final bytesBuilder = BytesBuilder(copy: false);
+        _writeLength(result, bytesBuilder);
+        bytesBuilder.add(result);
+        client.add(bytesBuilder.takeBytes());
       } else {
         client.write(jsonEncode(result));
       }
@@ -265,7 +275,7 @@
     listeners.forEach((l) => l.cancel());
     process.kill();
     await serverSocket.close();
-    await client.close();
+    client.destroy();
   } catch (e, s) {
     print('Error running benchmark \n$e\n\n$s');
   } finally {
@@ -273,12 +283,12 @@
   }
 }
 
-void _writeLength(List<int> result, Sink<List<int>> sink) {
+void _writeLength(List<int> result, BytesBuilder bytesBuilder) {
   int length = (result as Uint8List).lengthInBytes;
   if (length > 0xffffffff) {
     throw new StateError('Message was larger than the allowed size!');
   }
-  sink.add([
+  bytesBuilder.add([
     length >> 24 & 0xff,
     length >> 16 & 0xff,
     length >> 8 & 0xff,
@@ -323,8 +333,10 @@
               MessageGrouper(socket).messageStream.listen((data) {
                 deserialize(data);
                 var result = serialize() as Uint8List;
-                _writeLength(result, socket);
-                socket.add(result);
+                final bytesBuilder = BytesBuilder(copy: false);
+                _writeLength(result, bytesBuilder);
+                bytesBuilder.add(result);
+                socket.add(bytesBuilder.takeBytes());
               });
             }
           } else {
@@ -345,8 +357,10 @@
               MessageGrouper(stdin).messageStream.listen((data) {
                 deserialize(data);
                 var result = serialize() as Uint8List;
-                _writeLength(result, stdout);
-                stdout.add(result);
+                final bytesBuilder = BytesBuilder(copy: false);
+                _writeLength(result, bytesBuilder);
+                bytesBuilder.add(result);
+                stdout.add(bytesBuilder.takeBytes());
               });
             }
           }
@@ -392,12 +406,12 @@
         }
       }
 
-      void _writeLength(Uint8List result, Sink<List<int>> sink) {
+      void _writeLength(Uint8List result, BytesBuilder bytesBuilder) {
         int length = result.lengthInBytes;
         if (length > 0xffffffff) {
           throw new StateError('Message was larger than the allowed size!');
         }
-        sink.add([
+        bytesBuilder.add([
           length >> 24 & 0xff,
           length >> 16 & 0xff,
           length >> 8 & 0xff,
diff --git a/pkg/_fe_analyzer_shared/lib/src/macros/bootstrap.dart b/pkg/_fe_analyzer_shared/lib/src/macros/bootstrap.dart
index f6c098c..f143d27 100644
--- a/pkg/_fe_analyzer_shared/lib/src/macros/bootstrap.dart
+++ b/pkg/_fe_analyzer_shared/lib/src/macros/bootstrap.dart
@@ -102,6 +102,8 @@
       late Stream<List<int>> inputStream;
       if (socketAddress != null && socketPort != null) {
         var socket = await Socket.connect(socketAddress, socketPort);
+        // Nagle's algorithm slows us down >100x, disable it.
+        socket.setOption(SocketOption.tcpNoDelay, true);
         sendResult = _sendIOSinkResultFactory(socket);
         inputStream = socket;
       } else {
@@ -380,13 +382,15 @@
       } else if (serializationMode == SerializationMode.byteDataClient) {
         Uint8List result = (serializer as ByteDataSerializer).result;
         int length = result.lengthInBytes;
-        sink.add([
+        final bytesBuilder = BytesBuilder(copy: false);
+        bytesBuilder.add([
           length >> 24 & 0xff,
           length >> 16 & 0xff,
           length >> 8 & 0xff,
           length & 0xff,
         ]);
-        sink.add(result);
+        bytesBuilder.add(result);
+        sink.add(bytesBuilder.takeBytes());
       } else {
         throw new UnsupportedError(
             'Unsupported serialization mode \$serializationMode for '
diff --git a/pkg/_fe_analyzer_shared/lib/src/macros/executor/process_executor.dart b/pkg/_fe_analyzer_shared/lib/src/macros/executor/process_executor.dart
index 22b6ef4..28f66be 100644
--- a/pkg/_fe_analyzer_shared/lib/src/macros/executor/process_executor.dart
+++ b/pkg/_fe_analyzer_shared/lib/src/macros/executor/process_executor.dart
@@ -76,6 +76,8 @@
       clientCompleter.complete(client);
     });
     Socket client = await clientCompleter.future;
+    // Nagle's algorithm slows us down >100x, disable it.
+    client.setOption(SocketOption.tcpNoDelay, true);
 
     Stream<Object> messageStream;
 
@@ -152,13 +154,15 @@
       if (length > 0xffffffff) {
         throw new StateError('Message was larger than the allowed size!');
       }
-      outSink.add([
+      BytesBuilder bytesBuilder = new BytesBuilder(copy: false);
+      bytesBuilder.add([
         length >> 24 & 0xff,
         length >> 16 & 0xff,
         length >> 8 & 0xff,
         length & 0xff
       ]);
-      outSink.add(result);
+      bytesBuilder.add(result);
+      outSink.add(bytesBuilder.takeBytes());
     } else {
       throw new UnsupportedError(
           'Unsupported serialization mode $serializationMode for '
diff --git a/pkg/front_end/test/spell_checking_list_code.txt b/pkg/front_end/test/spell_checking_list_code.txt
index 6cc3b3e..3742c3c 100644
--- a/pkg/front_end/test/spell_checking_list_code.txt
+++ b/pkg/front_end/test/spell_checking_list_code.txt
@@ -938,6 +938,7 @@
 mvar
 n
 na
+nagle's
 nameless
 namer
 natively
@@ -1535,6 +1536,7 @@
 tarjan
 tarjan's
 tb
+tcp
 team
 tearoff
 tearoffable