
Commit 52f2ff1

Merge branch 'main' into mysql-fix-log-resource-attributes
2 parents: 6660ba5 + c9d7b22

17 files changed: 163 additions & 47 deletions
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+change_type: enhancement
+component: processor/k8s_attributes
+note: Add `watch_sync_period` config option to configure informer cache resync period.
+issues: [48111]
+subtext: The `watch_sync_period` config option defaults to `5m` to match the previously hardcoded behavior.
+change_logs: [user]

processor/k8sattributesprocessor/README.md

Lines changed: 13 additions & 0 deletions
@@ -226,6 +226,14 @@ wait_for_metadata: true
 wait_for_metadata_timeout: 10s
 ```
 
+## Informer Cache Resync Period
+
+Reprocessing the informer cache periodically (resyncing) enqueues all cached K8s objects back into event handlers. In large clusters (e.g., 100K pods), this causes significant CPU spikes, memory churn, and garbage collection overhead.
+Because resource state modifications are already pushed immediately via Kubernetes watch events, a resync period is almost entirely unnecessary.
+
+- `watch_sync_period` (`default: 5m`): The resync period for K8s informers. You may set this to `0s` to disable resyncing completely (recommended for large clusters).
+
 ## Extracting attributes from pod labels and annotations
 
 The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces, deployments, statefulsets, daemonsets, jobs and nodes.
@@ -700,6 +708,10 @@ k8s_attributes:
   # Default: 10s
   wait_for_metadata_timeout: 10s
 
+  # Informer resync period. Setting this to 0s disables resyncing completely.
+  # Default: 5m
+  watch_sync_period: 5m
+
   # Extract configuration - defines what metadata to extract
   extract:
     # Metadata fields to extract as resource attributes
@@ -870,6 +882,7 @@ k8s_attributes:
 | `passthrough` | bool | `false` | Only add pod IP without extracting metadata (no K8s API calls) |
 | `wait_for_metadata` | bool | `false` | Block collector startup until metadata is synced |
 | `wait_for_metadata_timeout` | duration | `10s` | Max wait time for metadata sync on startup |
+| `watch_sync_period` | duration | `5m` | Resync period for K8s informers (`0s` disables resync completely) |
 
 #### Extract Options
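For orientation, here is a minimal sketch of how the new option could appear in a user's configuration, following the `k8s_attributes` block from the README excerpt above; everything other than `watch_sync_period` is illustrative and not part of this commit:

```yaml
k8s_attributes:
  # In-cluster authentication (illustrative default).
  auth_type: serviceAccount
  # Disable the informer cache resync entirely; Kubernetes watch events
  # already push updates, so periodic resync is unnecessary in large clusters.
  watch_sync_period: 0s
```

Leaving the option unset keeps the default of `5m`, which matches the previously hardcoded resync period.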

processor/k8sattributesprocessor/client_test.go

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ func selectors() (labels.Selector, fields.Selector) {
 }
 
 // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
-func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformersFactoryList, _ bool, _ time.Duration) (kube.Client, error) {
+func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformersFactoryList, _ bool, _, _ time.Duration) (kube.Client, error) {
 cs := fake.NewClientset()
 
 ls, fs := selectors()

processor/k8sattributesprocessor/config.go

Lines changed: 10 additions & 0 deletions
@@ -4,6 +4,7 @@
 package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor"
 
 import (
+"errors"
 "fmt"
 "os"
 "regexp"

@@ -46,13 +47,22 @@ type Config struct {
 
 // WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
 WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
+
+// WatchSyncPeriod determines the resync period for K8s informers.
+// Reprocessing the informer cache periodically can cause significant memory churn and CPU spikes.
+// Setting this to 0 disables resync.
+WatchSyncPeriod time.Duration `mapstructure:"watch_sync_period"`
 }
 
 func (cfg *Config) Validate() error {
 if err := cfg.APIConfig.Validate(); err != nil {
 return err
 }
 
+if cfg.WatchSyncPeriod < 0 {
+return errors.New("watch_sync_period must be greater than or equal to 0")
+}
+
 for _, assoc := range cfg.Association {
 if len(assoc.Sources) > kube.PodIdentifierMaxLength {
 return fmt.Errorf("too many association sources. limit is %v", kube.PodIdentifierMaxLength)

processor/k8sattributesprocessor/config.schema.yaml

Lines changed: 4 additions & 0 deletions
@@ -135,5 +135,9 @@ properties:
     description: WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
     type: string
     format: duration
+  watch_sync_period:
+    description: WatchSyncPeriod determines the resync period for K8s informers. Reprocessing the informer cache periodically can cause significant memory churn and CPU spikes. Setting this to 0 disables resync.
+    type: string
+    format: duration
 allOf:
   - $ref: /internal/k8sconfig.api_config

processor/k8sattributesprocessor/config_test.go

Lines changed: 45 additions & 0 deletions
@@ -34,6 +34,7 @@ func TestLoadConfig(t *testing.T) {
 DeploymentNameFromReplicaSet: false,
 },
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -107,6 +108,7 @@
 },
 },
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -131,6 +133,7 @@
 },
 },
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -143,6 +146,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -183,6 +187,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -196,6 +201,7 @@
 Exclude: defaultExcludes,
 WaitForMetadata: true,
 WaitForMetadataTimeout: 30 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -209,6 +215,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -226,6 +233,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -243,6 +251,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -261,6 +270,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -276,6 +286,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -291,6 +302,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -306,6 +318,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -321,6 +334,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -336,6 +350,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -357,6 +372,7 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {

@@ -370,11 +386,39 @@
 },
 Exclude: defaultExcludes,
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 },
 },
 {
 id: component.NewIDWithName(metadata.Type, "bad_metadata_field"),
 },
+{
+id: component.NewIDWithName(metadata.Type, "custom_intervals"),
+expected: &Config{
+APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
+Extract: ExtractConfig{
+Metadata: enabledAttributes(),
+},
+Exclude: defaultExcludes,
+WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 20 * time.Second,
+},
+},
+{
+id: component.NewIDWithName(metadata.Type, "bad_watch_sync_period"),
+},
+{
+id: component.NewIDWithName(metadata.Type, "disable_watch_sync"),
+expected: &Config{
+APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
+Extract: ExtractConfig{
+Metadata: enabledAttributes(),
+},
+Exclude: defaultExcludes,
+WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 0,
+},
+},
 }
 
 for _, tt := range tests {

@@ -396,6 +440,7 @@
 assert.Error(t, err)
 return
 }
+
 assert.NoError(t, xconfmap.Validate(cfg))
 assert.Equal(t, tt.expected, cfg)
 })

processor/k8sattributesprocessor/factory.go

Lines changed: 3 additions & 1 deletion
@@ -50,6 +50,7 @@ func createDefaultConfig() component.Config {
 Metadata: enabledAttributes(),
 },
 WaitForMetadataTimeout: 10 * time.Second,
+WatchSyncPeriod: 5 * time.Minute,
 }
 }
 

@@ -278,7 +279,8 @@ func createProcessorOpts(cfg component.Config) []option {
 withAPIConfig(oCfg.APIConfig),
 withExtractPodAssociations(oCfg.Association...),
 withExcludes(oCfg.Exclude),
-withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout))
+withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout),
+withWatchSyncPeriod(oCfg.WatchSyncPeriod))
 
 if oCfg.WaitForMetadata {
 opts = append(opts, withWaitForMetadata(true))

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 20 additions & 9 deletions
@@ -52,6 +52,7 @@ type WatchClient struct {
 stopCh chan struct{}
 waitForMetadata bool
 waitForMetadataTimeout time.Duration
+watchSyncPeriod time.Duration
 
 // A map containing Pod related data, used to associate them with resources.
 // Key can be either an IP address or Pod UID

@@ -123,6 +124,7 @@ func New(
 informersFactory InformersFactoryList,
 waitForMetadata bool,
 waitForMetadataTimeout time.Duration,
+watchSyncPeriod time.Duration,
 ) (Client, error) {
 telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
 if err != nil {

@@ -140,6 +142,7 @@
 telemetryBuilder: telemetryBuilder,
 waitForMetadata: waitForMetadata,
 waitForMetadataTimeout: waitForMetadataTimeout,
+watchSyncPeriod: watchSyncPeriod,
 }
 
 c.Pods = map[PodIdentifier]*Pod{}

@@ -171,19 +174,25 @@
 zap.String("fieldSelector", fieldSelector.String()),
 )
 if informersFactory.newInformer == nil {
-informersFactory.newInformer = newSharedInformer
+informersFactory.newInformer = func(client kubernetes.Interface, ns string, ls labels.Selector, fs fields.Selector) cache.SharedInformer {
+return newSharedInformer(client, ns, ls, fs, watchSyncPeriod)
+}
 }
 
 if informersFactory.newNamespaceInformer == nil {
 switch {
 case c.extractNamespaceLabelsAnnotations():
 // if rules to extract metadata from namespace is configured use namespace shared informer containing
 // all namespaces including kube-system which contains cluster uid information (kube-system-uid)
-informersFactory.newNamespaceInformer = newNamespaceSharedInformer
+informersFactory.newNamespaceInformer = func(client clientmeta.Interface) cache.SharedInformer {
+return newNamespaceSharedInformer(client, watchSyncPeriod)
+}
 case rules.ClusterUID:
 // use kube-system shared informer to only watch kube-system namespace
 // reducing overhead of watching all the namespaces
-informersFactory.newNamespaceInformer = newKubeSystemSharedInformer
+informersFactory.newNamespaceInformer = func(client clientmeta.Interface) cache.SharedInformer {
+return newKubeSystemSharedInformer(client, watchSyncPeriod)
+}
 default:
 informersFactory.newNamespaceInformer = NewNoOpInformer
 }

@@ -217,7 +226,9 @@
 
 if needReplicaSetInformer {
 if informersFactory.newReplicaSetInformer == nil {
-informersFactory.newReplicaSetInformer = newReplicaSetSharedInformer
+informersFactory.newReplicaSetInformer = func(client clientmeta.Interface, namespace string) cache.SharedInformer {
+return newReplicaSetSharedInformer(client, namespace, watchSyncPeriod)
+}
 }
 
 c.replicasetInformer = informersFactory.newReplicaSetInformer(c.mc, c.Filters.Namespace)

@@ -227,23 +238,23 @@
 }
 
 if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
-c.nodeInformer = newNodeSharedInformer(c.mc, c.Filters.Node)
+c.nodeInformer = newNodeSharedInformer(c.mc, c.Filters.Node, watchSyncPeriod)
 }
 
 if c.extractDeploymentLabelsAnnotations() {
-c.deploymentInformer = newDeploymentSharedInformer(c.mc, c.Filters.Namespace)
+c.deploymentInformer = newDeploymentSharedInformer(c.mc, c.Filters.Namespace, watchSyncPeriod)
 }
 
 if c.extractStatefulSetLabelsAnnotations() {
-c.statefulsetInformer = newStatefulSetSharedInformer(c.mc, c.Filters.Namespace)
+c.statefulsetInformer = newStatefulSetSharedInformer(c.mc, c.Filters.Namespace, watchSyncPeriod)
 }
 
 if c.extractDaemonSetLabelsAnnotations() {
-c.daemonsetInformer = newDaemonSetSharedInformer(c.mc, c.Filters.Namespace)
+c.daemonsetInformer = newDaemonSetSharedInformer(c.mc, c.Filters.Namespace, watchSyncPeriod)
 }
 
 if c.extractJobLabelsAnnotations() || rules.CronJobUID {
-c.jobInformer = newJobSharedInformer(c.mc, c.Filters.Namespace)
+c.jobInformer = newJobSharedInformer(c.mc, c.Filters.Namespace, watchSyncPeriod)
 }
 return c, err
 }
