Add tests for PoolResource and close, and fix allowRelease leak
diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md index 9642601..4f3c31e 100644 --- a/pkgs/pool/CHANGELOG.md +++ b/pkgs/pool/CHANGELOG.md
@@ -1,6 +1,7 @@ ## 1.5.3-wip * Added an example. +* Fixed a resource leak when a `PoolResource.allowRelease` callback throws. ## 1.5.2
diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart index 70e9df1..3634649 100644 --- a/pkgs/pool/lib/pool.dart +++ b/pkgs/pool/lib/pool.dart
@@ -27,7 +27,7 @@ /// allocated. /// /// See [PoolResource.allowRelease]. - final _onReleaseCallbacks = Queue<void Function()>(); + final _onReleaseCallbacks = Queue<FutureOr<void> Function()>(); /// Completers that will be completed once `onRelease` callbacks are done /// running. @@ -275,7 +275,7 @@ /// If there are any pending requests, this will fire the oldest one after /// running [onRelease]. - void _onResourceReleaseAllowed(void Function() onRelease) { + void _onResourceReleaseAllowed(FutureOr<void> Function() onRelease) { _resetTimer(); if (_requestedResources.isNotEmpty) { @@ -297,15 +297,17 @@ /// /// Futures returned by [_runOnRelease] always complete in the order they were /// created, even if earlier [onRelease] callbacks take longer to run. - Future<PoolResource> _runOnRelease(void Function() onRelease) { - Future.sync(onRelease).then((value) { - _onReleaseCompleters.removeFirst().complete(PoolResource._(this)); - }).catchError((Object error, StackTrace stackTrace) { - _onReleaseCompleters.removeFirst().completeError(error, stackTrace); - }); - + Future<PoolResource> _runOnRelease(FutureOr<void> Function() onRelease) { var completer = Completer<PoolResource>.sync(); _onReleaseCompleters.add(completer); + + Future.sync(onRelease).then((value) { + _onReleaseCompleters.removeFirst().complete(PoolResource._(this)); + }, onError: (Object error, StackTrace stackTrace) { + _onReleaseCompleters.removeFirst().completeError(error, stackTrace); + _onResourceReleased(); + }); + return completer.future; }
diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart index 23f073d..1086148 100644 --- a/pkgs/pool/test/pool_test.dart +++ b/pkgs/pool/test/pool_test.dart
@@ -277,6 +277,119 @@ await pool.request(); }); + + test('request() throws if allowRelease callback throws', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var requestFuture = pool.request(); + + var completer = Completer<void>(); + resource.allowRelease(() => completer.future); + + await Future<void>.delayed(Duration.zero); + completer.completeError('oh no!'); + + await expectLater(requestFuture, throwsA('oh no!')); + }); + + test('request() does not leak resources when allowRelease throws', + () async { + var pool = Pool(1); + var resource = await pool.request(); + + var requestFuture = pool.request(); + + var completer = Completer<void>(); + resource.allowRelease(() => completer.future); + + await Future<void>.delayed(Duration.zero); + completer.completeError('oh no!'); + + await expectLater(requestFuture, throwsA('oh no!')); + + // Without the fix, this will hang because the slot is leaked. + var nextRequest = pool.request().timeout( + const Duration(milliseconds: 100), + onTimeout: () => throw TimeoutException('Leaked!')); + + await expectLater(nextRequest, completes); + }); + + test('throwing in request listener does not corrupt state', () async { + var pool = Pool(2); + var resource1 = await pool.request(); + var resource2 = await pool.request(); + + var completer1 = Completer<void>(); + resource1.allowRelease(() => completer1.future); + var completer2 = Completer<void>(); + resource2.allowRelease(() => completer2.future); + + var requestFuture1 = pool.request(); + var requestFuture2 = pool.request(); + + var request1Threw = false; + var requestFuture1WithListener = requestFuture1.then((_) { + request1Threw = true; + throw StateError('Listener 1 threw!'); + }); + + expect(requestFuture1WithListener, throwsA(isA<StateError>())); + + var request2Completed = false; + var request2Error = false; + unawaited(requestFuture2.then((_) { + request2Completed = true; + }, onError: (Object e) { + request2Error = true; + })); + + await 
Future<void>.delayed(Duration.zero); + + completer1.complete(); + await Future<void>.delayed(Duration.zero); + + expect(request1Threw, isTrue); + expect(request2Completed, isFalse); + expect(request2Error, isFalse); + + completer2.complete(); + await Future<void>.delayed(Duration.zero); + + expect(request2Completed, isTrue); + expect(request2Error, isFalse); + }); + }); + + group('PoolResource', () { + test('release() throws StateError if called twice', () async { + var pool = Pool(1); + var resource = await pool.request(); + resource.release(); + expect(resource.release, throwsStateError); + }); + + test('allowRelease() throws StateError if called twice', () async { + var pool = Pool(1); + var resource = await pool.request(); + resource.allowRelease(() {}); + expect(() => resource.allowRelease(() {}), throwsStateError); + }); + + test('allowRelease() throws if called after release()', () async { + var pool = Pool(1); + var resource = await pool.request(); + resource.release(); + expect(() => resource.allowRelease(() {}), throwsStateError); + }); + + test('release() throws if called after allowRelease()', () async { + var pool = Pool(1); + var resource = await pool.request(); + resource.allowRelease(() {}); + expect(resource.release, throwsStateError); + }); }); test("done doesn't complete without close", () async { @@ -296,6 +409,19 @@ expect(() => pool.withResource(() {}), throwsStateError); }); + test('can be called multiple times', () async { + var pool = Pool(1); + var resource = await pool.request(); + + var closeFuture1 = pool.close(); + var closeFuture2 = pool.close(); + + expect(closeFuture1, equals(closeFuture2)); + + resource.release(); + await closeFuture1; + }); + test('pending requests are fulfilled', () async { var pool = Pool(1); var resource1 = await pool.request();