Cleanup tests
diff --git a/lib/src/common_server.dart b/lib/src/common_server.dart
index 2cfd55d..97ba9dc 100644
--- a/lib/src/common_server.dart
+++ b/lib/src/common_server.dart
@@ -68,16 +68,21 @@
final Random randomSource = Random();
static const int _connectionRetryBaseMs = 250;
static const int _connectionRetryMaxMs = 60000;
- static const Duration _cacheOperationTimeout = Duration(milliseconds: 10000);
+ static const Duration cacheOperationTimeout = Duration(milliseconds: 10000);
RedisCache(this.redisUriString, this.serverVersion) {
_reconnect();
}
- var _connectedOnce = new Completer<void>();
- /// Completes when and if the redis server connects for the first time.
- /// Mostly useful for testing.
- Future get connectedOnce => _connectedOnce.future;
+ var _connected = Completer<void>();
+ /// Completes when and if the redis server connects. This future is reset
+ /// on disconnection. Mostly for testing.
+ Future get connected => _connected.future;
+
+ var _disconnected = Completer<void>()..complete();
+ /// Completes when the server is disconnected (begins completed). This
+ /// future is reset on connection. Mostly for testing.
+ Future get disconnected => _disconnected.future;
String __logPrefix;
String get _logPrefix => __logPrefix ??= 'RedisCache [${redisUriString}] (${serverVersion})';
@@ -94,6 +99,24 @@
redisClient?.disconnect();
}
+ /// Call when an active connection has disconnected.
+ void _resetConnection() {
+ assert(_connected.isCompleted && !_disconnected.isCompleted);
+ _connected = Completer<void>();
+ _connection = null;
+ redisClient = null;
+ _disconnected.complete();
+ }
+
+ /// Call when a new connection is established.
+ void _setUpConnection(redis.Connection newConnection) {
+ assert(_disconnected.isCompleted && !_connected.isCompleted);
+ _disconnected = Completer<void>();
+ _connection = newConnection;
+ redisClient = redis.Client(_connection);
+ _connected.complete();
+ }
+
/// Begin a reconnection loop asynchronously to maintain a connection to the
/// redis server. Never stops trying until shutdown() is called.
void _reconnect([int retryTimeoutMs = _connectionRetryBaseMs]) {
@@ -108,18 +131,14 @@
}
redis.Connection.connect(redisUriString).then((redis.Connection newConnection) {
log.info('${_logPrefix}: Connected to redis server');
- if (!_connectedOnce.isCompleted) _connectedOnce.complete();
- _connection = newConnection;
- redisClient = redis.Client(_connection);
+ _setUpConnection(newConnection);
// If the client disconnects, discard the client and try to connect again.
newConnection.done.then((_) {
- _connection = null;
- redisClient = null;
+ _resetConnection();
log.warning('${_logPrefix}: connection terminated, reconnecting');
_reconnect();
}).catchError((e) {
- _connection = null;
- redisClient = null;
+ _resetConnection();
log.warning('${_logPrefix}: connection terminated with error ${e}, reconnecting');
_reconnect();
});
@@ -145,12 +164,15 @@
log.warning('${_logPrefix}: no cache available when getting key ${key}');
} else {
final commands = redisClient.asCommands<String, String>();
- value = await commands.get(key).timeout(_cacheOperationTimeout, onTimeout: () {
- log.warning('${_logPrefix}: timeout on get operation for key ${key}');
- redisClient.disconnect();
- }).catchError((e) {
- log.warning('${_logPrefix}: error on del operation for key ${key}: ${e}');
- });
+ // commands can return errors synchronously in timeout cases.
+ try {
+ value = await commands.get(key).timeout(cacheOperationTimeout, onTimeout: () {
+ log.warning('${_logPrefix}: timeout on get operation for key ${key}');
+ redisClient?.disconnect();
+ });
+ } catch (e) {
+ log.warning('${_logPrefix}: error on get operation for key ${key}: ${e}');
+ }
}
return value;
}
@@ -159,17 +181,20 @@
Future remove(String key) async {
key = _genKey(key);
if (!_isConnected()) {
- log.warning('${_logPrefix}: no cache available when deleting key ${key}');
+ log.warning('${_logPrefix}: no cache available when removing key ${key}');
return null;
}
final commands = redisClient.asCommands<String, String>();
- return commands.del(key: key).timeout(_cacheOperationTimeout, onTimeout: () {
- log.warning('${_logPrefix}: timeout on del operation for key ${key}');
- redisClient.disconnect();
- }).catchError((e) {
- log.warning('${_logPrefix}: error on del operation for key ${key}: ${e}');
- });
+ // commands can sometimes return errors synchronously in timeout cases.
+ try {
+ return commands.del(key: key).timeout(cacheOperationTimeout, onTimeout: () {
+ log.warning('${_logPrefix}: timeout on remove operation for key ${key}');
+ redisClient?.disconnect();
+ });
+ } catch (e) {
+ log.warning('${_logPrefix}: error on remove operation for key ${key}: ${e}');
+ }
}
@override
@@ -181,19 +206,22 @@
}
final commands = redisClient.asCommands<String, String>();
- return Future.sync(() async {
- await commands.multi();
- unawaited(commands.set(key, value));
- if (expiration != null) {
- unawaited(commands.pexpire(key, expiration.inMilliseconds));
- }
- await commands.exec();
- }).timeout(_cacheOperationTimeout, onTimeout: () {
- log.warning('${_logPrefix}: timeout on set operation for key ${key}');
- redisClient.disconnect();
- }).catchError((e) {
+ // commands can sometimes return errors synchronously in timeout cases.
+ try {
+ return Future.sync(() async {
+ await commands.multi();
+ unawaited(commands.set(key, value));
+ if (expiration != null) {
+ unawaited(commands.pexpire(key, expiration.inMilliseconds));
+ }
+ await commands.exec();
+ }).timeout(cacheOperationTimeout, onTimeout: () {
+ log.warning('${_logPrefix}: timeout on set operation for key ${key}');
+ redisClient?.disconnect();
+ });
+ } catch (e) {
log.warning('${_logPrefix}: error on set operation for key ${key}: ${e}');
- });
+ }
}
}
diff --git a/test/common_server_test.dart b/test/common_server_test.dart
index 179da9e..86e3657 100644
--- a/test/common_server_test.dart
+++ b/test/common_server_test.dart
@@ -73,11 +73,16 @@
}
/// Integration tests for the RedisCache implementation.
+ ///
+ /// We basically assume that redis and dartis work correctly -- this is
+ /// exercising the connection maintenance and exception handling.
group('RedisCache', () {
// Note: all caches share values between them.
RedisCache redisCache, redisCacheAlt;
Process redisProcess, redisAltProcess;
List<String> logMessages = [];
+ // Critical section handling -- do not run more than one test at a time
+ // since they talk to the same redis instances.
Lock singleTestOnly = Lock();
setUpAll(() async {
@@ -90,7 +95,7 @@
});
redisCache = RedisCache('redis://localhost:9501', 'aversion');
redisCacheAlt = RedisCache('redis://localhost:9501', 'bversion');
- await Future.wait([redisCache.connectedOnce, redisCacheAlt.connectedOnce]);
+ await Future.wait([redisCache.connected, redisCacheAlt.connected]);
});
tearDown(() async {
@@ -115,6 +120,8 @@
await expectLater(await redisCache.get('unknownkey'), isNull);
await redisCache.set('unknownkey', 'value');
await expectLater(await redisCache.get('unknownkey'), equals('value'));
+ await redisCache.remove('unknownkey');
+ await expectLater(await redisCache.get('unknownkey'), isNull);
expect(logMessages, isEmpty);
});
});
@@ -147,9 +154,11 @@
try {
await redisCacheBroken.set('aKey', 'value');
await expectLater(await redisCacheBroken.get('aKey'), isNull);
+ await redisCacheBroken.remove('aKey');
expect(logMessages.join('\n'), stringContainsInOrder([
'no cache available when setting key cversion+aKey',
'no cache available when getting key cversion+aKey',
+ 'no cache available when removing key cversion+aKey',
]));
} finally {
redisCacheBroken.shutdown();
@@ -172,10 +181,10 @@
]));
// Start a redis server.
- redisProcess = await Process.start('redis-server', ['--port', '9503']);
+ redisAltProcess = await Process.start('redis-server', ['--port', '9503']);
// Wait for connection.
- await redisCacheRepairable.connectedOnce;
+ await redisCacheRepairable.connected;
expect(logMessages.join('\n'), contains('Connected to redis server'));
} finally {
redisCacheRepairable.shutdown();
@@ -183,12 +192,32 @@
});
});
+ test('Verify that cache that stops responding temporarily times out and can recover', () async {
+ await singleTestOnly.synchronized(() async {
+ logMessages = [];
+ await redisCache.set('beforeStop', 'truth');
+ redisProcess.kill(ProcessSignal.sigstop);
+ // Don't fail the test before sending sigcont.
+ var beforeStop = await redisCache.get('beforeStop');
+ await redisCache.disconnected;
+ redisProcess.kill(ProcessSignal.sigcont);
+ expect(beforeStop, isNull);
+ await redisCache.connected;
+ await expectLater(await redisCache.get('beforeStop'), equals('truth'));
+ expect(logMessages.join('\n'), stringContainsInOrder([
+ 'timeout on get operation for key aversion+beforeStop',
+ '(aversion): reconnecting',
+ '(aversion): Connected to redis server',
+ ]));
+ });
+ }, onPlatform: {'windows': Skip('Windows does not have sigstop/sigcont')});
+
test('Verify cache that starts out connected but breaks retries until reconnection (slow)', () async {
await singleTestOnly.synchronized(() async {
logMessages = [];
redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
RedisCache redisCacheHealing = RedisCache('redis://localhost:9504', 'cversion');
- await redisCacheHealing.connectedOnce;
+ await redisCacheHealing.connected;
await redisCacheHealing.set('missingKey', 'value');
@@ -197,24 +226,20 @@
await redisAltProcess.exitCode;
redisAltProcess = null;
- // Try to talk to the cache and get an error.
+ // Try to talk to the cache and get an error. Wait for the disconnect
+ // to be recognized.
await expectLater(await redisCacheHealing.get('missingKey'), isNull);
+ await redisCacheHealing.disconnected;
- while (logMessages.length < 7) {
- await Future.delayed(Duration(milliseconds: 50));
- }
-
+ // Start the server and verify we connect appropriately.
+ redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
+ await redisCacheHealing.connected;
expect(logMessages.join('\n'), stringContainsInOrder([
'Connected to redis server',
'connection terminated with error SocketException',
'reconnecting to redis://localhost:9504',
]));
-
- redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
-
- while (!logMessages.last.contains('Connected to redis server')) {
- await Future.delayed(Duration(milliseconds: 50));
- }
+ expect(logMessages.last, contains('Connected to redis server'));
});
});
});