Cleanup tests
diff --git a/lib/src/common_server.dart b/lib/src/common_server.dart
index 2cfd55d..97ba9dc 100644
--- a/lib/src/common_server.dart
+++ b/lib/src/common_server.dart
@@ -68,16 +68,21 @@
   final Random randomSource = Random();
   static const int _connectionRetryBaseMs = 250;
   static const int _connectionRetryMaxMs = 60000;
-  static const Duration _cacheOperationTimeout = Duration(milliseconds: 10000);
+  static const Duration cacheOperationTimeout = Duration(milliseconds: 10000);
 
   RedisCache(this.redisUriString, this.serverVersion) {
     _reconnect();
   }
 
-  var _connectedOnce = new Completer<void>();
-  /// Completes when and if the redis server connects for the first time.
-  /// Mostly useful for testing.
-  Future get connectedOnce => _connectedOnce.future;
+  var _connected = Completer<void>();
+  /// Completes when and if the redis server connects.  This future is reset
+  /// on disconnection.  Mostly for testing.
+  Future get connected => _connected.future;
+
+  var _disconnected = Completer<void>()..complete();
+  /// Completes when the server is disconnected (begins completed).  This
+  /// future is reset on connection.  Mostly for testing.
+  Future get disconnected => _disconnected.future;
 
   String __logPrefix;
   String get _logPrefix => __logPrefix ??= 'RedisCache [${redisUriString}] (${serverVersion})';
@@ -94,6 +99,24 @@
     redisClient?.disconnect();
   }
 
+  /// Call when an active connection has disconnected.
+  void _resetConnection() {
+    assert(_connected.isCompleted && !_disconnected.isCompleted);
+    _connected = Completer<void>();
+    _connection = null;
+    redisClient = null;
+    _disconnected.complete();
+  }
+
+  /// Call when a new connection is established.
+  void _setUpConnection(redis.Connection newConnection) {
+    assert(_disconnected.isCompleted && !_connected.isCompleted);
+    _disconnected = Completer<void>();
+    _connection = newConnection;
+    redisClient = redis.Client(_connection);
+    _connected.complete();
+  }
+
   /// Begin a reconnection loop asynchronously to maintain a connection to the
   /// redis server.  Never stops trying until shutdown() is called.
   void _reconnect([int retryTimeoutMs = _connectionRetryBaseMs])  {
@@ -108,18 +131,14 @@
     }
     redis.Connection.connect(redisUriString).then((redis.Connection newConnection) {
       log.info('${_logPrefix}: Connected to redis server');
-      if (!_connectedOnce.isCompleted) _connectedOnce.complete();
-      _connection = newConnection;
-      redisClient = redis.Client(_connection);
+      _setUpConnection(newConnection);
       // If the client disconnects, discard the client and try to connect again.
       newConnection.done.then((_) {
-        _connection = null;
-        redisClient = null;
+        _resetConnection();
         log.warning('${_logPrefix}: connection terminated, reconnecting');
         _reconnect();
       }).catchError((e) {
-        _connection = null;
-        redisClient = null;
+        _resetConnection();
         log.warning('${_logPrefix}: connection terminated with error ${e}, reconnecting');
         _reconnect();
       });
@@ -145,12 +164,15 @@
       log.warning('${_logPrefix}: no cache available when getting key ${key}');
     } else {
       final commands = redisClient.asCommands<String, String>();
-      value = await commands.get(key).timeout(_cacheOperationTimeout, onTimeout: () {
-        log.warning('${_logPrefix}: timeout on get operation for key ${key}');
-        redisClient.disconnect();
-      }).catchError((e) {
-        log.warning('${_logPrefix}: error on del operation for key ${key}: ${e}');
-      });
+      // commands can return errors synchronously in timeout cases.
+      try {
+        value = await commands.get(key).timeout(cacheOperationTimeout, onTimeout: () {
+          log.warning('${_logPrefix}: timeout on get operation for key ${key}');
+          redisClient?.disconnect();
+        });
+      } catch (e) {
+        log.warning('${_logPrefix}: error on get operation for key ${key}: ${e}');
+      }
     }
     return value;
   }
