Skip to content

Commit 3d012fd

Browse files
authored
Merge branch 'main' into fix/azure-openai-responses-structured-predict-engine
2 parents b2799c3 + db30f53 commit 3d012fd

3 files changed

Lines changed: 163 additions & 29 deletions

File tree

llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import inspect
22
import logging
3+
from contextvars import Token
34
from datetime import datetime
45
from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast
56

@@ -52,6 +53,9 @@ class OTelCompatibleSpanHandler(SimpleSpanHandler):
5253
_events_by_span: Dict[str, List[OTelEventAttributes]] = PrivateAttr(
5354
default_factory=dict,
5455
)
56+
_context_tokens: Dict[str, Token[context.Context]] = PrivateAttr(
57+
default_factory=dict
58+
)
5559
all_spans: Dict[str, trace.Span] = Field(
5660
default_factory=dict, description="All the registered OpenTelemetry spans."
5761
)
@@ -79,6 +83,7 @@ def __init__(
7983
self._tracer = tracer
8084
self._tracer_provider = tracer_provider
8185
self._events_by_span = {}
86+
self._context_tokens = {}
8287
self.debug = debug
8388

8489
def close(self) -> None:
@@ -117,6 +122,32 @@ def restore_propagation_context(self, ctx: Dict[str, Any]) -> None:
117122
restored = propagate.extract(carrier)
118123
context.attach(restored)
119124

125+
def _get_parent_context(self, parent_span_id: Optional[str]) -> context.Context:
126+
ctx = context.get_current()
127+
current_span_context = trace.get_current_span(ctx).get_span_context()
128+
if (
129+
current_span_context.is_valid
130+
or parent_span_id is None
131+
or parent_span_id not in self.all_spans
132+
):
133+
return ctx
134+
return set_span_in_context(span=self.all_spans[parent_span_id])
135+
136+
def _add_events_to_span(self, id_: str, span: trace.Span) -> None:
137+
events = self._events_by_span.pop(id_, [])
138+
for event in events:
139+
span.add_event(name=event.name, attributes=event.attributes)
140+
141+
def _detach_context_and_end_span(self, id_: str, span: trace.Span) -> None:
142+
token = self._context_tokens.pop(id_, None)
143+
try:
144+
if token is not None:
145+
context.detach(token)
146+
except BaseException:
147+
_logger.warning("Error detaching OTel context for %s", id_, exc_info=True)
148+
finally:
149+
span.end()
150+
120151
def new_span(
121152
self,
122153
id_: str,
@@ -137,14 +168,14 @@ def new_span(
137168
# 1. Same-process parent found in all_spans → use it
138169
# 2. Otherwise → use ambient OTel context (set by restore_propagation_context
139170
# for cross-process, or inherited naturally for same-process roots)
140-
if parent_span_id is not None and parent_span_id in self.all_spans:
141-
ctx = set_span_in_context(span=self.all_spans[parent_span_id])
142-
else:
143-
ctx = context.get_current()
171+
ctx = self._get_parent_context(parent_span_id)
144172

145173
otel_span = self._tracer.start_span(name=span_name, context=ctx)
146174
self.all_spans.update({id_: otel_span})
147175

176+
token = context.attach(set_span_in_context(otel_span))
177+
self._context_tokens[id_] = token
178+
148179
# Record instrument_tags as span attributes
149180
if tags is not None:
150181
for key, value in tags.items():
@@ -180,13 +211,9 @@ def prepare_to_exit_span(
180211
_logger.warning("No OTel span found for %s in prepare_to_exit_span", id_)
181212
return sp
182213

183-
# Get and process events specific to this span
184-
events = self._events_by_span.pop(id_, [])
185-
for event in events:
186-
span.add_event(name=event.name, attributes=event.attributes)
187-
214+
self._add_events_to_span(id_, span)
188215
span.set_status(status=trace.StatusCode.OK)
189-
span.end()
216+
self._detach_context_and_end_span(id_, span)
190217
return sp
191218

192219
def prepare_to_drop_span(
@@ -209,15 +236,11 @@ def prepare_to_drop_span(
209236
_logger.warning("No OTel span found for %s in prepare_to_drop_span", id_)
210237
return sp
211238

212-
# Get and process events specific to this span
213-
events = self._events_by_span.pop(id_, [])
214-
for event in events:
215-
span.add_event(name=event.name, attributes=event.attributes)
216-
239+
self._add_events_to_span(id_, span)
217240
if err is not None:
218241
span.record_exception(err)
219242
span.set_status(status=trace.StatusCode.ERROR, description=err.__str__())
220-
span.end()
243+
self._detach_context_and_end_span(id_, span)
221244
return sp
222245

223246

@@ -324,6 +347,7 @@ def _start_otel(
324347
assert self.span_exporter is not None, (
325348
"span_exporter has to be non-null to be used within simple or batch span processors"
326349
)
350+
span_processor: SpanProcessor
327351
if self.span_processor == "simple":
328352
span_processor = SimpleSpanProcessor(self.span_exporter)
329353
else:

llama-index-integrations/observability/llama-index-observability-otel/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ dev = [
2626

2727
[project]
2828
name = "llama-index-observability-otel"
29-
version = "0.6.0"
29+
version = "0.6.1"
3030
description = "llama-index observability integration with OpenTelemetry"
3131
authors = [{name = "Clelia Astra Bertelli", email = "clelia@runllama.ai"}]
3232
requires-python = ">=3.10,<4.0"

llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py

Lines changed: 122 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,44 @@ def make_handler():
265265
return exporter, handler, provider
266266

267267

268+
def enter_llama_span(
269+
handler: OTelCompatibleSpanHandler,
270+
span_id: str,
271+
parent_id: Optional[str] = None,
272+
) -> None:
273+
handler.span_enter(id_=span_id, bound_args=_bound, parent_id=parent_id)
274+
275+
276+
def exit_llama_span(handler: OTelCompatibleSpanHandler, span_id: str) -> None:
277+
handler.span_exit(id_=span_id, bound_args=_bound)
278+
279+
280+
def finished_spans_by_name(
281+
exporter: InMemorySpanExporter,
282+
provider: TracerProvider,
283+
) -> dict[str, ReadableSpan]:
284+
provider.force_flush()
285+
spans = exporter.get_finished_spans()
286+
spans_by_name = {span.name: span for span in spans}
287+
assert len(spans_by_name) == len(spans)
288+
return spans_by_name
289+
290+
291+
def assert_parent(child: ReadableSpan, parent: ReadableSpan) -> None:
292+
assert child.parent is not None
293+
assert child.parent.span_id == parent.context.span_id
294+
assert child.parent.trace_id == parent.context.trace_id
295+
296+
297+
def assert_trace_chain(
298+
spans_by_name: dict[str, ReadableSpan],
299+
expected_chain: Sequence[str],
300+
) -> None:
301+
assert set(spans_by_name) == set(expected_chain)
302+
for parent_name, child_name in zip(expected_chain, expected_chain[1:]):
303+
assert_parent(spans_by_name[child_name], spans_by_name[parent_name])
304+
305+
268306
def test_span_name_strips_uuid() -> None:
269307
exporter, handler, provider = make_handler()
270308
handler.span_enter(id_="MyWorkflow.run-abc123-def", bound_args=_bound)
@@ -357,17 +395,95 @@ def test_tags_not_mutated_by_new_span() -> None:
357395
assert tags == original_tags
358396

359397

398+
def test_span_enter_makes_otel_span_current_for_downstream_spans() -> None:
399+
# Expected trace:
400+
# root
401+
# downstream-child
402+
exporter, handler, provider = make_handler()
403+
clean_token = context.attach(context.Context())
404+
try:
405+
downstream_tracer = provider.get_tracer("downstream")
406+
407+
enter_llama_span(handler, "root-uuid")
408+
root_otel_span = handler.all_spans["root-uuid"]
409+
assert trace.get_current_span() is root_otel_span
410+
411+
with downstream_tracer.start_as_current_span("downstream-child"):
412+
pass
413+
414+
exit_llama_span(handler, "root-uuid")
415+
416+
spans = finished_spans_by_name(exporter, provider)
417+
assert_trace_chain(spans, ["root", "downstream-child"])
418+
assert trace.get_current_span() is not root_otel_span
419+
finally:
420+
context.detach(clean_token)
421+
422+
423+
def test_otel_context_is_source_of_truth_when_external_spans_interleave() -> None:
424+
# Expected trace:
425+
# external-root
426+
# llama_parent
427+
# external-child
428+
# llama_child
429+
exporter, handler, provider = make_handler()
430+
clean_token = context.attach(context.Context())
431+
tracer = provider.get_tracer("external")
432+
433+
try:
434+
with tracer.start_as_current_span("external-root"):
435+
enter_llama_span(handler, "llama_parent")
436+
with tracer.start_as_current_span("external-child"):
437+
enter_llama_span(
438+
handler,
439+
"llama_child",
440+
parent_id="llama_parent",
441+
)
442+
exit_llama_span(handler, "llama_child")
443+
exit_llama_span(handler, "llama_parent")
444+
445+
spans = finished_spans_by_name(exporter, provider)
446+
assert_trace_chain(
447+
spans,
448+
["external-root", "llama_parent", "external-child", "llama_child"],
449+
)
450+
finally:
451+
context.detach(clean_token)
452+
453+
454+
def test_span_exit_ends_span_when_context_detach_fails(monkeypatch: Any) -> None:
455+
exporter, handler, provider = make_handler()
456+
clean_token = context.attach(context.Context())
457+
original_detach = context.detach
458+
459+
try:
460+
handler.span_enter(id_="root-uuid", bound_args=_bound)
461+
462+
def fail_detach(token: Any) -> None:
463+
raise RuntimeError("detach failed")
464+
465+
monkeypatch.setattr(context, "detach", fail_detach)
466+
handler.span_exit(id_="root-uuid", bound_args=_bound)
467+
provider.force_flush()
468+
469+
spans = exporter.get_finished_spans()
470+
assert len(spans) == 1
471+
assert spans[0].end_time is not None
472+
assert handler.all_spans == {}
473+
assert handler._context_tokens == {}
474+
finally:
475+
monkeypatch.setattr(context, "detach", original_detach)
476+
context.detach(clean_token)
477+
478+
360479
def test_capture_propagation_context() -> None:
361480
"""capture_propagation_context returns a dict with traceparent when a span is active."""
362481
exporter, handler, provider = make_handler()
363482
# Create a span so there's an active trace context
364483
handler.span_enter(id_="root-uuid", bound_args=_bound)
365-
# Activate the OTel span in the current context so capture can see it
366-
from opentelemetry.trace import set_span_in_context
367484

368485
otel_span = handler.all_spans["root-uuid"]
369-
ctx = set_span_in_context(otel_span)
370-
context.attach(ctx)
486+
assert trace.get_current_span() is otel_span
371487

372488
captured = handler.capture_propagation_context()
373489
assert "otel" in captured
@@ -389,19 +505,15 @@ def test_capture_restore_propagation_roundtrip() -> None:
389505
- externally-set OTel context (e.g. baggage-like ambient values) propagates
390506
through the traceparent mechanism
391507
"""
392-
from opentelemetry.trace import set_span_in_context
393-
394508
# --- Process A: create root span with tags, capture context ---
395509
exporter_a, handler_a, provider_a = make_handler()
396510

397511
tags_a = {"handler_id": "h1", "run_id": "r1", "myapp.custom": "val_a"}
398512
original_tags_a = dict(tags_a)
399513
handler_a.span_enter(id_="root-uuid", bound_args=_bound, tags=tags_a)
400514

401-
# Activate the OTel span in ambient context (simulating what the Dispatcher
402-
# would do before a serialization boundary)
403515
root_otel_span = handler_a.all_spans["root-uuid"]
404-
context.attach(set_span_in_context(root_otel_span))
516+
assert trace.get_current_span() is root_otel_span
405517

406518
# Capture propagation context — this is what gets serialized across the boundary
407519
captured_ctx = handler_a.capture_propagation_context()
@@ -486,7 +598,6 @@ def test_dispatcher_propagation_roundtrip_with_tags() -> None:
486598
active_instrument_tags,
487599
instrument_tags,
488600
)
489-
from opentelemetry.trace import set_span_in_context
490601

491602
exporter_a, handler_a, provider_a = make_handler()
492603
exporter_b, handler_b, provider_b = make_handler()
@@ -508,9 +619,8 @@ def test_dispatcher_propagation_roundtrip_with_tags() -> None:
508619
id_="root-uuid", bound_args=_bound, tags=active_instrument_tags.get()
509620
)
510621

511-
# Activate OTel span in ambient context
512622
root_otel_span = handler_a.all_spans["root-uuid"]
513-
context.attach(set_span_in_context(root_otel_span))
623+
assert trace.get_current_span() is root_otel_span
514624

515625
captured = dispatcher_a.capture_propagation_context()
516626

0 commit comments

Comments
 (0)