
Commit 45c61bb

Authored by Kevin Bowersox (kevinmichaelbowersox), with Copilot and vazois
Improve Failover Restore / Replication Handling (#1670)
* Improve Failover Restore / Replication Handling

* Update libs/common/ExceptionInjectionType.cs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply PR Feedback

---------

Co-authored-by: Kevin Bowersox <kbowersox@microsoft.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Vasileios Zois <96085550+vazois@users.noreply.github.com>
1 parent 144477c commit 45c61bb

5 files changed

Lines changed: 263 additions & 4 deletions


libs/cluster/Server/Failover/FailoverManager.cs

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ internal sealed class FailoverManager : IDisposable
         readonly TimeSpan clusterTimeout;
         readonly ILogger logger;
         private SingleWriterMultiReaderLock failoverTaskLock;
-        public FailoverStatus lastFailoverStatus = FailoverStatus.NO_FAILOVER;
+        public volatile FailoverStatus lastFailoverStatus = FailoverStatus.NO_FAILOVER;

         /// <summary>
         /// Shared epoch instance for failover GarnetClient connections
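Note: lastFailoverStatus is written by the failover task and polled from EnsureReplication on a different thread, so the volatile qualifier guarantees the reader observes the latest write. A minimal sketch of that cross-thread pattern (reduced enum, hypothetical class and method names, not Garnet's actual types):

enum FailoverStatus { NO_FAILOVER, BEGIN_FAILOVER, TAKING_OVER_AS_PRIMARY }

class FailoverManagerSketch
{
    // Without 'volatile', the polling thread could keep reading a stale
    // NO_FAILOVER value and re-enter the resync path mid-failover.
    public volatile FailoverStatus lastFailoverStatus = FailoverStatus.NO_FAILOVER;

    // Failover task (writer thread)
    public void StartFailover() => lastFailoverStatus = FailoverStatus.BEGIN_FAILOVER;

    // EnsureReplication poll (reader thread)
    public bool FailoverInProgress() => lastFailoverStatus != FailoverStatus.NO_FAILOVER;
}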

libs/cluster/Server/Failover/ReplicaFailoverSession.cs

Lines changed: 33 additions & 3 deletions
@@ -119,6 +119,11 @@ private bool TakeOverAsPrimary()

             try
             {
+#if DEBUG
+                // Exception injection point for testing: simulates TakeOverAsPrimary failure
+                // after PauseWritesAndWaitForSync has already sent failstopwrites to the primary.
+                ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.Failover_Fail_TakeOverAsPrimary);
+#endif
                 // Make replica syncing unavailable by setting recovery flag
                 if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterFailover, upgradeLock: false))
                 {
@@ -272,6 +277,14 @@ private async Task IssueAttachReplicas()
             }
         }

+        /// <summary>
+        /// Returns true if failstopwrites was confirmed by the primary and the primary's
+        /// config was modified (slots given up, role changed to replica). Used to determine
+        /// whether the primary needs to be reset on failover failure.
+        /// </summary>
+        private bool PrimaryNeedsReset()
+            => status is FailoverStatus.WAITING_FOR_SYNC or FailoverStatus.TAKING_OVER_AS_PRIMARY;
+
         /// <summary>
         /// REPLICA main failover task
         /// </summary>
@@ -281,6 +294,7 @@ public async Task<bool> BeginAsyncReplicaFailover()
             // CLUSTER FAILOVER OPTIONS
             // FORCE: Do not await for the primary since it might be unreachable
             // TAKEOVER: Same as force but also do not await for voting from other primaries
+            var failoverSucceeded = false;
             try
             {
                 // Issue stop writes and on ack wait for replica to catch up
@@ -298,11 +312,9 @@ public async Task<bool> BeginAsyncReplicaFailover()
                 // Transition to primary role
                 if (!TakeOverAsPrimary())
                 {
-                    // Request primary to be reset to original state only if DEFAULT option was used
-                    if (primaryClient != null)
-                        _ = await primaryClient?.failstopwrites(Array.Empty<byte>()).WaitAsync(failoverTimeout, cts.Token);
                     return false;
                 }
+                failoverSucceeded = true;

                 // Attach to old replicas, and old primary if DEFAULT option
                 await IssueAttachReplicas();
@@ -319,6 +331,24 @@ public async Task<bool> BeginAsyncReplicaFailover()
             }
             finally
             {
+                // If failstopwrites was confirmed by the primary (status reached WAITING_FOR_SYNC
+                // or beyond) but the failover did not succeed, reset the primary back to its
+                // original state. Without this, the primary has already given up its slots
+                // (via TryStopWrites) but the replica never claimed them, leaving the cluster
+                // in an incoherent state where no node owns the slots.
+                if (PrimaryNeedsReset() && !failoverSucceeded)
+                {
+                    try
+                    {
+                        logger?.LogWarning("Attempting to reset primary after failed failover");
+                        _ = await primaryClient?.failstopwrites(Array.Empty<byte>()).WaitAsync(failoverTimeout, cts.Token);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger?.LogError(ex, "Failed to reset primary after failed failover — cluster may be in an incoherent state");
+                    }
+                }
+
                 primaryClient?.Dispose();
                 status = FailoverStatus.NO_FAILOVER;
             }
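Note: moving the reset into the finally block means it now covers every failure path after the primary confirmed failstopwrites, not only the single explicit TakeOverAsPrimary failure branch the old code handled. A reduced sketch of the resulting control flow (stubbed helpers and hypothetical names, not the actual Garnet signatures):

using System;
using System.Threading.Tasks;

class FailoverFlowSketch
{
    enum Status { NO_FAILOVER, WAITING_FOR_SYNC, TAKING_OVER_AS_PRIMARY }
    Status status = Status.NO_FAILOVER;

    // Stubs standing in for the real session methods.
    Task<bool> PauseWritesAndWaitForSync()
    {
        // Status only advances once the primary confirms failstopwrites;
        // a failure before that point needs no reset.
        status = Status.WAITING_FOR_SYNC;
        return Task.FromResult(true);
    }
    bool TakeOverAsPrimary() { status = Status.TAKING_OVER_AS_PRIMARY; return false; }
    Task ResetPrimary() { Console.WriteLine("resetting primary"); return Task.CompletedTask; }

    bool PrimaryNeedsReset() => status is Status.WAITING_FOR_SYNC or Status.TAKING_OVER_AS_PRIMARY;

    public async Task<bool> BeginAsyncReplicaFailover()
    {
        var failoverSucceeded = false;
        try
        {
            if (!await PauseWritesAndWaitForSync()) return false;
            if (!TakeOverAsPrimary()) return false; // primary already gave up its slots here
            failoverSucceeded = true;
            return true;
        }
        finally
        {
            // Runs on every exit path, including exceptions: restore the old primary
            // whenever it gave up its slots but the replica never claimed them.
            if (PrimaryNeedsReset() && !failoverSucceeded)
                await ResetPrimary();
        }
    }
}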

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 14 additions & 0 deletions
@@ -213,6 +213,20 @@ public void EnsureReplication(ClusterSession activeSession, IEnumerable<ICluster
                 return;
             }

+            // Suppress auto-resync while a failover is in progress.
+            // Without this guard, EnsureReplication would acquire a ReadRole lock that blocks
+            // TakeOverAsPrimary from obtaining its ClusterFailover write lock, aborting the failover.
+            var failoverStatus = clusterProvider.failoverManager.lastFailoverStatus;
+            if (failoverStatus is FailoverStatus.BEGIN_FAILOVER
+                or FailoverStatus.ISSUING_PAUSE_WRITES
+                or FailoverStatus.WAITING_FOR_SYNC
+                or FailoverStatus.FAILOVER_IN_PROGRESS
+                or FailoverStatus.TAKING_OVER_AS_PRIMARY)
+            {
+                logger?.LogDebug("Suppressing auto-resync during active failover (status: {failoverStatus})", failoverStatus);
+                return;
+            }
+
             // Now we're going to attempt to re-establish replication

             // To avoid a TOCTOU issue, we need to prevent role change while we do this
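Note: an illustration of the contention the guard avoids, using ReaderWriterLockSlim as a stand-in for Garnet's internal SingleWriterMultiReaderLock (the real type and its API differ). While the resync path holds a read lock, a non-blocking attempt to take the write lock fails, which is what previously aborted the takeover:

using System;
using System.Threading;
using System.Threading.Tasks;

class LockContentionIllustration
{
    static void Main()
    {
        var roleLock = new ReaderWriterLockSlim();

        // EnsureReplication path: holds a read lock for the duration of the resync.
        roleLock.EnterReadLock();
        try
        {
            // TakeOverAsPrimary path (a different thread): a try-acquire of the
            // write lock fails while any reader is active.
            var acquired = Task.Run(() => roleLock.TryEnterWriteLock(0)).Result;
            Console.WriteLine($"Write lock acquired: {acquired}"); // False
        }
        finally
        {
            roleLock.ExitReadLock();
        }
    }
}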

libs/common/ExceptionInjectionType.cs

Lines changed: 4 additions & 0 deletions
@@ -77,5 +77,9 @@ public enum ExceptionInjectionType
         /// During deletion of a Vector Set, leaving it partially deleted - at a particular point of execution.
         /// </summary>
         VectorSet_Interrupt_Delete_2,
+        /// <summary>
+        /// Fail TakeOverAsPrimary during failover by throwing before BeginRecovery is called.
+        /// </summary>
+        Failover_Fail_TakeOverAsPrimary,
     }
 }
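Note: a minimal sketch of the DEBUG-only injection pattern this enum member plugs into (hypothetical names, not Garnet's actual ExceptionInjectionHelper). A test enables a named injection point, and the code under test calls TriggerException at that point, which throws only while the flag is set:

using System;
using System.Collections.Concurrent;

enum InjectionType { Failover_Fail_TakeOverAsPrimary }

static class InjectionHelperSketch
{
    static readonly ConcurrentDictionary<InjectionType, bool> enabled = new();

    public static void EnableException(InjectionType t) => enabled[t] = true;
    public static void DisableException(InjectionType t) => enabled[t] = false;

    // Called on the production code path; a no-op unless a test enabled the point.
    public static void TriggerException(InjectionType t)
    {
        if (enabled.TryGetValue(t, out var on) && on)
            throw new InvalidOperationException($"Injected exception: {t}");
    }
}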

test/Garnet.test.cluster/ClusterNegativeTests.cs

Lines changed: 211 additions & 0 deletions
@@ -626,5 +626,216 @@ public void ClusterReplicateFails()
             var exc = Assert.Throws<RedisServerException>(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect));
             ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node "));
         }
+        [Test, Order(14), CancelAfter(testTimeout)]
+        [Category("REPLICATION")]
+        public void ClusterFailoverSucceedsDuringEnsureReplication(CancellationToken cancellationToken)
+        {
+            // Verify that EnsureReplication does not block an in-flight failover.
+            // EnsureReplication polls for dropped replication sessions and attempts auto-resync.
+            // Without the failover status guard, it could acquire a ReadRole lock that blocks
+            // TakeOverAsPrimary from obtaining its ClusterFailover write lock, aborting the failover.
+            var primaryIndex = 0;
+            var replicaIndex = 1;
+            var nodes_count = 2;
+
+            // Enable EnsureReplication by setting clusterReplicationReestablishmentTimeout to 1 second
+            context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true,
+                timeout: timeout, clusterReplicationReestablishmentTimeout: 1);
+            context.CreateConnection();
+
+            _ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
+            context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);
+            context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);
+
+            // Set up replication
+            var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, logger: context.logger);
+            ClassicAssert.AreEqual("OK", resp);
+            context.clusterTestUtils.WaitForReplicaRecovery(replicaIndex, context.logger);
+            context.clusterTestUtils.WaitForConnectedReplicaCount(primaryIndex, 1, context.logger);
+
+            // Populate primary and wait for sync
+            context.kvPairs = [];
+            context.PopulatePrimary(ref context.kvPairs, keyLength: 32, kvpairCount: 16, primaryIndex);
+            context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger);
+
+            // Issue failover — with EnsureReplication enabled (polling every 1s),
+            // the guard should prevent it from interfering with the failover
+            resp = context.clusterTestUtils.ClusterFailover(replicaIndex, logger: context.logger);
+            ClassicAssert.AreEqual("OK", resp);
+
+            // Wait for failover to complete — without the guard this could hang or abort
+            context.clusterTestUtils.WaitForFailoverCompleted(replicaIndex, context.logger);
+
+            // The old primary should become a replica
+            context.clusterTestUtils.WaitForReplicaRecovery(primaryIndex, context.logger);
+
+            // Verify the new primary (formerly replica) is functional
+            var role = context.clusterTestUtils.RoleCommand(replicaIndex, context.logger);
+            ClassicAssert.AreEqual("master", role.Value);
+
+            var oldPrimaryRole = context.clusterTestUtils.RoleCommand(primaryIndex, context.logger);
+            ClassicAssert.AreEqual("slave", oldPrimaryRole.Value);
+
+            // Verify last failover state
+            var infoItem = context.clusterTestUtils.GetReplicationInfo(replicaIndex,
+                [ReplicationInfoItem.LAST_FAILOVER_STATE], logger: context.logger);
+            ClassicAssert.AreEqual("failover-completed", infoItem[0].Item2);
+        }
+
+        [Test, Order(15), CancelAfter(testTimeout)]
+        [Category("REPLICATION")]
+        public void ClusterEnsureReplicationWorksAfterFailover(CancellationToken cancellationToken)
+        {
+            // Verify that EnsureReplication still functions after a failover completes.
+            // The failover guard should only suppress auto-resync during active failover states,
+            // not after failover has completed or aborted.
+            var primaryIndex = 0;
+            var replicaIndex = 1;
+            var nodes_count = 2;
+
+            // Enable EnsureReplication with a 1-second poll frequency
+            context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true,
+                timeout: timeout, clusterReplicationReestablishmentTimeout: 1);
+            context.CreateConnection();
+
+            _ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
+            context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);
+            context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);
+
+            // Set up replication
+            var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, logger: context.logger);
+            ClassicAssert.AreEqual("OK", resp);
+            context.clusterTestUtils.WaitForReplicaRecovery(replicaIndex, context.logger);
+            context.clusterTestUtils.WaitForConnectedReplicaCount(primaryIndex, 1, context.logger);
+
+            // Populate primary data and sync
+            context.kvPairs = [];
+            context.PopulatePrimary(ref context.kvPairs, keyLength: 32, kvpairCount: 16, primaryIndex);
+            context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger);
+
+            // Run failover
+            resp = context.clusterTestUtils.ClusterFailover(replicaIndex, logger: context.logger);
+            ClassicAssert.AreEqual("OK", resp);
+            context.clusterTestUtils.WaitForFailoverCompleted(replicaIndex, context.logger);
+
+            // Old primary should now be a replica of the new primary
+            context.clusterTestUtils.WaitForReplicaRecovery(primaryIndex, context.logger);
+
+            // Verify last_failover_state is "failover-completed" — this should NOT suppress EnsureReplication
+            var infoItem = context.clusterTestUtils.GetReplicationInfo(replicaIndex,
+                [ReplicationInfoItem.LAST_FAILOVER_STATE], logger: context.logger);
+            ClassicAssert.AreEqual("failover-completed", infoItem[0].Item2);
+
+            // Verify replication is working in the new topology
+            // The old primary (index 0) is now a replica of the new primary (index 1)
+            // Write to new primary and verify it replicates to old primary (now replica)
+            var slotMap = new int[16384];
+            for (var i = 0; i < 16384; i++)
+                slotMap[i] = replicaIndex;
+
+            var newKvPairs = new Dictionary<string, int>();
+            context.PopulatePrimary(ref newKvPairs, keyLength: 32, kvpairCount: 8, replicaIndex, slotMap: slotMap);
+            context.clusterTestUtils.WaitForReplicaAofSync(replicaIndex, primaryIndex, context.logger);
+            context.ValidateKVCollectionAgainstReplica(ref newKvPairs, primaryIndex);
+        }
+
+#if DEBUG
+        [Test, Order(16), CancelAfter(testTimeout)]
+        [Category("REPLICATION")]
+        public void ClusterFailoverResetsPrimaryOnTakeOverFailure(CancellationToken cancellationToken)
+        {
+            // Verify that when TakeOverAsPrimary fails after PauseWritesAndWaitForSync
+            // has already sent failstopwrites to the primary, the primary is reset back
+            // to its original state (owns slots, is a primary).
+            //
+            // Without the reset in BeginAsyncReplicaFailover's finally block, the primary
+            // would have given up its slots (via TryStopWrites) but the replica never
+            // claimed them, leaving the cluster in an incoherent state.
+            var primaryIndex = 0;
+            var replicaIndex = 1;
+            var nodes_count = 2;
+
+            context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, timeout: timeout);
+            context.CreateConnection();
+
+            _ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
+            context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
+            context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);
+            context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);
+
+            // Set up replication
+            var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaIndex, primaryNodeIndex: primaryIndex, logger: context.logger);
+            ClassicAssert.AreEqual("OK", resp);
+            context.clusterTestUtils.WaitForReplicaRecovery(replicaIndex, context.logger);
+            context.clusterTestUtils.WaitForConnectedReplicaCount(primaryIndex, 1, context.logger);
+
+            // Populate data and sync
+            context.kvPairs = [];
+            context.PopulatePrimary(ref context.kvPairs, keyLength: 32, kvpairCount: 16, primaryIndex);
+            context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex, replicaIndex, context.logger);
+
+            // Verify primary owns all slots before failover
+            var slotsBefore = context.clusterTestUtils.GetOwnedSlotsFromNode(primaryIndex, context.logger);
+            ClassicAssert.AreEqual(16384, slotsBefore.Count, "Primary should own all slots before failover");
+
+            try
+            {
+                // Enable exception injection to make TakeOverAsPrimary fail.
+                // This simulates the scenario where PauseWritesAndWaitForSync succeeds
+                // (failstopwrites sent to primary, primary gives up slots) but the
+                // subsequent TakeOverAsPrimary fails (e.g., due to lock contention from
+                // EnsureReplication holding the ReadRole lock).
+                ExceptionInjectionHelper.EnableException(ExceptionInjectionType.Failover_Fail_TakeOverAsPrimary);
+
+                // Issue DEFAULT failover — this will:
+                // 1. Send failstopwrites to primary (primary gives up slots)
+                // 2. Wait for sync
+                // 3. TakeOverAsPrimary — FAILS due to injected exception
+                // 4. finally block sends failstopwrites([]) to reset primary
+                resp = context.clusterTestUtils.ClusterFailover(replicaIndex, logger: context.logger);
+                ClassicAssert.AreEqual("OK", resp);
+
+                // Wait for failover to be aborted
+                while (true)
+                {
+                    var infoItem = context.clusterTestUtils.GetReplicationInfo(replicaIndex,
+                        [ReplicationInfoItem.LAST_FAILOVER_STATE], logger: context.logger);
+                    if (infoItem[0].Item2.Equals("failover-aborted"))
+                        break;
+                    ClusterTestUtils.BackOff(cancellationToken: cancellationToken, msg: "Waiting for failover to abort");
+                }
+            }
+            finally
+            {
+                ExceptionInjectionHelper.DisableException(ExceptionInjectionType.Failover_Fail_TakeOverAsPrimary);
+            }
+
+            // Verify primary has been reset: it should own all slots again
+            // Without the reset fix, the primary would have 0 slots here.
+            while (true)
+            {
+                var slotsAfter = context.clusterTestUtils.GetOwnedSlotsFromNode(primaryIndex, context.logger);
+                if (slotsAfter.Count == 16384)
+                    break;
+                ClusterTestUtils.BackOff(cancellationToken: cancellationToken, msg: $"Waiting for primary to reclaim slots (current: {slotsAfter.Count})");
+            }
+
+            // Verify primary is still a primary
+            var role = context.clusterTestUtils.RoleCommand(primaryIndex, logger: context.logger);
+            ClassicAssert.AreEqual("master", role.Value, "Primary should be reset back to master role");
+
+            // Verify replica is still a replica (failover was aborted)
+            role = context.clusterTestUtils.RoleCommand(replicaIndex, logger: context.logger);
+            ClassicAssert.AreEqual("slave", role.Value, "Replica should remain a slave after aborted failover");
+
+            // Verify data is still accessible from the primary
+            context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, primaryIndex);
+        }
+#endif
     }
 }
