@@ -1578,40 +1578,44 @@ func TestAddFeatureGateMetrics(t *testing.T) {
15781578}
15791579
15801580func TestRequestCurrentIndex_LeaderChangedRace (t * testing.T ) {
1581- s , _ := setupTestRequestCurrentIndex (t )
1581+ s , mockRaft := setupTestRequestCurrentIndex (t )
1582+ r := read .NewRead (s , mockRaft )
15821583
15831584 for i := 0 ; i < 100 ; i ++ {
1584- s .r .readStateC <- raft.ReadState {Index : 100 }
1585- leaderChangedNotifier := s .leaderChanged .Receive ()
1586- s .leaderChanged .Notify ()
1585+ mockRaft .readStateC <- raft.ReadState {Index : 100 }
1586+ s .leaderChanged <- struct {}{}
15871587
1588- index , err := s . read . RequestCurrentIndex (leaderChangedNotifier )
1588+ index , err := r . RequestCurrentIndex (s . LeaderChanged () )
15891589 require .ErrorIs (t , err , errors .ErrLeaderChanged )
15901590 require .Equal (t , uint64 (0 ), index )
15911591
1592- // Clear the readStateC channel for the next iteration,
15931592 select {
1594- case <- s .r .readStateC :
1593+ case <- mockRaft .readStateC :
1594+ default :
1595+ }
1596+ select {
1597+ case <- s .leaderChanged :
15951598 default :
15961599 }
15971600 }
15981601}
15991602
16001603func TestRequestCurrentIndex_UniqueRequestID (t * testing.T ) {
16011604 s , mockRaft := setupTestRequestCurrentIndex (t )
1605+ r := read .NewRead (s , mockRaft )
16021606
16031607 wg := sync.WaitGroup {}
16041608 wg .Add (1 )
16051609 go func () {
16061610 defer wg .Done ()
1607- s . read . RequestCurrentIndex (s .leaderChanged . Receive ())
1611+ r . RequestCurrentIndex (s .LeaderChanged ())
16081612 }()
16091613
16101614 require .Eventually (t , func () bool {
16111615 return len (mockRaft .getRequests ()) >= 2
16121616 }, time .Second , 100 * time .Millisecond )
16131617
1614- s .leaderChanged . Notify ()
1618+ s .leaderChanged <- struct {}{}
16151619 wg .Wait ()
16161620
16171621 seen := make (map [uint64 ]bool )
@@ -1623,14 +1627,15 @@ func TestRequestCurrentIndex_UniqueRequestID(t *testing.T) {
16231627
16241628func TestRequestCurrentIndex_Success (t * testing.T ) {
16251629 s , mockRaft := setupTestRequestCurrentIndex (t )
1630+ r := read .NewRead (s , mockRaft )
16261631
16271632 wg := sync.WaitGroup {}
16281633 wg .Add (1 )
16291634 var index uint64
16301635 var err error
16311636 go func () {
16321637 defer wg .Done ()
1633- index , err = s . read . RequestCurrentIndex (s .leaderChanged . Receive ())
1638+ index , err = r . RequestCurrentIndex (s .LeaderChanged ())
16341639 }()
16351640
16361641 require .Eventually (t , func () bool {
@@ -1641,7 +1646,7 @@ func TestRequestCurrentIndex_Success(t *testing.T) {
16411646 reqIDBytes := make ([]byte , 8 )
16421647 binary .BigEndian .PutUint64 (reqIDBytes , reqID )
16431648
1644- s . r .readStateC <- raft.ReadState {
1649+ mockRaft .readStateC <- raft.ReadState {
16451650 Index : 100 ,
16461651 RequestCtx : reqIDBytes ,
16471652 }
@@ -1655,14 +1660,15 @@ func TestRequestCurrentIndex_Success(t *testing.T) {
16551660
16561661func TestRequestCurrentIndex_WrongRequestID (t * testing.T ) {
16571662 s , mockRaft := setupTestRequestCurrentIndex (t )
1663+ r := read .NewRead (s , mockRaft )
16581664
16591665 wg := sync.WaitGroup {}
16601666 wg .Add (1 )
16611667 var index uint64
16621668 var err error
16631669 go func () {
16641670 defer wg .Done ()
1665- index , err = s . read . RequestCurrentIndex (s .leaderChanged . Receive ())
1671+ index , err = r . RequestCurrentIndex (s .LeaderChanged ())
16661672 }()
16671673
16681674 require .Eventually (t , func () bool {
@@ -1672,7 +1678,7 @@ func TestRequestCurrentIndex_WrongRequestID(t *testing.T) {
16721678 wrongReqIDBytes := make ([]byte , 8 )
16731679 binary .BigEndian .PutUint64 (wrongReqIDBytes , 99999 )
16741680
1675- s . r .readStateC <- raft.ReadState {
1681+ mockRaft .readStateC <- raft.ReadState {
16761682 Index : 100 ,
16771683 RequestCtx : wrongReqIDBytes ,
16781684 }
@@ -1685,7 +1691,7 @@ func TestRequestCurrentIndex_WrongRequestID(t *testing.T) {
16851691 reqIDBytes := make ([]byte , 8 )
16861692 binary .BigEndian .PutUint64 (reqIDBytes , reqID )
16871693
1688- s . r .readStateC <- raft.ReadState {
1694+ mockRaft .readStateC <- raft.ReadState {
16891695 Index : 99 ,
16901696 RequestCtx : reqIDBytes ,
16911697 }
@@ -1698,14 +1704,15 @@ func TestRequestCurrentIndex_WrongRequestID(t *testing.T) {
16981704
16991705func TestRequestCurrentIndex_DelayedResponse (t * testing.T ) {
17001706 s , mockRaft := setupTestRequestCurrentIndex (t )
1707+ r := read .NewRead (s , mockRaft )
17011708
17021709 wg := sync.WaitGroup {}
17031710 wg .Add (1 )
17041711 var index uint64
17051712 var err error
17061713 go func () {
17071714 defer wg .Done ()
1708- index , err = s . read . RequestCurrentIndex (s .leaderChanged . Receive ())
1715+ index , err = r . RequestCurrentIndex (s .LeaderChanged ())
17091716 }()
17101717
17111718 require .Eventually (t , func () bool {
@@ -1718,7 +1725,7 @@ func TestRequestCurrentIndex_DelayedResponse(t *testing.T) {
17181725 binary .BigEndian .PutUint64 (reqIDBytes , reqID )
17191726
17201727 select {
1721- case s . r .readStateC <- raft.ReadState {
1728+ case mockRaft .readStateC <- raft.ReadState {
17221729 Index : 100 ,
17231730 RequestCtx : reqIDBytes ,
17241731 }:
@@ -1731,31 +1738,28 @@ func TestRequestCurrentIndex_DelayedResponse(t *testing.T) {
17311738 require .Equal (t , uint64 (100 ), index )
17321739}
17331740
1734- func setupTestRequestCurrentIndex (t * testing.T ) (* EtcdServer , * testRaftNode ) {
1735- mockRaft := & testRaftNode {}
1736- s := & EtcdServer {
1737- lgMu : new (sync.RWMutex ),
1738- lg : zaptest .NewLogger (t ),
1739- reqIDGen : idutil .NewGenerator (0 , time.Time {}),
1740- firstCommitInTerm : notify .NewNotifier (),
1741- leaderChanged : notify .NewNotifier (),
1742- r : raftNode {
1743- raftNodeConfig : raftNodeConfig {
1744- Node : mockRaft ,
1745- },
1746- readStateC : make (chan raft.ReadState , 1 ),
1747- },
1741+ func setupTestRequestCurrentIndex (t * testing.T ) (* mockServer , * testRaftNode ) {
1742+ s := & mockServer {
1743+ leaderChanged : make (chan struct {}, 1 ),
1744+ stopping : make (chan struct {}, 1 ),
1745+ done : make (chan struct {}, 1 ),
1746+ logger : zaptest .NewLogger (t ),
1747+ timeout : 5 * time .Second ,
1748+ firstCommit : make (chan struct {}, 1 ),
1749+ }
1750+ mockRaft := & testRaftNode {
1751+ readStateC : make (chan raft.ReadState , 1 ),
17481752 }
1749- s .read = read .NewRead (s , & s .r )
17501753 return s , mockRaft
17511754}
17521755
17531756type testRaftNode struct {
1754- raft.Node
1757+ readStateC chan raft.ReadState
17551758 mu sync.Mutex
17561759 readIndexRequests []uint64
17571760}
17581761
1762+ func (m * testRaftNode ) ReadState () <- chan raft.ReadState { return m .readStateC }
17591763func (m * testRaftNode ) ReadIndex (ctx context.Context , rctx []byte ) error {
17601764 m .mu .Lock ()
17611765 defer m .mu .Unlock ()
@@ -1772,3 +1776,24 @@ func (m *testRaftNode) getRequests() []uint64 {
17721776 copy (res , m .readIndexRequests )
17731777 return res
17741778}
1779+
1780+ type mockServer struct {
1781+ leaderChanged chan struct {}
1782+ stopping chan struct {}
1783+ done chan struct {}
1784+ logger * zap.Logger
1785+ appliedIndex uint64
1786+ nextRequestID uint64
1787+ timeout time.Duration
1788+ firstCommit chan struct {}
1789+ }
1790+
1791+ func (s * mockServer ) LeaderChanged () <- chan struct {} { return s .leaderChanged }
1792+ func (s * mockServer ) Stopping () <- chan struct {} { return s .stopping }
1793+ func (s * mockServer ) Logger () * zap.Logger { return s .logger }
1794+ func (s * mockServer ) AppliedIndex () uint64 { return s .appliedIndex }
1795+ func (s * mockServer ) ApplyWait (deadline uint64 ) <- chan struct {} { return nil }
1796+ func (s * mockServer ) NextRequestID () uint64 { s .nextRequestID ++ ; return s .nextRequestID }
1797+ func (s * mockServer ) RequestTimeout () time.Duration { return s .timeout }
1798+ func (s * mockServer ) FirstCommitInTermNotify () <- chan struct {} { return s .firstCommit }
1799+ func (s * mockServer ) Done () <- chan struct {} { return s .done }
0 commit comments