Migrate bazel_worker to null safety (#52)

- Use `WorkResponse` with `exitCode` set to `EXIT_CODE_BROKEN_PIPE` instead of `null` responses when workers exit.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 013f469..74cd91b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 1.0.0-nullsafety.0
+
+* Migrate to null safety.
+* Use `WorkResponse` with `exitCode` set to `EXIT_CODE_BROKEN_PIPE` instead of
+  `null` responses.
+
 ## 0.1.25+1-dev
 
 * Regenerate proto code and fix some new analysis hints.
diff --git a/e2e_test/bin/async_worker.dart b/e2e_test/bin/async_worker.dart
index ddd530c..80d624f 100644
--- a/e2e_test/bin/async_worker.dart
+++ b/e2e_test/bin/async_worker.dart
@@ -9,6 +9,6 @@
 
 /// This worker can run in one of two ways: normally, using stdin/stdout, or
 /// in an isolate, communicating over a [SendPort].
-Future main(List<String> args, [SendPort sendPort]) async {
+Future main(List<String> args, [SendPort? sendPort]) async {
   await ExampleAsyncWorker(sendPort).run();
 }
diff --git a/e2e_test/bin/async_worker_in_isolate.dart b/e2e_test/bin/async_worker_in_isolate.dart
index fce1eff..a94875d 100644
--- a/e2e_test/bin/async_worker_in_isolate.dart
+++ b/e2e_test/bin/async_worker_in_isolate.dart
@@ -14,7 +14,7 @@
 /// Anyone actually using the facility to wrap a worker in an isolate will want
 /// to use this code to do additional work, for example post processing one of
 /// the output files.
-Future main(List<String> args, SendPort message) async {
+Future main(List<String> args, [SendPort? message]) async {
   var receivePort = ReceivePort();
   await Isolate.spawnUri(
       Uri.file('async_worker.dart'), [], receivePort.sendPort);
diff --git a/e2e_test/lib/async_worker.dart b/e2e_test/lib/async_worker.dart
index 075a5e6..192d777 100644
--- a/e2e_test/lib/async_worker.dart
+++ b/e2e_test/lib/async_worker.dart
@@ -11,7 +11,7 @@
 /// separated by newlines.
 class ExampleAsyncWorker extends AsyncWorkerLoop {
   /// Set [sendPort] to run in an isolate.
-  ExampleAsyncWorker([SendPort sendPort])
+  ExampleAsyncWorker([SendPort? sendPort])
       : super(connection: AsyncWorkerConnection(sendPort: sendPort));
 
   @override
diff --git a/e2e_test/pubspec.yaml b/e2e_test/pubspec.yaml
index d9c3577..ad14e87 100644
--- a/e2e_test/pubspec.yaml
+++ b/e2e_test/pubspec.yaml
@@ -3,8 +3,8 @@
   bazel_worker:
     path: ../
 dev_dependencies:
-  cli_util: ^0.1.0
-  path: ^1.4.1
-  test: ^1.0.0
+  cli_util: ^0.3.0-nullsafety
+  path: ^1.8.0-nullsafety
+  test: ^1.16.0-nullsafety
 environment:
-  sdk: '>=2.0.0 <3.0.0'
+  sdk: '>=2.12.0-0 <3.0.0'
diff --git a/e2e_test/test/e2e_test.dart b/e2e_test/test/e2e_test.dart
index faae0d3..9a18070 100644
--- a/e2e_test/test/e2e_test.dart
+++ b/e2e_test/test/e2e_test.dart
@@ -24,7 +24,7 @@
 }
 
 void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
-  BazelWorkerDriver driver;
+  late BazelWorkerDriver driver;
   group(groupName, () {
     setUp(() {
       driver = BazelWorkerDriver(spawnWorker);
@@ -46,7 +46,7 @@
 
 /// Runs [count] work requests through [driver], and asserts that they all
 /// completed with the correct response.
-Future _doRequests(BazelWorkerDriver driver, {int count}) async {
+Future _doRequests(BazelWorkerDriver driver, {int? count}) async {
   count ??= 100;
   var requests = List.generate(count, (requestNum) {
     var request = WorkRequest();
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 310a7f9..b2ac9c3 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -32,9 +32,9 @@
 
   /// Returns the next full message that is received, or null if none are left.
   @override
-  Future<List<int>> get next async {
+  Future<List<int>?> get next async {
     try {
-      List<int> message;
+      List<int>? message;
       while (message == null &&
           (_buffer.isNotEmpty || await _inputQueue.hasNext)) {
         if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next);
@@ -59,7 +59,7 @@
   Future cancel() {
     if (!_done.isCompleted) {
       _done.complete(null);
-      return _inputQueue.cancel();
+      return _inputQueue.cancel()!;
     }
     return done;
   }
diff --git a/lib/src/constants.dart b/lib/src/constants.dart
index 30113d3..a981638 100644
--- a/lib/src/constants.dart
+++ b/lib/src/constants.dart
@@ -4,3 +4,4 @@
 
 const int EXIT_CODE_OK = 0;
 const int EXIT_CODE_ERROR = 15;
+const int EXIT_CODE_BROKEN_PIPE = 32;
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 8a75977..661a47a 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -45,7 +45,7 @@
   final SpawnWorker _spawnWorker;
 
   BazelWorkerDriver(this._spawnWorker,
-      {int maxIdleWorkers, int maxWorkers, int maxRetries})
+      {int? maxIdleWorkers, int? maxWorkers, int? maxRetries})
       : _maxIdleWorkers = maxIdleWorkers ?? 4,
         _maxWorkers = maxWorkers ?? 4,
         _maxRetries = maxRetries ?? 4;
@@ -57,7 +57,7 @@
   /// to determine when actual work is being done versus just waiting for an
   /// available worker.
   Future<WorkResponse> doWork(WorkRequest request,
-      {Function(Future<WorkResponse>) trackWork}) {
+      {Function(Future<WorkResponse?>)? trackWork}) {
     var attempt = _WorkAttempt(request, trackWork: trackWork);
     _workQueue.add(attempt);
     _runWorkQueue();
@@ -105,7 +105,6 @@
       futureWorker.then((worker) {
         _spawningWorkers.remove(futureWorker);
         _readyWorkers.add(worker);
-
         var connection = StdDriverConnection.forWorker(worker);
         _workerConnections[worker] = connection;
         _runWorker(worker, attempt);
@@ -135,19 +134,19 @@
     var rescheduled = false;
 
     runZonedGuarded(() async {
-      var connection = _workerConnections[worker];
+      var connection = _workerConnections[worker]!;
 
       connection.writeRequest(attempt.request);
       var responseFuture = connection.readResponse();
       if (attempt.trackWork != null) {
-        attempt.trackWork(responseFuture);
+        attempt.trackWork!(responseFuture);
       }
       var response = await responseFuture;
 
       // It is possible for us to complete with an error response due to an
       // unhandled async error before we get here.
       if (!attempt.responseCompleter.isCompleted) {
-        if (response == null) {
+        if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
           rescheduled = _tryReschedule(attempt);
           if (rescheduled) return;
           stderr.writeln('Failed to run request ${attempt.request}');
@@ -210,7 +209,7 @@
   }
 
   void _killWorker(Process worker) {
-    _workerConnections[worker].cancel();
+    _workerConnections[worker]!.cancel();
     _readyWorkers.remove(worker);
     _idleWorkers.remove(worker);
     worker.kill();
@@ -222,7 +221,7 @@
 class _WorkAttempt {
   final WorkRequest request;
   final responseCompleter = Completer<WorkResponse>();
-  final Function(Future<WorkResponse>) trackWork;
+  final Function(Future<WorkResponse?>)? trackWork;
 
   Future<WorkResponse> get response => responseCompleter.future;
 
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 9e6f4db..9ab915c 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -34,7 +34,7 @@
   Future<void> get done => _messageGrouper.done;
 
   StdDriverConnection(
-      {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
+      {Stream<List<int>>? inputStream, StreamSink<List<int>>? outputStream})
       : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
         _outputStream = outputStream ?? stdout;
 
@@ -50,7 +50,11 @@
   @override
   Future<WorkResponse> readResponse() async {
     var buffer = await _messageGrouper.next;
-    if (buffer == null) return null;
+    if (buffer == null) {
+      return WorkResponse()
+        ..exitCode = EXIT_CODE_BROKEN_PIPE
+        ..output = 'Connection to worker closed';
+    }
 
     WorkResponse response;
     try {
@@ -104,7 +108,9 @@
   @override
   Future<WorkResponse> readResponse() async {
     if (!await _receivePortIterator.moveNext()) {
-      return null;
+      return WorkResponse()
+        ..exitCode = EXIT_CODE_BROKEN_PIPE
+        ..output = 'Connection to worker closed.';
     }
     return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
   }
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
index 5cbf933..c8531d7 100644
--- a/lib/src/message_grouper_state.dart
+++ b/lib/src/message_grouper_state.dart
@@ -10,20 +10,16 @@
 /// arrive.
 class MessageGrouperState {
   /// Reads the initial length.
-  _LengthReader _lengthReader;
+  var _lengthReader = _LengthReader();
 
   /// Reads messages from a stream of bytes.
-  _MessageReader _messageReader;
-
-  MessageGrouperState() {
-    reset();
-  }
+  _MessageReader? _messageReader;
 
   /// Handle one byte at a time.
   ///
   /// Returns a [List<int>] of message bytes if [byte] was the last byte in a
   /// message, otherwise returns [null].
-  List<int> handleInput(int byte) {
+  List<int>? handleInput(int byte) {
     if (!_lengthReader.done) {
       _lengthReader.readByte(byte);
       if (_lengthReader.done) {
@@ -31,11 +27,11 @@
       }
     } else {
       assert(_messageReader != null);
-      _messageReader.readByte(byte);
+      _messageReader!.readByte(byte);
     }
 
-    if (_lengthReader.done && _messageReader.done) {
-      var message = _messageReader.message;
+    if (_lengthReader.done && _messageReader!.done) {
+      var message = _messageReader!.message;
       reset();
       return message;
     }
@@ -62,9 +58,8 @@
   bool _done = false;
 
   /// If [_done] is `true`, the decoded value of the length bytes received so
-  /// far (if any).  If [_done] is `false`, the decoded length that was most
-  /// recently received.
-  int _length;
+  /// far (if any), otherwise unitialized.
+  late int _length;
 
   /// The length read in.  You are only allowed to read this if [_done] is
   /// `true`.
diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart
index c0d11f0..b8f7b36 100644
--- a/lib/src/sync_message_grouper.dart
+++ b/lib/src/sync_message_grouper.dart
@@ -18,9 +18,9 @@
   ///
   /// Returns null at end of file.
   @override
-  List<int> get next {
+  List<int>? get next {
     try {
-      List<int> message;
+      List<int>? message;
       while (message == null) {
         var nextByte = _stdin.readByteSync();
         if (nextByte == -1) return null;
diff --git a/lib/src/worker/async_worker_loop.dart b/lib/src/worker/async_worker_loop.dart
index 9405c4f..ddef5bd 100644
--- a/lib/src/worker/async_worker_loop.dart
+++ b/lib/src/worker/async_worker_loop.dart
@@ -15,7 +15,7 @@
 abstract class AsyncWorkerLoop implements WorkerLoop {
   final AsyncWorkerConnection connection;
 
-  AsyncWorkerLoop({AsyncWorkerConnection connection})
+  AsyncWorkerLoop({AsyncWorkerConnection? connection})
       : connection = connection ?? StdAsyncWorkerConnection();
 
   /// Perform a single [WorkRequest], and return a [WorkResponse].
@@ -27,7 +27,7 @@
   @override
   Future run() async {
     while (true) {
-      WorkResponse response;
+      late WorkResponse response;
       try {
         var request = await connection.readRequest();
         if (request == null) break;
@@ -41,8 +41,6 @@
         if (printMessages.isNotEmpty) {
           response.output = '${response.output}$printMessages';
         }
-        // In case they forget to set this.
-        response.exitCode ??= EXIT_CODE_OK;
       } catch (e, s) {
         response = WorkResponse()
           ..exitCode = EXIT_CODE_ERROR
diff --git a/lib/src/worker/sync_worker_loop.dart b/lib/src/worker/sync_worker_loop.dart
index 2d287b0..406cabb 100644
--- a/lib/src/worker/sync_worker_loop.dart
+++ b/lib/src/worker/sync_worker_loop.dart
@@ -14,7 +14,7 @@
 abstract class SyncWorkerLoop implements WorkerLoop {
   final SyncWorkerConnection connection;
 
-  SyncWorkerLoop({SyncWorkerConnection connection})
+  SyncWorkerLoop({SyncWorkerConnection? connection})
       : connection = connection ?? StdSyncWorkerConnection();
 
   /// Perform a single [WorkRequest], and return a [WorkResponse].
@@ -25,7 +25,7 @@
   @override
   void run() {
     while (true) {
-      WorkResponse response;
+      late WorkResponse response;
       try {
         var request = connection.readRequest();
         if (request == null) break;
@@ -38,8 +38,6 @@
         if (printMessages.isNotEmpty) {
           response.output = '${response.output}$printMessages';
         }
-        // In case they forget to set this.
-        response.exitCode ??= EXIT_CODE_OK;
       } catch (e, s) {
         response = WorkResponse()
           ..exitCode = EXIT_CODE_ERROR
diff --git a/lib/src/worker/worker_connection.dart b/lib/src/worker/worker_connection.dart
index a429cfe..a718099 100644
--- a/lib/src/worker/worker_connection.dart
+++ b/lib/src/worker/worker_connection.dart
@@ -20,7 +20,7 @@
   ///
   /// See [AsyncWorkerConnection] and [SyncWorkerConnection] for more narrow
   /// interfaces.
-  FutureOr<WorkRequest> readRequest();
+  FutureOr<WorkRequest?> readRequest();
 
   void writeResponse(WorkResponse response);
 }
@@ -30,21 +30,21 @@
   /// and [outputStream], unless [sendPort] is specified, in which case
   /// creates a [SendPortAsyncWorkerConnection].
   factory AsyncWorkerConnection(
-          {Stream<List<int>> inputStream,
-          StreamSink<List<int>> outputStream,
-          SendPort sendPort}) =>
+          {Stream<List<int>>? inputStream,
+          StreamSink<List<int>>? outputStream,
+          SendPort? sendPort}) =>
       sendPort == null
           ? StdAsyncWorkerConnection(
               inputStream: inputStream, outputStream: outputStream)
           : SendPortAsyncWorkerConnection(sendPort);
 
   @override
-  Future<WorkRequest> readRequest();
+  Future<WorkRequest?> readRequest();
 }
 
 abstract class SyncWorkerConnection implements WorkerConnection {
   @override
-  WorkRequest readRequest();
+  WorkRequest? readRequest();
 }
 
 /// Default implementation of [AsyncWorkerConnection] that works with [Stdin]
@@ -54,12 +54,12 @@
   final StreamSink<List<int>> _outputStream;
 
   StdAsyncWorkerConnection(
-      {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
+      {Stream<List<int>>? inputStream, StreamSink<List<int>>? outputStream})
       : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
         _outputStream = outputStream ?? stdout;
 
   @override
-  Future<WorkRequest> readRequest() async {
+  Future<WorkRequest?> readRequest() async {
     var buffer = await _messageGrouper.next;
     if (buffer == null) return null;
 
@@ -88,7 +88,7 @@
       : receivePortIterator = StreamIterator(receivePort.cast());
 
   @override
-  Future<WorkRequest> readRequest() async {
+  Future<WorkRequest?> readRequest() async {
     if (!await receivePortIterator.moveNext()) return null;
     return WorkRequest.fromBuffer(receivePortIterator.current);
   }
@@ -105,12 +105,12 @@
   final SyncMessageGrouper _messageGrouper;
   final Stdout _stdoutStream;
 
-  StdSyncWorkerConnection({Stdin stdinStream, Stdout stdoutStream})
+  StdSyncWorkerConnection({Stdin? stdinStream, Stdout? stdoutStream})
       : _messageGrouper = SyncMessageGrouper(stdinStream ?? stdin),
         _stdoutStream = stdoutStream ?? stdout;
 
   @override
-  WorkRequest readRequest() {
+  WorkRequest? readRequest() {
     var buffer = _messageGrouper.next;
     if (buffer == null) return null;
 
diff --git a/lib/src/worker_protocol.pb.dart b/lib/src/worker_protocol.pb.dart
index fa170c1..3d00908 100644
--- a/lib/src/worker_protocol.pb.dart
+++ b/lib/src/worker_protocol.pb.dart
@@ -2,7 +2,7 @@
 //  Generated code. Do not modify.
 //  source: worker_protocol.proto
 //
-// @dart = 2.3
+// @dart = 2.12
 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
 
 import 'dart:core' as $core;
@@ -47,8 +47,9 @@
   @$core.Deprecated('Using this can add significant overhead to your binary. '
       'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
       'Will be removed in next major version')
-  Input copyWith(void Function(Input) updates) => super.copyWith(
-      (message) => updates(message as Input)); // ignore: deprecated_member_use
+  Input copyWith(void Function(Input) updates) =>
+      super.copyWith((message) => updates(message as Input))
+          as Input; // ignore: deprecated_member_use
   $pb.BuilderInfo get info_ => _i;
   @$core.pragma('dart2js:noInline')
   static Input create() => Input._();
@@ -57,7 +58,7 @@
   @$core.pragma('dart2js:noInline')
   static Input getDefault() =>
       _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Input>(create);
-  static Input _defaultInstance;
+  static Input? _defaultInstance;
 
   @$pb.TagNumber(1)
   $core.String get path => $_getSZ(0);
@@ -125,8 +126,8 @@
       'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
       'Will be removed in next major version')
   WorkRequest copyWith(void Function(WorkRequest) updates) =>
-      super.copyWith((message) =>
-          updates(message as WorkRequest)); // ignore: deprecated_member_use
+      super.copyWith((message) => updates(message as WorkRequest))
+          as WorkRequest; // ignore: deprecated_member_use
   $pb.BuilderInfo get info_ => _i;
   @$core.pragma('dart2js:noInline')
   static WorkRequest create() => WorkRequest._();
@@ -135,7 +136,7 @@
   @$core.pragma('dart2js:noInline')
   static WorkRequest getDefault() => _defaultInstance ??=
       $pb.GeneratedMessage.$_defaultFor<WorkRequest>(create);
-  static WorkRequest _defaultInstance;
+  static WorkRequest? _defaultInstance;
 
   @$pb.TagNumber(1)
   $core.List<$core.String> get arguments => $_getList(0);
@@ -199,8 +200,8 @@
       'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
       'Will be removed in next major version')
   WorkResponse copyWith(void Function(WorkResponse) updates) =>
-      super.copyWith((message) =>
-          updates(message as WorkResponse)); // ignore: deprecated_member_use
+      super.copyWith((message) => updates(message as WorkResponse))
+          as WorkResponse; // ignore: deprecated_member_use
   $pb.BuilderInfo get info_ => _i;
   @$core.pragma('dart2js:noInline')
   static WorkResponse create() => WorkResponse._();
@@ -210,7 +211,7 @@
   @$core.pragma('dart2js:noInline')
   static WorkResponse getDefault() => _defaultInstance ??=
       $pb.GeneratedMessage.$_defaultFor<WorkResponse>(create);
-  static WorkResponse _defaultInstance;
+  static WorkResponse? _defaultInstance;
 
   @$pb.TagNumber(1)
   $core.int get exitCode => $_getIZ(0);
diff --git a/lib/testing.dart b/lib/testing.dart
index 1227ab2..6d4e07f 100644
--- a/lib/testing.dart
+++ b/lib/testing.dart
@@ -72,8 +72,8 @@
   }
 
   @override
-  StreamSubscription<Uint8List> listen(void Function(Uint8List bytes) onData,
-      {Function onError, void Function() onDone, bool cancelOnError}) {
+  StreamSubscription<Uint8List> listen(void Function(Uint8List bytes)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
     return _controller.stream.listen(onData,
         onError: onError, onDone: onDone, cancelOnError: cancelOnError);
   }
@@ -112,7 +112,7 @@
   void enqueueResponse(WorkResponse response);
 
   /// If set, this message will be printed during the call to `performRequest`.
-  String get printMessage;
+  String? get printMessage;
 }
 
 /// A [StdSyncWorkerConnection] which records its responses.
@@ -137,7 +137,7 @@
   final Queue<WorkResponse> _responses = Queue<WorkResponse>();
 
   @override
-  final String printMessage;
+  final String? printMessage;
 
   TestSyncWorkerLoop(SyncWorkerConnection connection, {this.printMessage})
       : super(connection: connection);
@@ -181,7 +181,7 @@
   final Queue<WorkResponse> _responses = Queue<WorkResponse>();
 
   @override
-  final String printMessage;
+  final String? printMessage;
 
   TestAsyncWorkerLoop(AsyncWorkerConnection connection, {this.printMessage})
       : super(connection: connection);
diff --git a/pubspec.yaml b/pubspec.yaml
index 9db212b..7b549bc 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,16 +1,16 @@
 name: bazel_worker
-version: 0.1.25+1-dev
+version: 1.0.0-nullsafety.0
 
 description: Tools for creating a bazel persistent worker.
 homepage: https://github.com/dart-lang/bazel_worker
 
 environment:
-  sdk: '>=2.8.1 <3.0.0'
+  sdk: '>=2.12.0-0 <3.0.0'
 
 dependencies:
-  async: '>1.9.0 <3.0.0'
-  pedantic: ^1.8.0
-  protobuf: '>=0.14.4 <2.0.0'
+  async: '>=2.5.0-nullsafety <3.0.0'
+  pedantic: ^1.10.0-nullsafety
+  protobuf: '>=2.0.0-nullsafety <3.0.0'
 
 dev_dependencies:
-  test: ^1.2.0
+  test: ^1.16.0-nullsafety
diff --git a/test/driver_connection_test.dart b/test/driver_connection_test.dart
index afb5ae5..3bbb56d 100644
--- a/test/driver_connection_test.dart
+++ b/test/driver_connection_test.dart
@@ -3,9 +3,10 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:isolate';
-import 'package:test/test.dart';
 
+import 'package:bazel_worker/src/constants.dart';
 import 'package:bazel_worker/src/driver/driver_connection.dart';
+import 'package:test/test.dart';
 
 void main() {
   group('IsolateDriverConnection', () {
@@ -17,7 +18,7 @@
 
       isolatePort.close();
 
-      expect(await connection.readResponse(), null);
+      expect((await connection.readResponse()).exitCode, EXIT_CODE_BROKEN_PIPE);
     });
   });
 }
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 56c9086..04af759 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -12,7 +12,11 @@
 import 'package:bazel_worker/driver.dart';
 
 void main() {
-  BazelWorkerDriver driver;
+  BazelWorkerDriver? driver;
+  final disconnectedResponse = WorkResponse()
+    ..exitCode = EXIT_CODE_BROKEN_PIPE
+    ..output = 'Connection closed'
+    ..freeze();
 
   group('basic driver', () {
     test('can run a single request', () async {
@@ -109,8 +113,9 @@
       test('should retry up to maxRetries times', () async {
         createDriver();
         var expectedResponse = WorkResponse();
-        MockWorker.responseQueue.addAll([null, null, expectedResponse]);
-        var actualResponse = await driver.doWork(WorkRequest());
+        MockWorker.responseQueue.addAll(
+            [disconnectedResponse, disconnectedResponse, expectedResponse]);
+        var actualResponse = await driver!.doWork(WorkRequest());
         // The first 2 null responses are thrown away, and we should get the
         // third one.
         expect(actualResponse, expectedResponse);
@@ -121,8 +126,9 @@
 
       test('should fail if it exceeds maxRetries failures', () async {
         createDriver(maxRetries: 2, numBadWorkers: 3);
-        MockWorker.responseQueue.addAll([null, null, WorkResponse()]);
-        var actualResponse = await driver.doWork(WorkRequest());
+        MockWorker.responseQueue.addAll(
+            [disconnectedResponse, disconnectedResponse, WorkResponse()]);
+        var actualResponse = await driver!.doWork(WorkRequest());
         // Should actually get a bad response.
         expect(actualResponse.exitCode, 15);
         expect(
@@ -146,19 +152,17 @@
 /// Runs [count] of fake work requests through [driver], and asserts that they
 /// all completed.
 Future _doRequests(
-    {BazelWorkerDriver driver,
-    int count,
-    Function(Future<WorkResponse>) trackWork}) async {
+    {BazelWorkerDriver? driver,
+    int count = 100,
+    Function(Future<WorkResponse?>)? trackWork}) async {
   // If we create a driver, we need to make sure and terminate it.
   var terminateDriver = driver == null;
   driver ??= BazelWorkerDriver(MockWorker.spawn);
-  count ??= 100;
-  terminateDriver ??= true;
   var requests = List.generate(count, (_) => WorkRequest());
   var responses = List.generate(count, (_) => WorkResponse());
   MockWorker.responseQueue.addAll(responses);
   var actualResponses = await Future.wait(
-      requests.map((request) => driver.doWork(request, trackWork: trackWork)));
+      requests.map((request) => driver!.doWork(request, trackWork: trackWork)));
   expect(actualResponses, unorderedEquals(responses));
   if (terminateDriver) await driver.terminateWorkers();
 }
@@ -169,7 +173,7 @@
 class MockWorkerLoop extends AsyncWorkerLoop {
   final Queue<WorkResponse> _responseQueue;
 
-  MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection connection})
+  MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection? connection})
       : super(connection: connection);
 
   @override
@@ -222,7 +226,7 @@
 
   /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
   /// a [MockWorkerLoop] if no factory is provided.
-  MockWorker({WorkerLoop Function(MockWorker) workerLoopFactory}) {
+  MockWorker({WorkerLoop Function(MockWorker)? workerLoopFactory}) {
     liveWorkers.add(this);
     var workerLoop = workerLoopFactory != null
         ? workerLoopFactory(this)
@@ -245,12 +249,7 @@
   final _stderrController = StreamController<List<int>>();
 
   @override
-  IOSink get stdin {
-    _stdin ??= IOSink(_stdinController.sink);
-    return _stdin;
-  }
-
-  IOSink _stdin;
+  late final IOSink stdin = IOSink(_stdinController.sink);
   final _stdinController = StreamController<List<int>>();
 
   @override
diff --git a/test/message_grouper_test.dart b/test/message_grouper_test.dart
index aa0570b..861e142 100644
--- a/test/message_grouper_test.dart
+++ b/test/message_grouper_test.dart
@@ -25,9 +25,9 @@
 
 void runTests(TestStdin Function() stdinFactory,
     MessageGrouper Function(Stdin) messageGrouperFactory) {
-  MessageGrouper messageGrouper;
+  late MessageGrouper messageGrouper;
 
-  TestStdin stdinStream;
+  late TestStdin stdinStream;
 
   setUp(() {
     stdinStream = stdinFactory();
diff --git a/test/worker_loop_test.dart b/test/worker_loop_test.dart
index ec7f83b..627a861 100644
--- a/test/worker_loop_test.dart
+++ b/test/worker_loop_test.dart
@@ -52,10 +52,10 @@
     TestStdin Function() stdinFactory,
     T Function(Stdin, Stdout) workerConnectionFactory,
     TestWorkerLoop Function(T) workerLoopFactory) {
-  TestStdin stdinStream;
-  TestStdoutStream stdoutStream;
-  T connection;
-  TestWorkerLoop workerLoop;
+  late TestStdin stdinStream;
+  late TestStdoutStream stdoutStream;
+  late T connection;
+  late TestWorkerLoop workerLoop;
 
   setUp(() {
     stdinStream = stdinFactory();
@@ -86,7 +86,7 @@
     expect(connection.responses, hasLength(1));
     expect(connection.responses[0], response);
     if (workerLoop.printMessage != null) {
-      expect(response.output, endsWith(workerLoop.printMessage),
+      expect(response.output, endsWith(workerLoop.printMessage!),
           reason: 'Print messages should get appended to the response output.');
     }