Skip to content

Commit 9e3d0d8

Browse files
badrishc and Copilot committed
Wait for async I/O completion before disposing scan iterator
BufferAndLoad defers page reads via BumpCurrentEpoch(() => DoReadPage(...)). SuspendDrain guarantees the drain callback (DoReadPage) has executed by the time GetNext returns, but the async I/O issued by DoReadPage may still be in flight. If Dispose frees the frame before the I/O callback fires, the callback writes to freed native memory (AccessViolationException). Fix: track outstanding page reads with a pendingDrainCallbacks counter. Increment it before registering the deferred DoReadPage (and therefore before AsyncReadPageFromDeviceToFrame is issued); decrement it in AsyncReadPageFromDeviceToFrameCallback when the I/O completes, or in DoReadPage if issuing the read throws. Dispose spin-waits for the counter to reach zero before freeing resources. No epoch acquisition is needed in Dispose — SuspendDrain already guarantees drain callbacks have executed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b86f828 commit 9e3d0d8

1 file changed

Lines changed: 23 additions & 9 deletions

File tree

libs/storage/Tsavorite/cs/src/core/Allocator/ScanIteratorBase.cs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ public abstract class ScanIteratorBase
2828
/// <summary>Epoch from the store</summary>
2929
protected readonly LightEpoch epoch;
3030

31+
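/// <summary>Number of page reads that have been registered (possibly deferred via BumpCurrentEpoch) but whose async I/O has not yet completed. Incremented before DoReadPage is registered; decremented in AsyncReadPageFromDeviceToFrameCallback, or in DoReadPage if issuing the read throws.</summary>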
32+
int pendingDrainCallbacks;
33+
3134
/// <summary>Current address for iteration</summary>
3235
protected long currentAddress;
3336
/// <summary>Next address for iteration</summary>
@@ -201,21 +204,24 @@ protected bool BufferAndLoad(long currentIterationAddress, long currentPage, lon
201204
var readBuffer = objectReadBuffers is not null ? objectReadBuffers[nextFrame] : default;
202205

203206
var frameIndex = i;
207+
Interlocked.Increment(ref pendingDrainCallbacks);
204208
if (epoch != null)
205209
epoch.BumpCurrentEpoch(() => DoReadPage(frameIndex));
206210
else
207211
DoReadPage(frameIndex);
208212

209213
void DoReadPage(int frameIndex)
210214
{
211-
// The drain callback may execute after the iterator has been disposed (loadCompletionEvents set to null),
212-
// because the callback is deferred via BumpCurrentEpoch and only runs when SafeToReclaimEpoch advances.
213-
// This can happen for read-ahead pages (frameIndex > 0) when the scan completes before the callback runs.
214-
var events = loadCompletionEvents;
215-
if (events is null)
216-
return;
217-
AsyncReadPageFromDeviceToFrame(readBuffer, readPage: frameIndex + GetPageOfAddress(currentIterationAddress, logPageSizeBits), untilAddress: endIterationAddress,
218-
context: Empty.Default, out events[nextFrame], devicePageOffset: 0, device: null, objectLogDevice: null, loadCTSs[nextFrame]);
215+
try
216+
{
217+
AsyncReadPageFromDeviceToFrame(readBuffer, readPage: frameIndex + GetPageOfAddress(currentIterationAddress, logPageSizeBits), untilAddress: endIterationAddress,
218+
context: Empty.Default, out loadCompletionEvents[nextFrame], devicePageOffset: 0, device: null, objectLogDevice: null, loadCTSs[nextFrame]);
219+
}
220+
catch
221+
{
222+
Interlocked.Decrement(ref pendingDrainCallbacks);
223+
throw;
224+
}
219225
loadedPages[nextFrame] = pageEndAddress;
220226
}
221227
}
@@ -281,7 +287,7 @@ protected void AsyncReadPageFromDeviceToFrameCallback(uint errorCode, uint numBy
281287
logger?.LogError($"{nameof(AsyncReadPageFromDeviceToFrameCallback)} error: {{errorCode}}", errorCode);
282288
result.cts?.Cancel();
283289
}
284-
Interlocked.MemoryBarrier();
290+
Interlocked.Decrement(ref pendingDrainCallbacks);
285291
}
286292

287293
/// <summary>
@@ -322,6 +328,14 @@ private bool WaitForFrameLoad(long currentAddress, long currentFrame)
322328
/// </summary>
323329
public virtual void Dispose()
324330
{
331+
// Wait for all deferred DoReadPage callbacks and their async I/O to complete before freeing
332+
// resources. The counter is incremented before BumpCurrentEpoch registration and decremented
333+
// in AsyncReadPageFromDeviceToFrameCallback when I/O completes, so reaching zero guarantees
334+
// no outstanding access to our state. The deferred callbacks will be drained by other threads'
335+
// epoch operations (Resume, Suspend, ProtectAndDrain).
336+
while (Volatile.Read(ref pendingDrainCallbacks) > 0)
337+
Thread.Yield();
338+
325339
for (var i = 0; i < frameSize; i++)
326340
{
327341
// Wait for ongoing reads to complete/fail; if the wait throws (e.g. due to cancellation), we still

0 commit comments

Comments
 (0)