Skip to content

Commit f2ccd4e

Browse files
committed
add maxAdaptiveRetries and enableOverloadRetargeting
1 parent 776ecc7 commit f2ccd4e

40 files changed

+1483
-761
lines changed

internal/integration/backpressure_prose_test.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,16 @@ func TestBackpressureProse(t *testing.T) {
4040
mt.ResetClient(options.Client())
4141

4242
transWithJitter := func(t *mtest.T, ratio float64) time.Duration {
43-
defer randutil.SetJitterForTesting(ratio)()
43+
defer randutil.SetJitterForTesting(func(n int64) int64 {
44+
val := int64(ratio * float64(n))
45+
if val < 0 {
46+
return 0
47+
}
48+
if val > n {
49+
return n
50+
}
51+
return val
52+
})()
4453

4554
startTime := time.Now()
4655
_, err := t.Coll.InsertOne(context.Background(), bson.D{{"a", 1}})
@@ -49,9 +58,9 @@ func TestBackpressureProse(t *testing.T) {
4958
}
5059
noBackoffTime := transWithJitter(mt, 0)
5160
withBackoffTime := transWithJitter(mt, 1)
52-
assert.GreaterOrEqualf(
61+
assert.InDelta(
5362
mt,
54-
withBackoffTime, noBackoffTime+2_100*time.Millisecond,
63+
withBackoffTime, noBackoffTime+300*time.Millisecond, float64(300*time.Millisecond),
5564
"with backoff time: %v, no backoff time: %v", withBackoffTime, noBackoffTime,
5665
)
5766
})
@@ -81,6 +90,34 @@ func TestBackpressureProse(t *testing.T) {
8190
require.Truef(mt, errors.As(err, &cmdErr), "expected a CommandError, got %T: %v", err, err)
8291
assert.True(mt, cmdErr.HasErrorLabel("RetryableError"), `expected error has "RetryableError" label`)
8392
assert.True(mt, cmdErr.HasErrorLabel("SystemOverloadedError"), `expected error has "SystemOverloadedError" label`)
84-
assert.Equalf(mt, 6, opsCnt, "expected 6 attempts (1 original + 5 retries), got %d", opsCnt)
93+
assert.Equalf(mt, 3, opsCnt, "expected 3 attempts (1 original + 2 retries), got %d", opsCnt)
94+
})
95+
mt.Run("4. Overload Errors are Retried a Maximum of maxAdaptiveRetries times when configured", func(mt *mtest.T) {
96+
mt.SetFailPoint(failpoint.FailPoint{
97+
ConfigureFailPoint: "failCommand",
98+
Mode: failpoint.ModeAlwaysOn,
99+
Data: failpoint.Data{
100+
FailCommands: []string{"find"},
101+
ErrorCode: 462,
102+
ErrorLabels: &[]string{"SystemOverloadedError", "RetryableError"},
103+
},
104+
})
105+
106+
var opsCnt int
107+
monitor := &event.CommandMonitor{
108+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
109+
if e.CommandName == "find" {
110+
opsCnt++
111+
}
112+
},
113+
}
114+
mt.ResetClient(options.Client().SetMonitor(monitor).SetMaxAdaptiveRetries(1))
115+
116+
_, err := mt.Coll.Find(context.Background(), bson.D{})
117+
var cmdErr mongo.CommandError
118+
require.Truef(mt, errors.As(err, &cmdErr), "expected a CommandError, got %T: %v", err, err)
119+
assert.True(mt, cmdErr.HasErrorLabel("RetryableError"), `expected error has "RetryableError" label`)
120+
assert.True(mt, cmdErr.HasErrorLabel("SystemOverloadedError"), `expected error has "SystemOverloadedError" label`)
121+
assert.Equalf(mt, 2, opsCnt, "expected 2 attempts (1 original + 1 retry), got %d", opsCnt)
85122
})
86123
}

internal/integration/retryable_reads_prose_test.go

Lines changed: 125 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package integration
88

