Skip to content

Commit 1ee1bb2

Browse files
authored
Merge pull request #302 from chopratejas/rust-stage-3d-compression-metrics
chore(proxy): per-strategy compression observability
2 parents a1f1683 + 2a6ab38 commit 1ee1bb2

6 files changed

Lines changed: 563 additions & 19 deletions

File tree

headroom/proxy/prometheus_metrics.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ def __init__(
8181
self.tokens_output_total = 0
8282
self.tokens_saved_total = 0
8383

84+
# Per-strategy compression counters. Populated lazily as we see
85+
# each strategy tag — no hardcoded list of strategies; the keys
86+
# come from ContentRouter's `CompressionStrategy.value` and
87+
# SmartCrusher's literal `"smart_crusher"`. The forcing
88+
# function for catching strategy-level silent regressions:
89+
# if SmartCrusher events drop to zero in production, the
90+
# `headroom_compressions_total{strategy="smart_crusher"}`
91+
# counter shows it on day 1, not week 3.
92+
self.compressions_by_strategy: dict[str, int] = defaultdict(int)
93+
self.tokens_saved_by_strategy: dict[str, int] = defaultdict(int)
94+
8495
self.latency_sum_ms = 0.0
8596
self.latency_min_ms = float("inf")
8697
self.latency_max_ms = 0.0
@@ -242,6 +253,36 @@ def record_stack(self, stack: str | None) -> None:
242253
return
243254
self.requests_by_stack[slug] += 1
244255

256+
def record_compression(
    self,
    strategy: str,
    original_tokens: int,
    compressed_tokens: int,
) -> None:
    """Implements `headroom.transforms.observability.CompressionObserver`.

    Invoked once per real compression event by the configured
    transforms (ContentRouter at routing-decision granularity;
    SmartCrusher at message granularity on the legacy direct-pipeline
    path). Bumps the lazily-populated per-strategy counters that are
    exported as labelled Prometheus metrics, so a silent regression in
    any single strategy becomes visible in the scrape.

    Synchronous and lock-free: the `defaultdict(int)` writes are atomic
    under the GIL for these key types, and contention is one dict write
    per routing decision.

    Savings are clamped at zero — the observer never records "negative
    savings" even if a compressor emits more tokens than it received.
    """
    self.compressions_by_strategy[strategy] += 1
    saved = max(original_tokens - compressed_tokens, 0)
    if saved:
        self.tokens_saved_by_strategy[strategy] += saved
285+
245286
async def record_request(
246287
self,
247288
provider: str,
@@ -523,6 +564,15 @@ async def export(self) -> str:
523564
help_text="Tokens saved by optimization",
524565
value=self.tokens_saved_total,
525566
)
567+
# NOTE: per-strategy compression breakdown is tracked
568+
# internally on `self.compressions_by_strategy` and
569+
# `self.tokens_saved_by_strategy` (populated by
570+
# `record_compression`) but **deliberately not exported
571+
# here**. The proxy's metric→Supabase pipeline treats
572+
# each metric name as a column, and we cannot add new
573+
# columns. The state is still observable for tests +
574+
# programmatic introspection; if/when a non-column-
575+
# adding export path exists, surface it there.
526576
_append_metric(
527577
lines,
528578
name="headroom_latency_ms_sum",

headroom/proxy/server.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,22 @@ def __init__(self, config: ProxyConfig):
239239
self.anthropic_provider = self.provider_runtime.pipeline_provider("anthropic")
240240
self.openai_provider = self.provider_runtime.pipeline_provider("openai")
241241

242+
# `metrics` is hoisted ahead of transform construction so the
243+
# transforms can receive `self.metrics` as their compression
244+
# observer at __init__ time. The forcing function for catching
245+
# silent strategy regressions: per-strategy counters increment
246+
# only when wired up here, so the wiring is mandatory, not
247+
# something we patch in later. (See `RUST_DEV.md` audit notes.)
248+
self.cost_tracker = (
249+
CostTracker(
250+
budget_limit_usd=config.budget_limit_usd,
251+
budget_period=config.budget_period,
252+
)
253+
if config.cost_tracking_enabled
254+
else None
255+
)
256+
self.metrics = PrometheusMetrics(cost_tracker=self.cost_tracker)
257+
242258
# Initialize transforms based on routing mode
243259
# Choose context manager: IntelligentContextManager (smart) or RollingWindow (legacy)
244260
context_manager: Transform # Can be either IntelligentContextManager or RollingWindow
@@ -280,7 +296,7 @@ def __init__(self, config: ProxyConfig):
280296
router_config.protect_recent_reads_fraction = 0.3
281297
transforms = [
282298
CacheAligner(CacheAlignerConfig(enabled=False)),
283-
ContentRouter(router_config),
299+
ContentRouter(router_config, observer=self.metrics),
284300
context_manager,
285301
]
286302
self._code_aware_status = "lazy" if config.code_aware_enabled else "disabled"
@@ -298,6 +314,7 @@ def __init__(self, config: ProxyConfig):
298314
enabled=config.ccr_inject_tool,
299315
inject_retrieval_marker=config.ccr_inject_tool, # Add CCR markers
300316
),
317+
observer=self.metrics,
301318
),
302319
context_manager,
303320
]
@@ -332,16 +349,9 @@ def __init__(self, config: ProxyConfig):
332349
else None
333350
)
334351

