Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::logs::log_processor::LogProcessor;
use crate::util::BlockingStrategy;
use crate::{
logs::{LogBatch, LogExporter, SdkLogRecord},
Resource,
Expand Down Expand Up @@ -342,6 +343,7 @@ impl BatchLogProcessor {
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();
let blocking_strategy = BlockingStrategy::new();

let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
Expand All @@ -368,6 +370,7 @@ impl BatchLogProcessor {
last_export_time: &mut Instant,
current_batch_size: &AtomicUsize,
max_export_size: usize,
blocking_strategy: &BlockingStrategy,
) -> OTelSdkResult
where
E: LogExporter + Send + Sync + 'static,
Expand All @@ -388,13 +391,15 @@ impl BatchLogProcessor {
let count_of_logs = logs.len(); // Count of logs that will be exported
total_exported_logs += count_of_logs;

result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
result =
export_batch_sync(exporter, logs, last_export_time, blocking_strategy); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
}
result
}

let blocking_strategy = blocking_strategy;
loop {
let remaining_time = config
.scheduled_delay
Expand All @@ -417,6 +422,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
}
Ok(BatchMessage::ForceFlush(sender)) => {
Expand All @@ -428,6 +434,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
let _ = sender.send(result);
}
Expand All @@ -440,6 +447,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
let _ = exporter.shutdown();
let _ = sender.send(result);
Expand Down Expand Up @@ -468,6 +476,7 @@ impl BatchLogProcessor {
&mut last_export_time,
&current_batch_size,
max_export_batch_size,
&blocking_strategy,
);
}
Err(RecvTimeoutError::Disconnected) => {
Expand Down Expand Up @@ -518,6 +527,7 @@ fn export_batch_sync<E>(
exporter: &E,
batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
last_export_time: &mut Instant,
blocking_strategy: &BlockingStrategy,
) -> OTelSdkResult
where
E: LogExporter + ?Sized,
Expand All @@ -529,7 +539,7 @@ where
}

let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
let export_result = futures_executor::block_on(export);
let export_result = blocking_strategy.block_on(export);

// Clear the batch vec after exporting
batch.clear();
Expand Down Expand Up @@ -1021,6 +1031,66 @@ mod tests {
processor.shutdown().unwrap();
}

// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
// exporters where the export future depends on tokio tasks. Without
// BlockingStrategy, this deadlocks on constrained runtimes because
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
#[derive(Clone, Debug)]
struct TokioSpawnLogExporter {
    // Running total of log records passed to `export`. Held behind an `Arc`
    // so a clone of the handle still observes updates after the exporter
    // itself has been moved elsewhere.
    exported_count: Arc<AtomicUsize>,
}

impl TokioSpawnLogExporter {
    // Builds an exporter whose exported-record counter starts at zero.
    fn new() -> Self {
        let exported_count = Arc::new(AtomicUsize::default());
        Self { exported_count }
    }
}

impl LogExporter for TokioSpawnLogExporter {
    // Round-trips the batch length through a `tokio::spawn`-ed task before
    // recording it, so this future can only complete when a tokio runtime
    // is driving the spawned task (simulating tonic/gRPC exporters).
    async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
        let batch_len = batch.len();
        // Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
        let spawned = tokio::spawn(async move { batch_len });
        let echoed_len = spawned.await.unwrap();
        assert_eq!(echoed_len, batch_len);
        // Only count the records once the spawned task has completed.
        self.exported_count.fetch_add(batch_len, Ordering::Relaxed);
        Ok(())
    }

    // This mock has nothing to tear down; always reports success.
    fn shutdown(&self) -> OTelSdkResult {
        Ok(())
    }
}

// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
// Uses TokioSpawnLogExporter which internally calls tokio::spawn(),
// simulating tonic/gRPC exporters where the export future depends on
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
// without the runtime context.
//
// Note: current_thread runtime is not tested here because it has a
// fundamental limitation — the single runtime thread is blocked by
// force_flush()'s recv(), so no thread is available to drive spawned
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
// target of this fix.

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_batch_log_processor_multi_thread_1_worker_with_tokio_spawn_exporter() {
let exporter = TokioSpawnLogExporter::new();
let exported_count = exporter.exported_count.clone();
let processor = BatchLogProcessor::new(exporter, BatchConfig::default());

let mut record = SdkLogRecord::new();
let instrumentation = InstrumentationScope::default();
processor.emit(&mut record, &instrumentation);

processor.force_flush().unwrap();

assert_eq!(exported_count.load(Ordering::Relaxed), 1);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
Expand Down
75 changes: 73 additions & 2 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
util::BlockingStrategy,
Resource,
};