99
import (
1010
"context"
11+
"errors"
1112
"sync"
1213
"testing"
1314
"time"
@@ -19,7 +20,9 @@ import (
1920
"go.mongodb.org/mongo-driver/v2/internal/failpoint"
2021
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
2122
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
23+
"go.mongodb.org/mongo-driver/v2/internal/randutil"
2224
"go.mongodb.org/mongo-driver/v2/internal/require"
25+
"go.mongodb.org/mongo-driver/v2/mongo"
2326
"go.mongodb.org/mongo-driver/v2/mongo/options"
2427
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
2528
)
@@ -206,24 +209,34 @@ func TestRetryableReadsProse(t *testing.T) {
206209
mtOpts = mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.4").CreateClient(false)
207210
mt.RunOpts("retrying reads in a replica set", mtOpts, func(mt *mtest.T) {
208211
tests := []struct {
209-
name string
210-
errLabels []string
211-
isServerIdentical bool
212+
name string
213+
errLabels []string
214+
enableOverloadRetargeting bool
215+
isServerIdentical bool
212216
}{
213217
{
214-
name: "overload errors",
215-
errLabels: []string{"RetryableError", "SystemOverloadedError"},
216-
isServerIdentical: false,
218+
name: "overload errors retried on a different replicaset server",
219+
errLabels: []string{"RetryableError", "SystemOverloadedError"},
220+
enableOverloadRetargeting: true,
221+
isServerIdentical: false,
217222
},
218223
{
219-
name: "non-overload errors",
224+
name: "non-overload errors retried on the same replicaset server",
220225
errLabels: []string{"RetryableError"},
221226
isServerIdentical: true,
222227
},
228+
{
229+
name: "overload errors retried on the same replicaset server",
230+
errLabels: []string{"RetryableError", "SystemOverloadedError"},
231+
isServerIdentical: true,
232+
},
223233
}
224234

225-
clientOpts := options.Client().SetRetryReads(true).SetReadPreference(readpref.PrimaryPreferred())
226235
for _, tc := range tests {
236+
clientOpts := options.Client().SetRetryReads(true).SetReadPreference(readpref.PrimaryPreferred())
237+
if tc.enableOverloadRetargeting {
238+
clientOpts = clientOpts.SetEnableOverloadRetargeting(true)
239+
}
227240
mt.RunOpts(tc.name, mtest.NewOptions().ClientOptions(clientOpts), func(mt *mtest.T) {
228241
failPoint := failpoint.FailPoint{
229242
ConfigureFailPoint: "failCommand",
@@ -254,4 +267,108 @@ func TestRetryableReadsProse(t *testing.T) {
254267
})
255268
}
256269
})
270+
271+
errorCodesContains := func(err error, code int) bool {
272+
for _, ec := range mongo.ErrorCodes(err) {
273+
if ec == code {
274+
return true
275+
}
276+
}
277+
return false
278+
}
279+
280+
mtOpts = mtest.NewOptions().MinServerVersion("4.4").ClientType(mtest.Pinned).AllowFailPointsOnSharded()
281+
mt.RunOpts("set the maximum number of retries for all retryable read errors", mtOpts, func(mt *mtest.T) {
282+
mt.SetFailPoint(failpoint.FailPoint{
283+
ConfigureFailPoint: "failCommand",
284+
Mode: failpoint.Mode{
285+
Times: 1,
286+
},
287+
Data: failpoint.Data{
288+
FailCommands: []string{"find"},
289+
ErrorLabels: &[]string{"RetryableError", "SystemOverloadedError"},
290+
ErrorCode: 91,
291+
},
292+
})
293+
294+
var opsCnt int
295+
monitor := &event.CommandMonitor{
296+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
297+
if e.CommandName == "find" {
298+
opsCnt++
299+
}
300+
},
301+
Failed: func(_ context.Context, event *event.CommandFailedEvent) {
302+
if event.CommandName != "find" {
303+
return
304+
}
305+
if errorCodesContains(event.Failure, 91) {
306+
mt.SetFailPoint(failpoint.FailPoint{
307+
ConfigureFailPoint: "failCommand",
308+
Mode: failpoint.ModeAlwaysOn,
309+
Data: failpoint.Data{
310+
FailCommands: []string{"find"},
311+
ErrorLabels: &[]string{"RetryableError"},
312+
ErrorCode: 91,
313+
},
314+
})
315+
}
316+
},
317+
}
318+
mt.ResetClient(options.Client().SetRetryReads(true).SetMonitor(monitor))
319+
_, err := mt.Coll.Find(context.Background(), bson.D{})
320+
var cmdErr mongo.CommandError
321+
require.Truef(mt, errors.As(err, &cmdErr), "expected a CommandError, got %T: %v", err, err)
322+
assert.Equalf(mt, 3, opsCnt, "expected 3 attempts (1 original + 2 retries), got %d", opsCnt)
323+
})
324+
325+
mt.RunOpts("do not apply backoff to non-overload errors", mtOpts, func(mt *mtest.T) {
326+
mt.SetFailPoint(failpoint.FailPoint{
327+
ConfigureFailPoint: "failCommand",
328+
Mode: failpoint.Mode{
329+
Times: 1,
330+
},
331+
Data: failpoint.Data{
332+
FailCommands: []string{"find"},
333+
ErrorLabels: &[]string{"RetryableError", "SystemOverloadedError"},
334+
ErrorCode: 91,
335+
},
336+
})
337+
338+
var ops []bool
339+
monitor := &event.CommandMonitor{
340+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
341+
if e.CommandName == "find" {
342+
ops = append(ops, false)
343+
}
344+
},
345+
Failed: func(_ context.Context, event *event.CommandFailedEvent) {
346+
if event.CommandName != "find" {
347+
return
348+
}
349+
if errorCodesContains(event.Failure, 91) {
350+
mt.SetFailPoint(failpoint.FailPoint{
351+
ConfigureFailPoint: "failCommand",
352+
Mode: failpoint.ModeAlwaysOn,
353+
Data: failpoint.Data{
354+
FailCommands: []string{"find"},
355+
ErrorLabels: &[]string{"RetryableError"},
356+
ErrorCode: 91,
357+
},
358+
})
359+
}
360+
},
361+
}
362+
363+
defer randutil.SetJitterForTesting(func(int64) int64 {
364+
ops[len(ops)-1] = true
365+
return 0
366+
})()
367+
368+
mt.ResetClient(options.Client().SetRetryReads(true).SetMonitor(monitor))
369+
_, err := mt.Coll.Find(context.Background(), bson.D{})
370+
var cmdErr mongo.CommandError
371+
require.Truef(mt, errors.As(err, &cmdErr), "expected a CommandError, got %T: %v", err, err)
372+
assert.Equal(t, []bool{true, false, false}, ops, "expected backoff to be applied on the first attempt only, got %v", ops)
373+
})
257374
}

internal/integration/retryable_writes_prose_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.mongodb.org/mongo-driver/v2/internal/failpoint"
2424
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
2525
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
26+
"go.mongodb.org/mongo-driver/v2/internal/randutil"
2627
"go.mongodb.org/mongo-driver/v2/internal/require"
2728
"go.mongodb.org/mongo-driver/v2/mongo"
2829
"go.mongodb.org/mongo-driver/v2/mongo/options"
@@ -511,6 +512,102 @@ func TestErrorPropagationAfterEncounteringMultipleErrors(t *testing.T) {
511512
require.True(mt, errors.As(err, &labeledError), "expected error to be a labeled error")
512513
require.NotContains(mt, labeledError.Labels, "NoWritesPerformed", "expected error labels to not contain NoWritesPerformed")
513514
})
515+
516+
mt.Run("Case 4: Test that drivers set the maximum number of retries for all retryable write errors when an overload error is encountered", func(mt *mtest.T) {
517+
mt.SetFailPoint(failpoint.FailPoint{
518+
ConfigureFailPoint: "failCommand",
519+
Mode: failpoint.Mode{
520+
Times: 1,
521+
},
522+
Data: failpoint.Data{
523+
FailCommands: []string{"insert"},
524+
ErrorLabels: &[]string{"RetryableError", "SystemOverloadedError"},
525+
ErrorCode: 91,
526+
},
527+
})
528+
529+
var opsCnt int
530+
monitor := &event.CommandMonitor{
531+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
532+
if e.CommandName == "insert" {
533+
opsCnt++
534+
}
535+
},
536+
Failed: func(_ context.Context, event *event.CommandFailedEvent) {
537+
if event.CommandName != "insert" {
538+
return
539+
}
540+
if errorCodesContains(event.Failure, 91) {
541+
mt.SetFailPoint(failpoint.FailPoint{
542+
ConfigureFailPoint: "failCommand",
543+
Mode: failpoint.ModeAlwaysOn,
544+
Data: failpoint.Data{
545+
FailCommands: []string{"insert"},
546+
ErrorLabels: &[]string{"RetryableError", "RetryableWriteError"},
547+
ErrorCode: 91,
548+
},
549+
})
550+
}
551+
},
552+
}
553+
mt.ResetClient(options.Client().SetRetryWrites(true).SetMonitor(monitor))
554+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
555+
require.True(mt, errorCodesContains(err, 91), "Expect the error code of the server error to be 91")
556+
var labeledError driver.Error
557+
require.True(mt, errors.As(err, &labeledError), "expected error to be a labeled error")
558+
assert.Equalf(mt, 3, opsCnt, "expected 3 attempts (1 original + 2 retries), got %d", opsCnt)
559+
})
560+
561+
mt.Run("Case 5: Test that drivers do not apply backoff to non-overload errors", func(mt *mtest.T) {
562+
mt.SetFailPoint(failpoint.FailPoint{
563+
ConfigureFailPoint: "failCommand",
564+
Mode: failpoint.Mode{
565+
Times: 1,
566+
},
567+
Data: failpoint.Data{
568+
FailCommands: []string{"insert"},
569+
ErrorLabels: &[]string{"RetryableError", "SystemOverloadedError"},
570+
ErrorCode: 91,
571+
},
572+
})
573+
574+
var ops []bool
575+
monitor := &event.CommandMonitor{
576+
Started: func(_ context.Context, e *event.CommandStartedEvent) {
577+
if e.CommandName == "insert" {
578+
ops = append(ops, false)
579+
}
580+
},
581+
Failed: func(_ context.Context, event *event.CommandFailedEvent) {
582+
if event.CommandName != "insert" {
583+
return
584+
}
585+
if errorCodesContains(event.Failure, 91) {
586+
mt.SetFailPoint(failpoint.FailPoint{
587+
ConfigureFailPoint: "failCommand",
588+
Mode: failpoint.ModeAlwaysOn,
589+
Data: failpoint.Data{
590+
FailCommands: []string{"insert"},
591+
ErrorLabels: &[]string{"RetryableError", "RetryableWriteError"},
592+
ErrorCode: 91,
593+
},
594+
})
595+
}
596+
},
597+
}
598+
599+
defer randutil.SetJitterForTesting(func(int64) int64 {
600+
ops[len(ops)-1] = true
601+
return 0
602+
})()
603+
604+
mt.ResetClient(options.Client().SetRetryWrites(true).SetMonitor(monitor))
605+
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
606+
require.True(mt, errorCodesContains(err, 91), "Expect the error code of the server error to be 91")
607+
var labeledError driver.Error
608+
require.True(mt, errors.As(err, &labeledError), "expected error to be a labeled error")
609+
assert.Equal(t, []bool{true, false, false}, ops, "expected backoff to be applied on the first attempt only, got %v", ops)
610+
})
514611
}
515612