335-
self.cost_tracker = (
336-
CostTracker(
337-
budget_limit_usd=config.budget_limit_usd,
338-
budget_period=config.budget_period,
339-
)
340-
if config.cost_tracking_enabled
341-
else None
342-
)
343-
344-
self.metrics = PrometheusMetrics(cost_tracker=self.cost_tracker)
352+
# `cost_tracker` and `metrics` were hoisted to before transforms so
353+
# ContentRouter / SmartCrusher could take `self.metrics` as their
354+
# compression observer at __init__ time.
345355

346356
# Prefix cache tracking: freeze already-cached messages to avoid
347357
# invalidating the provider's prefix cache with our transforms

headroom/transforms/content_router.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -643,13 +643,26 @@ class ContentRouter(Transform):
643643

644644
name: str = "content_router"
645645

646-
def __init__(self, config: ContentRouterConfig | None = None):
646+
def __init__(
647+
self,
648+
config: ContentRouterConfig | None = None,
649+
observer: Any = None,
650+
):
647651
"""Initialize content router.
648652
649653
Args:
650654
config: Router configuration. Uses defaults if None.
655+
observer: Optional `CompressionObserver` (see
656+
`headroom.transforms.observability`) called once per
657+
routing decision after `compress()` finishes. The
658+
proxy's `PrometheusMetrics` is the production
659+
implementation — it increments per-strategy counters
660+
so silent regressions become visible. `None` disables
661+
observation; pick one explicitly per the no-fallback
662+
rule in the audit doc.
651663
"""
652664
self.config = config or ContentRouterConfig()
665+
self._observer = observer
653666

