From 35abc5d24c9c111ea784927f45b55f1298fc7572 Mon Sep 17 00:00:00 2001 From: Prathamesh Bhope Date: Fri, 10 Apr 2026 13:09:34 -0700 Subject: [PATCH] fix: stop goroutine and memory leak in CR reflectors on repeated CRD discovery Co-authored-by: Oleg Zaytsev <1511481+colega@users.noreply.github.com> --- internal/discovery/memleak_test.go | 63 ++++++++++++++++++++ internal/discovery/types.go | 24 +++++++- internal/discovery/types_test.go | 96 ++++++++++++++++++++++++++++++ internal/store/builder.go | 17 +++++- internal/store/builder_test.go | 55 +++++++++++++++++ pkg/app/server.go | 2 +- 6 files changed, 252 insertions(+), 5 deletions(-) create mode 100644 internal/discovery/memleak_test.go create mode 100644 internal/discovery/types_test.go diff --git a/internal/discovery/memleak_test.go b/internal/discovery/memleak_test.go new file mode 100644 index 0000000000..2eedf0cd14 --- /dev/null +++ b/internal/discovery/memleak_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2026 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func makeGVKPs(n int) []groupVersionKindPlural { + gvkps := make([]groupVersionKindPlural, n) + for i := range n { + gvkps[i] = groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: fmt.Sprintf("group%d.example.com", i), + Version: "v1", + Kind: fmt.Sprintf("Kind%d", i), + }, + Plural: fmt.Sprintf("kind%ds", i), + } + } + return gvkps +} + +func TestAppendToMapStability(t *testing.T) { + const ( + numGVKs = 5 + pollCycles = 500 + ) + + gvkps := makeGVKPs(numGVKs) + d := &CRDiscoverer{} + + for range pollCycles { + d.AppendToMap(gvkps...) + } + + kindCount := 0 + for _, versions := range d.Map { + for _, kinds := range versions { + kindCount += len(kinds) + } + } + if kindCount != numGVKs { + t.Errorf("expected exactly %d kind entries after %d poll cycles, got %d", numGVKs, pollCycles, kindCount) + } + if got := len(d.GVKToReflectorStopChanMap); got != numGVKs { + t.Errorf("expected exactly %d stop channels after %d poll cycles, got %d", numGVKs, pollCycles, got) + } +} diff --git a/internal/discovery/types.go b/internal/discovery/types.go index b97e442e68..b38ebbffc7 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -84,11 +84,31 @@ func (r *CRDiscoverer) AppendToMap(gvkps ...groupVersionKindPlural) { if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok { r.Map[gvkp.Group][gvkp.Version] = []kindPlural{} } - r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], kindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural}) - r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] = make(chan struct{}) + alreadyExists := false + for _, existing := range r.Map[gvkp.Group][gvkp.Version] { + if existing.Kind == gvkp.Kind { + alreadyExists = true + break + } + } + if !alreadyExists { + r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], kindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural}) + } + if _, exists := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()]; !exists { + r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] = make(chan struct{}) + } } } +// GetStopChanForGVK returns the stop channel for the given GVK under the read lock. +func (r *CRDiscoverer) GetStopChanForGVK(gvk string) chan struct{} { + var ch chan struct{} + r.SafeRead(func() { + ch = r.GVKToReflectorStopChanMap[gvk] + }) + return ch +} + // RemoveFromMap removes the given GVKs from the cache. func (r *CRDiscoverer) RemoveFromMap(gvkps ...groupVersionKindPlural) { for _, gvkp := range gvkps { diff --git a/internal/discovery/types_test.go b/internal/discovery/types_test.go new file mode 100644 index 0000000000..8c900c17bd --- /dev/null +++ b/internal/discovery/types_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2026 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// TestAppendToMapIdempotency verifies that calling AppendToMap repeatedly with +// the same GVK does not accumulate duplicate kind entries or replace existing +// stop channels. +func TestAppendToMapIdempotency(t *testing.T) { + const iterations = 10 + + gvkp := groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "Foo", + }, + Plural: "foos", + } + + r := &CRDiscoverer{} + + r.AppendToMap(gvkp) + firstCh := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] + if firstCh == nil { + t.Fatal("expected stop channel to be created on first AppendToMap call") + } + + for i := 1; i < iterations; i++ { + r.AppendToMap(gvkp) + } + + kinds := r.Map[gvkp.Group][gvkp.Version] + if len(kinds) != 1 { + t.Errorf("expected exactly 1 kind entry, got %d", len(kinds)) + } + + gotCh := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] + if gotCh != firstCh { + t.Error("stop channel was replaced on repeated AppendToMap calls") + } +} + +// TestRemoveFromMapClosesChannel verifies that RemoveFromMap closes and removes +// the stop channel for the deleted GVK. +func TestRemoveFromMapClosesChannel(t *testing.T) { + gvkp := groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: "example.com", + Version: "v1", + Kind: "Bar", + }, + Plural: "bars", + } + + r := &CRDiscoverer{} + r.AppendToMap(gvkp) + + ch := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] + if ch == nil { + t.Fatal("expected stop channel after AppendToMap") + } + + r.RemoveFromMap(gvkp) + + // Channel must be closed (readable immediately with zero value). + select { + case _, open := <-ch: + if open { + t.Error("channel should be closed, but received a value") + } + default: + t.Error("channel should be closed but is still blocking") + } + + // Entry must be removed from the stop channel map. + if _, exists := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()]; exists { + t.Error("stop channel map entry should be deleted after RemoveFromMap") + } +} diff --git a/internal/store/builder.go b/internal/store/builder.go index 582c45c8dc..04df9e400a 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -86,7 +86,7 @@ type Builder struct { useAPIServerCache bool objectLimit int64 - GVKToReflectorStopChanMap *map[string]chan struct{} + GetGVKStopChan func(gvk string) chan struct{} } // NewBuilder returns a new builder. @@ -609,6 +609,18 @@ func (b *Builder) buildCustomResourceStores(resourceName string, return stores } +func newCRReflectorStopCh(ctx context.Context, gvkStopCh chan struct{}) chan struct{} { + stopCh := make(chan struct{}) + go func() { + defer close(stopCh) + select { + case <-gvkStopCh: + case <-ctx.Done(): + } + }() + return stopCh +} + // startReflector starts a Kubernetes client-go reflector with the given // listWatcher and registers it with the given store. func (b *Builder) startReflector( @@ -622,7 +634,8 @@ func (b *Builder) startReflector( instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit, client) reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { - go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) + gvkStopCh := b.GetGVKStopChan(cr.GroupVersionKind().String()) + go reflector.Run(newCRReflectorStopCh(b.ctx, gvkStopCh)) } else { go reflector.Run(b.ctx.Done()) } diff --git a/internal/store/builder_test.go b/internal/store/builder_test.go index d9ea5813e8..0052d102b0 100644 --- a/internal/store/builder_test.go +++ b/internal/store/builder_test.go @@ -17,9 +17,11 @@ limitations under the License. package store import ( + "context" "reflect" "slices" "testing" + "time" "k8s.io/kube-state-metrics/v2/pkg/options" ) @@ -258,3 +260,56 @@ func TestWithEnabledResources(t *testing.T) { } } } + +func TestCRReflectorStopChanRespondsToContextCancel(t *testing.T) { + const n = 20 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stopChs := make([]chan struct{}, n) + for i := range n { + gvkStopCh := make(chan struct{}) + stopChs[i] = newCRReflectorStopCh(ctx, gvkStopCh) + } + + for i, ch := range stopChs { + select { + case <-ch: + t.Errorf("goroutine %d stopped prematurely before context cancel", i) + default: + } + } + + cancel() + + for i, ch := range stopChs { + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Errorf("goroutine %d did not stop after context cancellation", i) + return + } + } +} + +func TestCRReflectorStopChanRespondsToGVKStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gvkStopCh := make(chan struct{}) + stopCh := newCRReflectorStopCh(ctx, gvkStopCh) + + select { + case <-stopCh: + t.Fatal("stopCh closed before gvkStopCh was signalled") + default: + } + + close(gvkStopCh) + + select { + case <-stopCh: + case <-time.After(2 * time.Second): + t.Fatal("stopCh did not close after gvkStopCh was closed") + } +} diff --git a/pkg/app/server.go b/pkg/app/server.go index 30d784153e..d34d7a4958 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -325,7 +325,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { CRDsCacheCountGauge: crdsCacheCountGauge, } // storeBuilder starts reflectors for the discovered GVKs, and as such, should close them too. - storeBuilder.GVKToReflectorStopChanMap = &discovererInstance.GVKToReflectorStopChanMap + storeBuilder.GetGVKStopChan = discovererInstance.GetStopChanForGVK // This starts a goroutine that will watch for any new GVKs to extract from CRDs. err = discovererInstance.StartDiscovery(ctx, kubeConfig) if err != nil {