Add tests for PoolResource and close, and fix allowRelease leak
diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md
index 9642601..4f3c31e 100644
--- a/pkgs/pool/CHANGELOG.md
+++ b/pkgs/pool/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 1.5.3-wip
 
 * Added an example.
+* Fix a resource leak when a `PoolResource.allowRelease` callback throws.
 
 ## 1.5.2
 
diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart
index 70e9df1..3634649 100644
--- a/pkgs/pool/lib/pool.dart
+++ b/pkgs/pool/lib/pool.dart
@@ -27,7 +27,7 @@
   /// allocated.
   ///
   /// See [PoolResource.allowRelease].
-  final _onReleaseCallbacks = Queue<void Function()>();
+  final _onReleaseCallbacks = Queue<FutureOr<void> Function()>();
 
   /// Completers that will be completed once `onRelease` callbacks are done
   /// running.
@@ -275,7 +275,7 @@
 
   /// If there are any pending requests, this will fire the oldest one after
   /// running [onRelease].
-  void _onResourceReleaseAllowed(void Function() onRelease) {
+  void _onResourceReleaseAllowed(FutureOr<void> Function() onRelease) {
     _resetTimer();
 
     if (_requestedResources.isNotEmpty) {
@@ -297,15 +297,17 @@
   ///
   /// Futures returned by [_runOnRelease] always complete in the order they were
   /// created, even if earlier [onRelease] callbacks take longer to run.
-  Future<PoolResource> _runOnRelease(void Function() onRelease) {
-    Future.sync(onRelease).then((value) {
-      _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
-    }).catchError((Object error, StackTrace stackTrace) {
-      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
-    });
-
+  Future<PoolResource> _runOnRelease(FutureOr<void> Function() onRelease) {
     var completer = Completer<PoolResource>.sync();
     _onReleaseCompleters.add(completer);
+
+    Future.sync(onRelease).then((value) {
+      _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
+    }, onError: (Object error, StackTrace stackTrace) {
+      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
+      _onResourceReleased();
+    });
+
     return completer.future;
   }
 
diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart
index 23f073d..1086148 100644
--- a/pkgs/pool/test/pool_test.dart
+++ b/pkgs/pool/test/pool_test.dart
@@ -277,6 +277,119 @@
 
       await pool.request();
     });
+
+    test('request() throws if allowRelease callback throws', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var requestFuture = pool.request();
+
+      var completer = Completer<void>();
+      resource.allowRelease(() => completer.future);
+
+      await Future<void>.delayed(Duration.zero);
+      completer.completeError('oh no!');
+
+      await expectLater(requestFuture, throwsA('oh no!'));
+    });
+
+    test('request() does not leak resources when allowRelease throws',
+        () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var requestFuture = pool.request();
+
+      var completer = Completer<void>();
+      resource.allowRelease(() => completer.future);
+
+      await Future<void>.delayed(Duration.zero);
+      completer.completeError('oh no!');
+
+      await expectLater(requestFuture, throwsA('oh no!'));
+
+      // Without the fix, this will hang because the slot is leaked.
+      var nextRequest = pool.request().timeout(
+          const Duration(milliseconds: 100),
+          onTimeout: () => throw TimeoutException('Leaked!'));
+
+      await expectLater(nextRequest, completes);
+    });
+
+    test('throwing in request listener does not corrupt state', () async {
+      var pool = Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+
+      var completer1 = Completer<void>();
+      resource1.allowRelease(() => completer1.future);
+      var completer2 = Completer<void>();
+      resource2.allowRelease(() => completer2.future);
+
+      var requestFuture1 = pool.request();
+      var requestFuture2 = pool.request();
+
+      var request1Threw = false;
+      var requestFuture1WithListener = requestFuture1.then((_) {
+        request1Threw = true;
+        throw StateError('Listener 1 threw!');
+      });
+
+      expect(requestFuture1WithListener, throwsA(isA<StateError>()));
+
+      var request2Completed = false;
+      var request2Error = false;
+      unawaited(requestFuture2.then((_) {
+        request2Completed = true;
+      }, onError: (Object e) {
+        request2Error = true;
+      }));
+
+      await Future<void>.delayed(Duration.zero);
+
+      completer1.complete();
+      await Future<void>.delayed(Duration.zero);
+
+      expect(request1Threw, isTrue);
+      expect(request2Completed, isFalse);
+      expect(request2Error, isFalse);
+
+      completer2.complete();
+      await Future<void>.delayed(Duration.zero);
+
+      expect(request2Completed, isTrue);
+      expect(request2Error, isFalse);
+    });
+  });
+
+  group('PoolResource', () {
+    test('release() throws StateError if called twice', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+      resource.release();
+      expect(resource.release, throwsStateError);
+    });
+
+    test('allowRelease() throws StateError if called twice', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+      resource.allowRelease(() {});
+      expect(() => resource.allowRelease(() {}), throwsStateError);
+    });
+
+    test('allowRelease() throws if called after release()', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+      resource.release();
+      expect(() => resource.allowRelease(() {}), throwsStateError);
+    });
+
+    test('release() throws if called after allowRelease()', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+      resource.allowRelease(() {});
+      expect(resource.release, throwsStateError);
+    });
   });
 
   test("done doesn't complete without close", () async {
@@ -296,6 +409,19 @@
       expect(() => pool.withResource(() {}), throwsStateError);
     });
 
+    test('can be called multiple times', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var closeFuture1 = pool.close();
+      var closeFuture2 = pool.close();
+
+      expect(closeFuture1, equals(closeFuture2));
+
+      resource.release();
+      await closeFuture1;
+    });
+
     test('pending requests are fulfilled', () async {
       var pool = Pool(1);
       var resource1 = await pool.request();