654667
# Lazy-loaded compressors
655668
self._code_compressor: Any = None
@@ -766,20 +779,46 @@ def compress(
766779
RouterCompressionResult with compressed content and routing metadata.
767780
"""
768781
if not content or not content.strip():
769-
return RouterCompressionResult(
782+
result = RouterCompressionResult(
770783
compressed=content,
771784
original=content,
772785
strategy_used=CompressionStrategy.PASSTHROUGH,
773786
routing_log=[],
774787
)
788+
else:
789+
# Determine strategy from content analysis
790+
strategy = self._determine_strategy(content)
775791

776-
# Determine strategy from content analysis
777-
strategy = self._determine_strategy(content)
792+
if strategy == CompressionStrategy.MIXED:
793+
result = self._compress_mixed(content, context, question, bias=bias)
794+
else:
795+
result = self._compress_pure(content, strategy, context, question, bias=bias)
778796

779-
if strategy == CompressionStrategy.MIXED:
780-
return self._compress_mixed(content, context, question, bias=bias)
781-
else:
782-
return self._compress_pure(content, strategy, context, question, bias=bias)
797+
# One observer call per routing decision; the observer is the
798+
# forcing function for catching strategy-level regressions.
799+
# Empty routing_log (passthrough fast path) → no calls.
800+
self._observe(result)
801+
return result
802+
803+
def _observe(self, result: RouterCompressionResult) -> None:
804+
"""Forward each `RoutingDecision` in `result.routing_log` to the
805+
configured `CompressionObserver`. No-op when no observer is set.
806+
807+
Observers MUST NOT raise per the protocol contract; if one does
808+
anyway, swallow at debug level. Compression already succeeded;
809+
a buggy observer must not turn a 200 into a 500.
810+
"""
811+
if self._observer is None:
812+
return
813+
for d in result.routing_log:
814+
try:
815+
self._observer.record_compression(
816+
strategy=d.strategy.value,
817+
original_tokens=d.original_tokens,
818+
compressed_tokens=d.compressed_tokens,
819+
)
820+
except Exception as e: # pragma: no cover - defensive
821+
logger.debug("CompressionObserver raised (non-fatal): %s", e)
783822

784823
def _determine_strategy(self, content: str) -> CompressionStrategy:
785824
"""Determine the compression strategy from content analysis.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Observability protocol for compression events.
2+
3+
A single `CompressionObserver` interface that any transform can call
4+
after a real compression event. Concrete observers — Prometheus, OTel,
5+
structured logs — implement this; transforms only see the protocol.
6+
7+
The motivating regression: `ContentRouter._record_to_toin` skipped
8+
SmartCrusher on the assumption SmartCrusher recorded its own TOIN
9+
events (it did when SmartCrusher was Python; it stopped when the Rust
10+
port took over). The disconnect was invisible for three weeks because
11+
no metric distinguished compression events by strategy. This module
12+
exists so the next regression of that shape alerts on day 1: if
13+
SmartCrusher events drop to zero in production, the Prometheus
14+
counter shows it immediately.
15+
16+
Design choices, called out for posterity:
17+
18+
- **No fallback observer.** Callers pass `None` or pass a real
19+
observer. There is no "default no-op" instance — that would let a
20+
caller silently disable observability by forgetting to pass one,
21+
and we just spent a PR fixing exactly that class of bug. Be
22+
explicit.
23+
- **No observer registry.** A single observer per transform instance.
24+
If you need multi-fanout, compose at the call site (one wrapper
25+
observer that forwards to N children) — but the trivial pattern
26+
doesn't need a registry baked in.
27+
- **No batching.** Each compression event is one call. Volume is
28+
bounded by the number of routing decisions per request — small.
29+
Batching would only matter if observers had to round-trip to a
30+
remote system; production observers (Prometheus) are in-process
31+
counter increments, which are cheaper than the protocol dispatch.
32+
- **Strategy as a string.** The router and crusher both already
33+
serialize their strategy as the enum's `.value` tag. Passing the
34+
string keeps observers from importing `CompressionStrategy` and
35+
lets non-router callers (e.g. SmartCrusher in legacy mode) emit
36+
the same shape without round-tripping through the enum.
37+
"""
38+
39+
from __future__ import annotations
40+
41+
from typing import Protocol, runtime_checkable
42+
43+
44+
@runtime_checkable
class CompressionObserver(Protocol):
    """Receives one notification per real compression event.

    Implementations must be cheap — this sits on the proxy hot path,
    one call per routing decision per request. A Prometheus-counter
    increment is the right order of magnitude.

    Args:
        strategy: Lowercase tag naming the compression strategy that
            ran. Matches `CompressionStrategy.<NAME>.value` for
            ContentRouter; SmartCrusher's legacy direct-call path
            passes the literal `"smart_crusher"`.
        original_tokens: Token count of the input the strategy
            received.
        compressed_tokens: Token count of the output the strategy
            produced. Equal to `original_tokens` for passthrough;
            less when compression saved tokens.

    Implementations MUST NOT raise. Any fail-over (misconfigured
    Prometheus client, offline OTel exporter) is handled internally —
    an exception bubbling out of an observer would break the
    compression that just succeeded, which is the opposite of what
    observability should do. (See the audit in `RUST_DEV.md`: any
    silent regression is bad, but a noisy observer that breaks
    compression is worse.)
    """

    def record_compression(
        self,
        strategy: str,
        original_tokens: int,
        compressed_tokens: int,
    ) -> None: ...

headroom/transforms/smart_crusher.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def __init__(
152152
scorer: Any = None,
153153
ccr_config: CCRConfig | None = None,
154154
with_compaction: bool = True,
155+
observer: Any = None,
155156
):
156157
# Hard import — no Python fallback. If the wheel is missing the
157158
# caller must build it (scripts/build_rust_extension.sh) or
@@ -167,6 +168,12 @@ def __init__(
167168
cfg = config or SmartCrusherConfig()
168169
self.config = cfg
169170
self._with_compaction = with_compaction
171+
# `observer`: see `headroom.transforms.observability`. The
172+
# legacy proxy pipeline uses SmartCrusher.apply() directly
173+
# (no ContentRouter); without an observer here, those
174+
# compressions would be invisible to per-strategy metrics —
175+
# exactly the silent-regression class we're guarding against.
176+
self._observer = observer
170177

171178
# CCR config is preserved on `self` for callers that read it
172179
# back (`headroom.proxy.server` does). Storage-side semantics
@@ -473,6 +480,24 @@ def _extract_context_from_messages(self, messages: list[dict[str, Any]]) -> str:
473480

474481
return " ".join(context_parts)
475482

483+
def _notify_observer(self, original_tokens: int, compressed_tokens: int) -> None:
484+
"""Forward a compression event to the configured
485+
`CompressionObserver` (see `headroom.transforms.observability`).
486+
No-op when no observer is set; swallows observer exceptions at
487+
debug level so a buggy metrics impl doesn't break the
488+
compression that just succeeded.
489+
"""
490+
if self._observer is None:
491+
return
492+
try:
493+
self._observer.record_compression(
494+
strategy="smart_crusher",
495+
original_tokens=original_tokens,
496+
compressed_tokens=compressed_tokens,
497+
)
498+
except Exception as e: # pragma: no cover - defensive
499+
logger.debug("CompressionObserver raised (non-fatal): %s", e)
500+
476501
def apply(
477502
self,
478503
messages: list[dict[str, Any]],
@@ -516,6 +541,7 @@ def apply(
516541
markers_inserted.append(marker)
517542
if info:
518543
transforms_applied.append(f"smart:{info}")
544+
self._notify_observer(tokens, tokenizer.count_text(crushed))
519545

520546
# Anthropic-style: content is a list of blocks; each tool_result
521547
# block has a string content field of its own.
@@ -541,6 +567,7 @@ def apply(
541567
markers_inserted.append(marker)
542568
if info:
543569
transforms_applied.append(f"smart:{info}")
570+
self._notify_observer(tokens, tokenizer.count_text(crushed))
544571

545572
if crushed_count > 0:
546573
transforms_applied.insert(0, f"smart_crush:{crushed_count}")

0 commit comments

Comments
 (0)