@@ -159,17 +181,20 @@
   Future remove(String key) async {
     key = _genKey(key);
     if (!_isConnected()) {
-      log.warning('${_logPrefix}: no cache available when deleting key ${key}');
+      log.warning('${_logPrefix}: no cache available when removing key ${key}');
       return null;
     }
 
     final commands = redisClient.asCommands<String, String>();
-    return commands.del(key: key).timeout(_cacheOperationTimeout, onTimeout: () {
-      log.warning('${_logPrefix}: timeout on del operation for key ${key}');
-      redisClient.disconnect();
-    }).catchError((e) {
-      log.warning('${_logPrefix}: error on del operation for key ${key}: ${e}');
-    });
+    // commands can sometimes return errors synchronously in timeout cases.
+    try {
+      return commands.del(key: key).timeout(cacheOperationTimeout, onTimeout: () {
+        log.warning('${_logPrefix}: timeout on remove operation for key ${key}');
+        redisClient?.disconnect();
+      });
+    } catch (e) {
+      log.warning('${_logPrefix}: error on remove operation for key ${key}: ${e}');
+    }
   }
 
   @override
@@ -181,19 +206,22 @@
     }
 
     final commands = redisClient.asCommands<String, String>();
-    return Future.sync(() async {
-      await commands.multi();
-      unawaited(commands.set(key, value));
-      if (expiration != null) {
-        unawaited(commands.pexpire(key, expiration.inMilliseconds));
-      }
-      await commands.exec();
-    }).timeout(_cacheOperationTimeout, onTimeout: () {
-      log.warning('${_logPrefix}: timeout on set operation for key ${key}');
-      redisClient.disconnect();
-    }).catchError((e) {
+    // commands can sometimes return errors synchronously in timeout cases.
+    try {
+      return Future.sync(() async {
+        await commands.multi();
+        unawaited(commands.set(key, value));
+        if (expiration != null) {
+          unawaited(commands.pexpire(key, expiration.inMilliseconds));
+        }
+        await commands.exec();
+      }).timeout(cacheOperationTimeout, onTimeout: () {
+        log.warning('${_logPrefix}: timeout on set operation for key ${key}');
+        redisClient?.disconnect();
+      });
+    } catch (e) {
       log.warning('${_logPrefix}: error on set operation for key ${key}: ${e}');
-    });
+    }
   }
 }
 
diff --git a/test/common_server_test.dart b/test/common_server_test.dart
index 179da9e..86e3657 100644
--- a/test/common_server_test.dart
+++ b/test/common_server_test.dart
@@ -73,11 +73,16 @@
   }
 
   /// Integration tests for the RedisCache implementation.
+  ///
+  /// We basically assume that redis and dartis work correctly -- this is
+  /// exercising the connection maintenance and exception handling.
   group('RedisCache', () {
     // Note: all caches share values between them.
     RedisCache redisCache, redisCacheAlt;
     Process redisProcess, redisAltProcess;
     List<String> logMessages = [];
+    // Critical section handling -- do not run more than one test at a time
+    // since they talk to the same redis instances.
     Lock singleTestOnly = Lock();
 
     setUpAll(() async {
@@ -90,7 +95,7 @@
       });
       redisCache = RedisCache('redis://localhost:9501', 'aversion');
       redisCacheAlt = RedisCache('redis://localhost:9501', 'bversion');
-      await Future.wait([redisCache.connectedOnce, redisCacheAlt.connectedOnce]);
+      await Future.wait([redisCache.connected, redisCacheAlt.connected]);
     });
 
     tearDown(() async {
@@ -115,6 +120,8 @@
         await expectLater(await redisCache.get('unknownkey'), isNull);
         await redisCache.set('unknownkey', 'value');
         await expectLater(await redisCache.get('unknownkey'), equals('value'));
+        await redisCache.remove('unknownkey');
+        await expectLater(await redisCache.get('unknownkey'), isNull);
         expect(logMessages, isEmpty);
       });
     });