Expand Down Expand Up @@ -152,6 +153,7 @@ impl<E: PushMetricExporter> PeriodicReader<E> {
message_sender,
producer: Mutex::new(None),
exporter: exporter_arc.clone(),
blocking_strategy: BlockingStrategy::new(),
}),
};
let cloned_reader = reader.clone();
Expand Down Expand Up @@ -351,6 +353,7 @@ struct PeriodicReaderInner<E: PushMetricExporter> {
exporter: Arc<E>,
message_sender: mpsc::Sender<Message>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
blocking_strategy: BlockingStrategy,
}

impl<E: PushMetricExporter> PeriodicReaderInner<E> {
Expand Down Expand Up @@ -407,9 +410,8 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
});
otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());

// Relying on futures executor to execute async call.
// TODO: Pass timeout to exporter
futures_executor::block_on(self.exporter.export(rm))
self.blocking_strategy.block_on(self.exporter.export(rm))
}

fn force_flush(&self) -> OTelSdkResult {
Expand Down Expand Up @@ -604,6 +606,75 @@ mod tests {
}
}

// Mock exporter that uses tokio::spawn internally, simulating tonic/gRPC
// exporters where the export future depends on tokio tasks. Without
// BlockingStrategy, this deadlocks on constrained runtimes because
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks.
#[derive(Clone, Debug, Default)]
struct TokioSpawnMetricExporter {
    // Number of `export` calls that ran to completion; shared across clones.
    exported_count: Arc<AtomicUsize>,
    // Flipped to `true` by `shutdown`; shared across clones.
    is_shutdown: Arc<AtomicBool>,
}

impl PushMetricExporter for TokioSpawnMetricExporter {
    // Awaits a `tokio::spawn`-ed task before counting the export, so this
    // future can only complete when a tokio runtime is driving the spawned
    // task (simulating tonic/gRPC exporters). The metrics payload is ignored.
    async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
        // Simulate tonic/gRPC: the export future depends on a tokio::spawn-ed task.
        let spawned = tokio::spawn(async { 42 });
        let answer = spawned.await.unwrap();
        assert_eq!(answer, 42);
        // Count the export only after the spawned task has completed.
        self.exported_count.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    // Nothing is buffered in this mock, so flushing trivially succeeds.
    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    // Records that shutdown was requested and reports success.
    fn shutdown(&self) -> OTelSdkResult {
        self.is_shutdown.store(true, Ordering::Relaxed);
        Ok(())
    }

    // The timeout is ignored; behaves exactly like `shutdown`.
    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        self.shutdown()
    }

    // This mock always reports cumulative temporality.
    fn temporality(&self) -> Temporality {
        Temporality::Cumulative
    }
}

// Regression test for deadlock on constrained tokio runtimes (#2802, #3356).
// Uses TokioSpawnMetricExporter which internally calls tokio::spawn(),
// simulating tonic/gRPC exporters where the export future depends on
// tokio-spawned tasks. Without BlockingStrategy, this deadlocks because
// futures_executor::block_on() cannot drive tokio::spawn-ed tasks
// without the runtime context.
//
// Note: current_thread runtime is not tested here because it has a
// fundamental limitation — the single runtime thread is blocked by
// force_flush()'s recv(), so no thread is available to drive spawned
// tasks. The multi_thread(1) scenario (1-vCPU k8s pods) is the primary
// target of this fix.

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_periodic_reader_multi_thread_1_worker_with_tokio_spawn_exporter() {
let exporter = TokioSpawnMetricExporter::default();
let exported_count = exporter.exported_count.clone();

let reader = PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(120))
.build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let counter = meter.u64_counter("test.counter").build();
counter.add(1, &[]);

meter_provider.force_flush().unwrap();

assert!(exported_count.load(Ordering::Relaxed) >= 1);
}

#[test]
fn collection_triggered_by_interval_multiple() {
// Arrange
Expand Down
Loading
Loading