Skip to content

Commit c4855e4

Browse files
badrishcCopilot
andcommitted
Add spin-wait in GetNext before epoch-table scan
When the iterator catches up to the cached SafeTailAddress, spin briefly (SpinWait(100) ≈ a few μs) before scanning the epoch table. This lets concurrent producers complete in-flight writes and advance TailAddress, so the subsequent RefreshSafeTailAddress scan jumps the cache forward by many records. The next batch of GetNext calls then use the O(1) cached path, amortizing the scan cost to ~1ns/record. Benchmark: 1T WithConsumer improves from ~2.6 Mops (immediate scan) to ~8.0 Mops (spin-wait), matching the dev@10ms baseline (7.7 Mops). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8e27fd3 commit c4855e4

2 files changed

Lines changed: 22 additions & 14 deletions

File tree

libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,7 @@ public unsafe void Enqueue<THeader, TInput>(THeader userHeader, ReadOnlySpan<byt
11751175
offset += item1.TotalSize();
11761176
_ = input.CopyTo(physicalAddress + offset, input.SerializedLength);
11771177
SetHeader(length, physicalAddress);
1178-
safeTailRefreshEntryEnqueued?.Signal();
1178+
EndInflightEnqueue();
11791179
}
11801180
finally
11811181
{

libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLogScanIterator.cs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -749,13 +749,19 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
749749
if (disposed)
750750
return false;
751751

752-
// For scanUncommitted: check cached SafeTailAddress first (O(1)). Only scan the
753-
// epoch table via RefreshSafeTailAddress when caught up to the cache, avoiding
754-
// the O(kTableSize) scan on every record.
755-
var boundary = scanUncommitted
756-
? (currentAddress < tsavoriteLog.SafeTailAddress ? tsavoriteLog.SafeTailAddress : tsavoriteLog.RefreshSafeTailAddress())
757-
: tsavoriteLog.CommittedUntilAddress;
758-
if (currentAddress >= endAddress || currentAddress >= boundary)
752+
// For scanUncommitted: check cached SafeTailAddress first (O(1)). If caught up,
753+
// spin-wait briefly to let producers advance tail and complete in-flight writes,
754+
// then re-check the cache before falling back to the full epoch-table scan.
755+
// At 15 Mops, ~150 enqueues complete during the spin, so the next ~150 GetNext
756+
// calls use the cache (O(1)) — amortizing the scan cost to ~1ns/record.
757+
if (scanUncommitted && currentAddress >= tsavoriteLog.SafeTailAddress)
758+
{
759+
Thread.SpinWait(100);
760+
if (currentAddress >= tsavoriteLog.SafeTailAddress
761+
&& currentAddress >= tsavoriteLog.RefreshSafeTailAddress())
762+
return false;
763+
}
764+
else if (currentAddress >= endAddress || currentAddress >= (scanUncommitted ? tsavoriteLog.SafeTailAddress : tsavoriteLog.CommittedUntilAddress))
759765
return false;
760766

761767
if (currentAddress < _headAddress)
@@ -873,12 +879,14 @@ private unsafe bool ExpandGetNextInternal(long startPhysicalAddress, ref int tot
873879
if (disposed)
874880
return false;
875881

876-
// For scanUncommitted: check cached SafeTailAddress first (O(1)). Only scan the
877-
// epoch table via RefreshSafeTailAddress when caught up to the cache.
878-
var boundary2 = scanUncommitted
879-
? (currentAddress < tsavoriteLog.SafeTailAddress ? tsavoriteLog.SafeTailAddress : tsavoriteLog.RefreshSafeTailAddress())
880-
: tsavoriteLog.CommittedUntilAddress;
881-
if (currentAddress >= endAddress || currentAddress >= boundary2)
882+
if (scanUncommitted && currentAddress >= tsavoriteLog.SafeTailAddress)
883+
{
884+
Thread.SpinWait(100);
885+
if (currentAddress >= tsavoriteLog.SafeTailAddress
886+
&& currentAddress >= tsavoriteLog.RefreshSafeTailAddress())
887+
return false;
888+
}
889+
else if (currentAddress >= endAddress || currentAddress >= (scanUncommitted ? tsavoriteLog.SafeTailAddress : tsavoriteLog.CommittedUntilAddress))
882890
return false;
883891

884892
if (currentAddress < _headAddress)

0 commit comments

Comments
 (0)