@@ -147,9 +154,11 @@
         try {
           await redisCacheBroken.set('aKey', 'value');
           await expectLater(await redisCacheBroken.get('aKey'), isNull);
+          await redisCacheBroken.remove('aKey');
           expect(logMessages.join('\n'), stringContainsInOrder([
             'no cache available when setting key cversion+aKey',
             'no cache available when getting key cversion+aKey',
+            'no cache available when removing key cversion+aKey',
           ]));
         } finally {
           redisCacheBroken.shutdown();
@@ -172,10 +181,10 @@
           ]));
 
           // Start a redis server.
-          redisProcess = await Process.start('redis-server', ['--port', '9503']);
+          redisAltProcess = await Process.start('redis-server', ['--port', '9503']);
 
           // Wait for connection.
-          await redisCacheRepairable.connectedOnce;
+          await redisCacheRepairable.connected;
           expect(logMessages.join('\n'), contains('Connected to redis server'));
         } finally {
           redisCacheRepairable.shutdown();
@@ -183,12 +192,32 @@
       });
     });
 
+    test('Verify that cache that stops responding temporarily times out and can recover', () async {
+      await singleTestOnly.synchronized(() async {
+        logMessages = [];
+        await redisCache.set('beforeStop', 'truth');
+        redisProcess.kill(ProcessSignal.sigstop);
+        // Don't fail the test before sending sigcont.
+        var beforeStop = await redisCache.get('beforeStop');
+        await redisCache.disconnected;
+        redisProcess.kill(ProcessSignal.sigcont);
+        expect(beforeStop, isNull);
+        await redisCache.connected;
+        await expectLater(await redisCache.get('beforeStop'), equals('truth'));
+        expect(logMessages.join('\n'), stringContainsInOrder([
+          'timeout on get operation for key aversion+beforeStop',
+          '(aversion): reconnecting',
+          '(aversion): Connected to redis server',
+        ]));
+      });
+    }, onPlatform: {'windows': Skip('Windows does not have sigstop/sigcont')});
+
     test('Verify cache that starts out connected but breaks retries until reconnection (slow)', () async {
       await singleTestOnly.synchronized(() async {
         logMessages = [];
         redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
         RedisCache redisCacheHealing = RedisCache('redis://localhost:9504', 'cversion');
-        await redisCacheHealing.connectedOnce;
+        await redisCacheHealing.connected;
 
         await redisCacheHealing.set('missingKey', 'value');
 
@@ -197,24 +226,20 @@
         await redisAltProcess.exitCode;
         redisAltProcess = null;
 
-        // Try to talk to the cache and get an error.
+        // Try to talk to the cache and get an error.  Wait for the disconnect
+        // to be recognized.
         await expectLater(await redisCacheHealing.get('missingKey'), isNull);
+        await redisCacheHealing.disconnected;
 
-        while (logMessages.length < 7) {
-          await Future.delayed(Duration(milliseconds: 50));
-        }
-
+        // Start the server and verify we connect appropriately.
+        redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
+        await redisCacheHealing.connected;
         expect(logMessages.join('\n'), stringContainsInOrder([
           'Connected to redis server',
           'connection terminated with error SocketException',
           'reconnecting to redis://localhost:9504',
         ]));
-
-        redisAltProcess = await Process.start('redis-server', ['--port', '9504']);
-
-        while (!logMessages.last.contains('Connected to redis server')) {
-          await Future.delayed(Duration(milliseconds: 50));
-        }
+        expect(logMessages.last, contains('Connected to redis server'));
       });
     });
   });