Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions libs/storage/Tsavorite/cs/src/core/Allocator/ScanIteratorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public abstract class ScanIteratorBase
/// <summary>Epoch from the store</summary>
protected readonly LightEpoch epoch;

/// <summary>Number of deferred DoReadPage drain callbacks that have been registered but not yet executed.</summary>
protected int pendingDrainCallbacks;

/// <summary>Current address for iteration</summary>
protected long currentAddress;
/// <summary>Next address for iteration</summary>
Expand Down Expand Up @@ -201,21 +204,24 @@ protected bool BufferAndLoad(long currentIterationAddress, long currentPage, lon
var readBuffer = objectReadBuffers is not null ? objectReadBuffers[nextFrame] : default;

var frameIndex = i;
_ = Interlocked.Increment(ref pendingDrainCallbacks);
if (epoch != null)
epoch.BumpCurrentEpoch(() => DoReadPage(frameIndex));
else
DoReadPage(frameIndex);

void DoReadPage(int frameIndex)
{
// The drain callback may execute after the iterator has been disposed (loadCompletionEvents set to null),
// because the callback is deferred via BumpCurrentEpoch and only runs when SafeToReclaimEpoch advances.
// This can happen for read-ahead pages (frameIndex > 0) when the scan completes before the callback runs.
var events = loadCompletionEvents;
if (events is null)
return;
AsyncReadPageFromDeviceToFrame(readBuffer, readPage: frameIndex + GetPageOfAddress(currentIterationAddress, logPageSizeBits), untilAddress: endIterationAddress,
context: Empty.Default, out events[nextFrame], devicePageOffset: 0, device: null, objectLogDevice: null, loadCTSs[nextFrame]);
try
{
AsyncReadPageFromDeviceToFrame(readBuffer, readPage: frameIndex + GetPageOfAddress(currentIterationAddress, logPageSizeBits), untilAddress: endIterationAddress,
context: Empty.Default, out loadCompletionEvents[nextFrame], devicePageOffset: 0, device: null, objectLogDevice: null, loadCTSs[nextFrame]);
}
catch
{
_ = Interlocked.Decrement(ref pendingDrainCallbacks);
throw;
}
loadedPages[nextFrame] = pageEndAddress;
}
}
Expand Down Expand Up @@ -272,16 +278,22 @@ internal abstract void AsyncReadPageFromDeviceToFrame<TContext>(CircularDiskRead

protected void AsyncReadPageFromDeviceToFrameCallback(uint errorCode, uint numBytes, object context)
{
var result = (PageAsyncReadResult<Empty>)context;
try
{
var result = (PageAsyncReadResult<Empty>)context;

if (errorCode == 0)
_ = result.handle?.Signal();
else
if (errorCode == 0)
_ = result.handle?.Signal();
else
{
logger?.LogError($"{nameof(AsyncReadPageFromDeviceToFrameCallback)} error: {{errorCode}}", errorCode);
result.cts?.Cancel();
}
}
finally
{
logger?.LogError($"{nameof(AsyncReadPageFromDeviceToFrameCallback)} error: {{errorCode}}", errorCode);
result.cts?.Cancel();
_ = Interlocked.Decrement(ref pendingDrainCallbacks);
}
Interlocked.MemoryBarrier();
}

/// <summary>
Expand Down Expand Up @@ -322,6 +334,14 @@ private bool WaitForFrameLoad(long currentAddress, long currentFrame)
/// </summary>
public virtual void Dispose()
{
// Wait for all deferred DoReadPage callbacks and their async I/O to complete before freeing
// resources. The counter is incremented before BumpCurrentEpoch registration and decremented
// in AsyncReadPageFromDeviceToFrameCallback when I/O completes, so reaching zero guarantees
// no outstanding access to our state. The deferred callbacks will be drained by other threads'
// epoch operations (Resume, Suspend, ProtectAndDrain).
while (Volatile.Read(ref pendingDrainCallbacks) > 0)
Thread.Yield();
Comment thread
badrishc marked this conversation as resolved.

for (var i = 0; i < frameSize; i++)
{
// Wait for ongoing reads to complete/fail; if the wait throws (e.g. due to cancellation), we still
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,13 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object

Interlocked.MemoryBarrier();
}
catch when (disposed) { }
catch when (disposed)
{
}
finally
{
_ = Interlocked.Decrement(ref pendingDrainCallbacks);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,13 @@ private unsafe void AsyncReadPagesToFrameCallback(uint errorCode, uint numBytes,
}
Interlocked.MemoryBarrier();
}
catch when (disposed) { }
catch when (disposed)
{
}
finally
{
_ = Interlocked.Decrement(ref pendingDrainCallbacks);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
Loading