Skip to content

Commit 20e4706

Browse files
committed
Move current read index tests to read package
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
1 parent da98ad0 commit 20e4706

2 files changed

Lines changed: 251 additions & 223 deletions

File tree

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
// Copyright 2026 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package read
16+
17+
import (
18+
"context"
19+
"encoding/binary"
20+
"sync"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/require"
25+
"go.uber.org/zap"
26+
"go.uber.org/zap/zaptest"
27+
28+
"go.etcd.io/etcd/server/v3/etcdserver/errors"
29+
"go.etcd.io/raft/v3"
30+
)
31+
32+
func TestRequestCurrentIndex_LeaderChangedRace(t *testing.T) {
33+
s, mockRaft := setupTestRequestCurrentIndex(t)
34+
r := NewRead(s, mockRaft)
35+
36+
for i := 0; i < 100; i++ {
37+
mockRaft.readStateC <- raft.ReadState{Index: 100}
38+
s.leaderChanged <- struct{}{}
39+
40+
index, err := r.RequestCurrentIndex(s.LeaderChanged())
41+
require.ErrorIs(t, err, errors.ErrLeaderChanged)
42+
require.Equal(t, uint64(0), index)
43+
44+
select {
45+
case <-mockRaft.readStateC:
46+
default:
47+
}
48+
select {
49+
case <-s.leaderChanged:
50+
default:
51+
}
52+
}
53+
}
54+
55+
func TestRequestCurrentIndex_UniqueRequestID(t *testing.T) {
56+
s, mockRaft := setupTestRequestCurrentIndex(t)
57+
r := NewRead(s, mockRaft)
58+
59+
wg := sync.WaitGroup{}
60+
wg.Add(1)
61+
go func() {
62+
defer wg.Done()
63+
r.RequestCurrentIndex(s.LeaderChanged())
64+
}()
65+
66+
require.Eventually(t, func() bool {
67+
return len(mockRaft.getRequests()) >= 2
68+
}, time.Second, 100*time.Millisecond)
69+
70+
s.leaderChanged <- struct{}{}
71+
wg.Wait()
72+
73+
seen := make(map[uint64]bool)
74+
for _, id := range mockRaft.getRequests() {
75+
require.Falsef(t, seen[id], "Found duplicate request ID: %d", id)
76+
seen[id] = true
77+
}
78+
}
79+
80+
func TestRequestCurrentIndex_Success(t *testing.T) {
81+
s, mockRaft := setupTestRequestCurrentIndex(t)
82+
r := NewRead(s, mockRaft)
83+
84+
wg := sync.WaitGroup{}
85+
wg.Add(1)
86+
var index uint64
87+
var err error
88+
go func() {
89+
defer wg.Done()
90+
index, err = r.RequestCurrentIndex(s.LeaderChanged())
91+
}()
92+
93+
require.Eventually(t, func() bool {
94+
return len(mockRaft.getRequests()) == 1
95+
}, time.Second, 100*time.Millisecond)
96+
97+
reqID := mockRaft.getRequests()[0]
98+
reqIDBytes := make([]byte, 8)
99+
binary.BigEndian.PutUint64(reqIDBytes, reqID)
100+
101+
mockRaft.readStateC <- raft.ReadState{
102+
Index: 100,
103+
RequestCtx: reqIDBytes,
104+
}
105+
106+
wg.Wait()
107+
108+
require.NoError(t, err)
109+
require.Equal(t, uint64(100), index)
110+
require.Lenf(t, mockRaft.getRequests(), 1, "Expected exactly 1 ReadIndex request")
111+
}
112+
113+
func TestRequestCurrentIndex_WrongRequestID(t *testing.T) {
114+
s, mockRaft := setupTestRequestCurrentIndex(t)
115+
r := NewRead(s, mockRaft)
116+
117+
wg := sync.WaitGroup{}
118+
wg.Add(1)
119+
var index uint64
120+
var err error
121+
go func() {
122+
defer wg.Done()
123+
index, err = r.RequestCurrentIndex(s.LeaderChanged())
124+
}()
125+
126+
require.Eventually(t, func() bool {
127+
return len(mockRaft.getRequests()) == 1
128+
}, time.Second, 10*time.Millisecond)
129+
130+
wrongReqIDBytes := make([]byte, 8)
131+
binary.BigEndian.PutUint64(wrongReqIDBytes, 99999)
132+
133+
mockRaft.readStateC <- raft.ReadState{
134+
Index: 100,
135+
RequestCtx: wrongReqIDBytes,
136+
}
137+
138+
time.Sleep(100 * time.Millisecond)
139+
requests := mockRaft.getRequests()
140+
require.Lenf(t, requests, 1, "Expected exactly 1 ReadIndex request")
141+
142+
reqID := requests[0]
143+
reqIDBytes := make([]byte, 8)
144+
binary.BigEndian.PutUint64(reqIDBytes, reqID)
145+
146+
mockRaft.readStateC <- raft.ReadState{
147+
Index: 99,
148+
RequestCtx: reqIDBytes,
149+
}
150+
wg.Wait()
151+
152+
require.NoError(t, err)
153+
require.Equal(t, uint64(99), index)
154+
require.Lenf(t, mockRaft.getRequests(), 1, "Expected exactly 1 ReadIndex request")
155+
}
156+
157+
func TestRequestCurrentIndex_DelayedResponse(t *testing.T) {
158+
s, mockRaft := setupTestRequestCurrentIndex(t)
159+
r := NewRead(s, mockRaft)
160+
161+
wg := sync.WaitGroup{}
162+
wg.Add(1)
163+
var index uint64
164+
var err error
165+
go func() {
166+
defer wg.Done()
167+
index, err = r.RequestCurrentIndex(s.LeaderChanged())
168+
}()
169+
170+
require.Eventually(t, func() bool {
171+
return len(mockRaft.getRequests()) >= 3
172+
}, 2*time.Second, 100*time.Millisecond)
173+
requests := mockRaft.getRequests()
174+
175+
reqID := requests[1]
176+
reqIDBytes := make([]byte, 8)
177+
binary.BigEndian.PutUint64(reqIDBytes, reqID)
178+
179+
select {
180+
case mockRaft.readStateC <- raft.ReadState{
181+
Index: 100,
182+
RequestCtx: reqIDBytes,
183+
}:
184+
case <-time.After(time.Second):
185+
t.Fatal("timed out sending read state")
186+
}
187+
wg.Wait()
188+
189+
require.NoError(t, err)
190+
require.Equal(t, uint64(100), index)
191+
}
192+
193+
func setupTestRequestCurrentIndex(t *testing.T) (*mockServer, *testRaftNode) {
194+
s := &mockServer{
195+
leaderChanged: make(chan struct{}, 1),
196+
stopping: make(chan struct{}, 1),
197+
done: make(chan struct{}, 1),
198+
logger: zaptest.NewLogger(t),
199+
timeout: 5 * time.Second,
200+
firstCommit: make(chan struct{}, 1),
201+
}
202+
mockRaft := &testRaftNode{
203+
readStateC: make(chan raft.ReadState, 1),
204+
}
205+
return s, mockRaft
206+
}
207+
208+
type testRaftNode struct {
209+
readStateC chan raft.ReadState
210+
mu sync.Mutex
211+
readIndexRequests []uint64
212+
}
213+
214+
func (m *testRaftNode) ReadState() <-chan raft.ReadState { return m.readStateC }
215+
func (m *testRaftNode) ReadIndex(ctx context.Context, rctx []byte) error {
216+
m.mu.Lock()
217+
defer m.mu.Unlock()
218+
if len(rctx) == 8 {
219+
m.readIndexRequests = append(m.readIndexRequests, binary.BigEndian.Uint64(rctx))
220+
}
221+
return nil
222+
}
223+
224+
func (m *testRaftNode) getRequests() []uint64 {
225+
m.mu.Lock()
226+
defer m.mu.Unlock()
227+
res := make([]uint64, len(m.readIndexRequests))
228+
copy(res, m.readIndexRequests)
229+
return res
230+
}
231+
232+
type mockServer struct {
233+
leaderChanged chan struct{}
234+
stopping chan struct{}
235+
done chan struct{}
236+
logger *zap.Logger
237+
appliedIndex uint64
238+
nextRequestID uint64
239+
timeout time.Duration
240+
firstCommit chan struct{}
241+
}
242+
243+
func (s *mockServer) LeaderChanged() <-chan struct{} { return s.leaderChanged }
244+
func (s *mockServer) Stopping() <-chan struct{} { return s.stopping }
245+
func (s *mockServer) Logger() *zap.Logger { return s.logger }
246+
func (s *mockServer) AppliedIndex() uint64 { return s.appliedIndex }
247+
func (s *mockServer) ApplyWait(deadline uint64) <-chan struct{} { return nil }
248+
func (s *mockServer) NextRequestID() uint64 { s.nextRequestID++; return s.nextRequestID }
249+
func (s *mockServer) RequestTimeout() time.Duration { return s.timeout }
250+
func (s *mockServer) FirstCommitInTermNotify() <-chan struct{} { return s.firstCommit }
251+
func (s *mockServer) Done() <-chan struct{} { return s.done }

0 commit comments

Comments
 (0)