516613
type crudOperation struct {

internal/randutil/jitter.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,26 @@
66

77
package randutil
88

9-
var presetRatio *float64
9+
import "sync"
1010

1111
var globalRand = NewLockedRand()
1212

1313
// JitterInt63n returns, as an int64, a non-negative pseudo-random number in
1414
// the half-open interval [0,n). It panics if n <= 0.
1515
//
16-
// If a static jitter ratio is set by calling SetJitterForTesting, JitterInt63n
17-
// returns int64(n*ratio) in [0,n].
18-
func JitterInt63n(n int64) int64 {
19-
if presetRatio == nil {
20-
return globalRand.Int63n(n)
21-
}
22-
val := int64(*presetRatio * float64(n))
23-
if val < 0 {
24-
return 0
25-
}
26-
if val > n {
27-
return n
28-
}
29-
return int64(val)
30-
}
16+
// If a test jitter function is set by calling SetJitterForTesting, JitterInt63n
17+
// returns the value from the custom function.
18+
var JitterInt63n func(int64) int64 = globalRand.Int63n
3119

32-
// SetJitterForTesting sets a preset jitter ratio for testing and returns a restore function.
33-
func SetJitterForTesting(ratio float64) func() {
34-
presetRatio = &ratio
35-
return func() { presetRatio = nil }
20+
var testLock sync.Mutex
21+
22+
// SetJitterForTesting sets a custom jitter function for testing and returns a restore function.
23+
func SetJitterForTesting(f func(int64) int64) func() {
24+
testLock.Lock()
25+
26+
JitterInt63n = f
27+
return func() {
28+
JitterInt63n = globalRand.Int63n
29+
testLock.Unlock()
30+
}
3631
}

0 commit comments

Comments
 (0)