
Commit 4ddd785

badrishc and Copilot authored
Fix sorted-set Memory leaks and BITOP epoch tracking (#1752)
## Summary

Two correctness bugs in the sorted-set object store and the BITOP read loop.

## 1. Sorted-set `IMemoryOwner` leaks in `GEO*STORE` / `ZUNIONSTORE` / `ZINTERSTORE`

Three pooled-buffer leaks where backends (`GeoSearch`, `SortedSetRange`, internal `ZADD`'s `SortedSetAdd`) wrote replies via `RespMemoryWriter`, which (with a default `SpanByte`) rents a `MemoryPool<byte>` buffer (≥512 bytes) and assigns it to `.SpanByteAndMemory.Memory`:

- **`SortedSetGeoOps.cs` (`GEO*STORE`)**: the `searchOutMem.Memory` from `GeoSearch` was leaked (only `searchOutHandler`, the `MemoryHandle` from `Pin()`, was disposed). The internal `ZADD` invocation (`zAddOutput`) was discarded entirely.
- **`SortedSetOps.cs` (`ZUNIONSTORE` / `ZINTERSTORE`)**: same pattern: `rangeOutputMem.Memory` from `SortedSetRange` was leaked, and the internal `ZADD`'s `zAddOutput.SpanByteAndMemory.Memory` was leaked.

Under heavy `GEO*STORE` / `ZUNIONSTORE` / `ZINTERSTORE` traffic this was real `MemoryPool` churn and GC pressure.

### Fix

- For each `*STORE`-style internal `ZADD`, wrap the `RMWObjectStoreOperationWithOutput` call in a `try` / `finally` that disposes `zAddOutput.SpanByteAndMemory.Memory` if `!IsSpanByte`.
- Extend the existing `finally` blocks that dispose `*Handler` (the `MemoryHandle`) to also dispose the underlying `*Mem.Memory` (the `IMemoryOwner<byte>`).

## 2. BITOP: pending-completion epoch tracking was broken

`StorageSession.HeadAddress` was a `readonly long` field captured at session-construction time and never updated. `MainStoreOps.ReadWithUnsafeContext` compared it against itself (`HeadAddress == localHeadAddress`) to decide whether to set `epochChanged = true` after pending completion. Two bugs:

1. The field is frozen, so the check was meaningless: the live store `HeadAddress` was never consulted.
2. The condition was also **inverted**: it set `epochChanged = true` when the addresses were equal (i.e., head did NOT move), the opposite of what the comment said.

In addition, `Read` can return synchronously with a pointer into the **read cache** (a separate log with its own `HeadAddress` that can be evicted independently of the main log). The original check would not detect read-cache eviction.

### Fix

- Removed the stale `StorageSession.HeadAddress` field.
- Added `ClientSession.HeadAddress` and `ClientSession.ReadCacheHeadAddress` accessors that read the live values from `store.Log.HeadAddress` / `store.ReadCache?.HeadAddress`.
- `ReadWithUnsafeContext` now captures both addresses at the start of the BITOP loop and, after pending completion, sets `epochChanged = true` if **either** has advanced, correctly invalidating any pointers captured into either log.

## Files changed

- `libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs`: new `HeadAddress` and `ReadCacheHeadAddress` live accessors
- `libs/server/Storage/Session/StorageSession.cs`: removed stale `HeadAddress` field
- `libs/server/Storage/Session/MainStore/MainStoreOps.cs`: `ReadWithUnsafeContext` uses live addresses with the corrected comparison; signature now also takes `localReadCacheHeadAddress`
- `libs/server/Storage/Session/MainStore/BitmapOps.cs`: captures both live head addresses; passes them to `ReadWithUnsafeContext`
- `libs/server/Storage/Session/ObjectStore/SortedSetGeoOps.cs`: leak fixes (`searchOutMem` + `zAddOutput`)
- `libs/server/Storage/Session/ObjectStore/SortedSetOps.cs`: leak fixes (`rangeOutputMem` + `zAddOutput`)

## Validation

- All 660 sorted-set + geo + bitmap tests pass on `main`
- `dotnet format --verify-no-changes` clean

## Note on related fixes on `dev`

This is a subset of the fixes on the companion `dev` branch (`badrishc/memory-fixes`). Other fixes from that branch were intentionally **not** ported here because they do not apply to `main`:

- The BITOP overflow-pointer fix relies on the `ISourceLogRecord` / `LogRecord` / `OverflowByteArray` model that exists only on `dev`. On `main` the BITOP backend operates on `SpanByte` values that are always pinned in log memory, so the use-after-fixed bug fixed on `dev` does not exist on `main`.
- The PFCOUNT/PFMERGE bounds-check tightening from `dev` is unnecessary on `main` because the `main` backend already validates `value.Length <= dst.Length` *before* the `Buffer.MemoryCopy`, so the wrong-capacity argument is gated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d746a55 commit 4ddd785

6 files changed

Lines changed: 59 additions & 11 deletions


libs/server/Storage/Session/MainStore/BitmapOps.cs

Lines changed: 3 additions & 2 deletions
@@ -105,15 +105,16 @@ public unsafe GarnetStatus StringBitOperation(ref RawStringInput input, BitmapOp
             {
                 uc.BeginUnsafe();
             readFromScratch:
-                var localHeadAddress = HeadAddress;
+                var localHeadAddress = uc.Session.HeadAddress;
+                var localReadCacheHeadAddress = uc.Session.ReadCacheHeadAddress;
                 var keysFound = 0;

                 for (var i = 1; i < keys.Length; i++)
                 {
                     var srcKey = keys[i];
                     //Read srcKey
                     var outputBitmap = SpanByteAndMemory.FromPinnedSpan(output);
-                    status = ReadWithUnsafeContext(srcKey, ref input, ref outputBitmap, localHeadAddress, out bool epochChanged, ref uc);
+                    status = ReadWithUnsafeContext(srcKey, ref input, ref outputBitmap, localHeadAddress, localReadCacheHeadAddress, out bool epochChanged, ref uc);
                     if (epochChanged)
                     {
                         goto readFromScratch;

libs/server/Storage/Session/MainStore/MainStoreOps.cs

Lines changed: 7 additions & 3 deletions
@@ -47,7 +47,7 @@ public GarnetStatus GET<TContext>(ref SpanByte key, ref RawStringInput input, re
             }
         }

-        public unsafe GarnetStatus ReadWithUnsafeContext<TContext>(ArgSlice key, ref RawStringInput input, ref SpanByteAndMemory output, long localHeadAddress, out bool epochChanged, ref TContext context)
+        public unsafe GarnetStatus ReadWithUnsafeContext<TContext>(ArgSlice key, ref RawStringInput input, ref SpanByteAndMemory output, long localHeadAddress, long localReadCacheHeadAddress, out bool epochChanged, ref TContext context)
             where TContext : ITsavoriteContext<SpanByte, SpanByte, RawStringInput, SpanByteAndMemory, long, MainSessionFunctions, MainStoreFunctions, MainStoreAllocator>, IUnsafeContext
         {
             var _key = key.SpanByte;
@@ -63,8 +63,12 @@ public unsafe GarnetStatus ReadWithUnsafeContext<TContext>(ArgSlice key, ref Raw
                 CompletePendingForSession(ref status, ref output, ref context);
                 StopPendingMetrics();
                 context.BeginUnsafe();
-                // Start read of pointers from beginning if epoch changed
-                if (HeadAddress == localHeadAddress)
+                // If either the main-log head or the read-cache head advanced while we waited on
+                // pending I/O, any log/read-cache pointers captured by previous reads in this loop
+                // may now reference evicted pages. Tell the caller to re-read all sources from
+                // scratch. (Synchronously-returned reads can have pointers into either log.)
+                if (context.Session.HeadAddress != localHeadAddress
+                    || context.Session.ReadCacheHeadAddress != localReadCacheHeadAddress)
                 {
                     context.EndUnsafe();
                     epochChanged = true;
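
The snapshot-and-compare logic in the two hunks above can be illustrated in isolation. The following is a minimal sketch, not Garnet code: `FakeStore`, `EpochCheck`, `MustRestart`, and `ReadAll` are hypothetical names, and the `onRead` callback stands in for a read that may wait on pending I/O while pages are evicted. It assumes only what the diff shows: both head addresses are captured once per pass, and the pass restarts if either one has changed.

```csharp
using System;

// Hypothetical stand-in for a store whose head addresses advance as pages are evicted.
public sealed class FakeStore
{
    public long HeadAddress;
    public long ReadCacheHeadAddress;
}

public static class EpochCheck
{
    // Same shape as the corrected condition: restart if EITHER live head differs from its snapshot.
    public static bool MustRestart(FakeStore store, long localHead, long localReadCacheHead)
        => store.HeadAddress != localHead
           || store.ReadCacheHeadAddress != localReadCacheHead;

    // Reads `sources` items in order and returns how many times the pass had to restart.
    public static int ReadAll(FakeStore store, int sources, Action<int> onRead)
    {
        var restarts = 0;
    readFromScratch:
        // Snapshot the live values at the start of every pass.
        var localHead = store.HeadAddress;
        var localReadCacheHead = store.ReadCacheHeadAddress;

        for (var i = 0; i < sources; i++)
        {
            onRead(i); // may simulate pending I/O during which a head address advances
            if (MustRestart(store, localHead, localReadCacheHead))
            {
                // Pointers captured earlier in this pass may be stale, so re-read everything.
                restarts++;
                goto readFromScratch;
            }
        }
        return restarts;
    }
}
```

In this sketch a single eviction during one read causes exactly one restart, after which the pass completes against the new snapshot. The old condition (`HeadAddress == localHeadAddress` against a frozen field) would have been true on every pass regardless of eviction.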

libs/server/Storage/Session/ObjectStore/SortedSetGeoOps.cs

Lines changed: 16 additions & 2 deletions
@@ -207,13 +207,27 @@ public unsafe GarnetStatus GeoSearchStore<TObjectContext>(ArgSlice key, ArgSlice
                 }, ref parseState);

                 var zAddOutput = new GarnetObjectStoreOutput();
-                RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
+                try
+                {
+                    RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);

-                writer.WriteInt32(foundItems);
+                    writer.WriteInt32(foundItems);
+                }
+                finally
+                {
+                    // ZADD backend writes its result via RespMemoryWriter, which allocates a
+                    // MemoryPool buffer when the (default) SpanByte cannot hold the response.
+                    // Dispose to avoid leaking that buffer back to the pool.
+                    if (!zAddOutput.SpanByteAndMemory.IsSpanByte)
+                        zAddOutput.SpanByteAndMemory.Memory?.Dispose();
+                }
             }
             finally
             {
                 searchOutHandler.Dispose();
+                // GeoSearch writes via RespMemoryWriter, which (with a default SpanByte) rents a
+                // MemoryPool buffer and assigns it here. Dispose to release it back to the pool.
+                searchOutMem.Memory?.Dispose();
             }

             return GarnetStatus.OK;
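
The disposal pattern added in this hunk can be shown on its own. This is a hedged sketch under stated assumptions, not the Garnet types: `PooledOutput` is a hypothetical stand-in for `SpanByteAndMemory`, `TrackingOwner` is a hypothetical wrapper that records disposal, and `Backend` simulates an operation that rents a buffer for its reply. Only `MemoryPool<byte>.Shared.Rent` and `IMemoryOwner<byte>` are real .NET APIs here.

```csharp
using System;
using System.Buffers;

// Hypothetical stand-in for SpanByteAndMemory: holds either an inline span or a pooled buffer.
public struct PooledOutput
{
    public bool IsSpanByte;
    public IMemoryOwner<byte> Memory;
}

// Hypothetical wrapper around a rented buffer that records whether it was returned to the pool.
public sealed class TrackingOwner : IMemoryOwner<byte>
{
    private readonly IMemoryOwner<byte> inner = MemoryPool<byte>.Shared.Rent(512);

    public bool Disposed { get; private set; }
    public Memory<byte> Memory => inner.Memory;

    public void Dispose()
    {
        if (Disposed) return; // make repeated Dispose calls harmless
        Disposed = true;
        inner.Dispose();
    }
}

public static class StoreOp
{
    // Simulates a backend that writes its reply into a pooled buffer instead of the inline span.
    public static void Backend(ref PooledOutput output, IMemoryOwner<byte> rented)
    {
        output.IsSpanByte = false;
        output.Memory = rented;
    }

    // Caller side: the reply is not needed, but the buffer must still be returned to the pool.
    public static void RunAndRelease(IMemoryOwner<byte> rented)
    {
        var output = new PooledOutput { IsSpanByte = true };
        try
        {
            Backend(ref output, rented);
        }
        finally
        {
            // Runs even if the backend throws after assigning the buffer.
            if (!output.IsSpanByte)
                output.Memory?.Dispose();
        }
    }
}
```

Putting the dispose in `finally` rather than after the call means the buffer goes back to the pool on the exception path as well, which is the property the commit relies on for the internal `ZADD` output.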

libs/server/Storage/Session/ObjectStore/SortedSetOps.cs

Lines changed: 16 additions & 2 deletions
@@ -785,13 +785,27 @@ public unsafe GarnetStatus SortedSetRangeStore<TObjectContext>(ArgSlice dstKey,
                     }, ref parseState);

                     var zAddOutput = new GarnetObjectStoreOutput();
-                    RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
-                    itemBroker.HandleCollectionUpdate(destinationKey);
+                    try
+                    {
+                        RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
+                        itemBroker.HandleCollectionUpdate(destinationKey);
+                    }
+                    finally
+                    {
+                        // ZADD backend writes its result via RespMemoryWriter, which allocates a
+                        // MemoryPool buffer when the (default) SpanByte cannot hold the response.
+                        // Dispose to avoid leaking that buffer back to the pool.
+                        if (!zAddOutput.SpanByteAndMemory.IsSpanByte)
+                            zAddOutput.SpanByteAndMemory.Memory?.Dispose();
+                    }
                 }
             }
             finally
             {
                 rangeOutputHandler.Dispose();
+                // SortedSetRange writes via RespMemoryWriter, which (with a default SpanByte) rents
+                // a MemoryPool buffer and assigns it here. Dispose to release it back to the pool.
+                rangeOutputMem.Memory?.Dispose();
             }
             return status;
         }

libs/server/Storage/Session/StorageSession.cs

Lines changed: 0 additions & 2 deletions
@@ -22,7 +22,6 @@ sealed partial class StorageSession : IDisposable
     {
         int bitmapBufferSize = 1 << 15;
         SectorAlignedMemory sectorAlignedMemoryBitmap;
-        readonly long HeadAddress;

         /// <summary>
         /// Session Contexts for main store
@@ -107,7 +106,6 @@ public StorageSession(StoreWrapper storeWrapper,
             vectorContext = vectorSession.BasicContext;
             vectorLockableContext = vectorSession.LockableContext;

-            HeadAddress = db.MainStore.Log.HeadAddress;
             ObjectScanCountLimit = storeWrapper.serverOptions.ObjectScanCountLimit;
         }

libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs

Lines changed: 17 additions & 0 deletions
@@ -108,6 +108,23 @@ internal ClientSession(
         /// </summary>
         public long Version => ctx.version;

+        /// <summary>
+        /// The current head address of the underlying store's main log. Reads the live value, so it
+        /// reflects any updates from log eviction or page advancement. Callers that compare snapshots
+        /// (e.g. before vs. after a pending I/O completion) should hold epoch protection so that the
+        /// addresses they read remain meaningful.
+        /// </summary>
+        public long HeadAddress => store.Log.HeadAddress;
+
+        /// <summary>
+        /// The current head address of the underlying store's read cache, or 0 if the read cache is
+        /// not configured. Reads the live value. Callers that capture pointers into records returned
+        /// by <c>Read</c> must check this in addition to <see cref="HeadAddress"/>: a synchronously
+        /// returned pointer can live in the read cache (when the record was cached on a prior disk
+        /// read), and that page can be evicted independently of the main log.
+        /// </summary>
+        public long ReadCacheHeadAddress => store.ReadCache?.HeadAddress ?? 0;
+
         /// <summary>
         /// Dispose session
         /// </summary>
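
The difference between the removed field and the new accessors comes down to when the value is read. The sketch below is illustrative only: `FakeLog` and `FakeSession` are hypothetical types, and `FrozenHeadAddress` models the removed `readonly long` field. It shows a field captured at construction going stale while an expression-bodied property tracks the live value, and the `?.` / `??` combination yielding 0 when no read cache is configured.

```csharp
using System;

// Hypothetical stand-in for a log whose head address advances over time.
public sealed class FakeLog
{
    public long HeadAddress;
}

// Hypothetical session contrasting a captured-once field with live accessors.
public sealed class FakeSession
{
    private readonly FakeLog log;
    private readonly FakeLog readCache; // null when no read cache is configured

    // Old pattern: captured once in the constructor, never updated afterwards.
    public readonly long FrozenHeadAddress;

    public FakeSession(FakeLog log, FakeLog readCache = null)
    {
        this.log = log;
        this.readCache = readCache;
        FrozenHeadAddress = log.HeadAddress;
    }

    // New pattern: evaluated on every access, so it reflects the current head.
    public long HeadAddress => log.HeadAddress;

    // The null-conditional read yields long?; ?? 0 supplies the "not configured" value.
    public long ReadCacheHeadAddress => readCache?.HeadAddress ?? 0;
}
```

Because a snapshot taken from the frozen field always equals the field itself, any comparison against it carries no information about eviction. That is the first of the two bugs described in the commit message.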
