Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 235 additions & 87 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs

Large diffs are not rendered by default.

77 changes: 51 additions & 26 deletions libs/server/Objects/ItemBroker/CollectionItemBrokerEvent.cs
Original file line number Diff line number Diff line change
@@ -1,58 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Collections.Concurrent;
using System.Runtime.InteropServices;

namespace Garnet.server
{
/// <summary>
/// Base class for events handled by CollectionItemBroker's main loop
/// Event types handled by CollectionItemBroker
/// </summary>
internal abstract class BrokerEventBase
internal enum CollectionItemBrokerEventType : byte
{
NotSet = 0,
NewObserver = 1,
CollectionUpdated = 2,
}

/// <summary>
/// Event to notify CollectionItemBroker that a collection has been updated
/// Struct that holds data for different event types handled by CollectionItemBroker
/// </summary>
internal class CollectionUpdatedEvent : BrokerEventBase
[StructLayout(LayoutKind.Explicit, Size = 17)]
internal struct CollectionItemBrokerEvent
{
/// <summary>
/// Key of updated collection
/// Key of updated collection (for a CollectionUpdated event)
/// </summary>
internal readonly byte[] Key;
[FieldOffset(0)]
internal byte[] Key = null;

/// <summary>
/// Observers
/// The keys that the observer requests to subscribe on (for a NewObserver event)
/// </summary>
internal readonly ConcurrentQueue<CollectionItemObserver> Observers;
[FieldOffset(0)]
internal byte[][] Keys = null;

public CollectionUpdatedEvent(byte[] key, ConcurrentQueue<CollectionItemObserver> observers)
/// <summary>
/// The new observer instance (for a NewObserver event)
/// </summary>
[FieldOffset(8)]
internal CollectionItemObserver Observer = null;

/// <summary>
/// The type of event represented
/// </summary>
[FieldOffset(16)]
internal CollectionItemBrokerEventType EventType = CollectionItemBrokerEventType.NotSet;

public CollectionItemBrokerEvent()
{
Key = key;
Observers = observers;

}
}

/// <summary>
/// Event to notify CollectionItemBroker that a new observer was created
/// </summary>
internal class NewObserverEvent : BrokerEventBase
{
/// <summary>
/// The new observer instance
/// Creates a CollectionUpdated event
/// </summary>
internal CollectionItemObserver Observer { get; }
/// <param name="key">Key of updated collection</param>
public static CollectionItemBrokerEvent CreateCollectionUpdatedEvent(byte[] key)
{
return new CollectionItemBrokerEvent
{
EventType = CollectionItemBrokerEventType.CollectionUpdated,
Key = key
};
}

/// <summary>
/// The keys that the observer requests to subscribe on
/// Creates a NewObserver event
/// </summary>
internal byte[][] Keys { get; }

internal NewObserverEvent(CollectionItemObserver observer, byte[][] keys)
/// <param name="observer">The new observer instance</param>
/// <param name="keys">The keys that the observer requests to subscribe on</param>
public static CollectionItemBrokerEvent CreateNewObserverEvent(CollectionItemObserver observer, byte[][] keys)
{
Observer = observer;
Keys = keys;
return new CollectionItemBrokerEvent
{
EventType = CollectionItemBrokerEventType.NewObserver,
Observer = observer,
Keys = keys,
};
}

public bool IsDefault() => EventType == CollectionItemBrokerEventType.NotSet;
}
}
20 changes: 12 additions & 8 deletions libs/server/Objects/ItemBroker/CollectionItemObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System.Threading;
using Garnet.common;

namespace Garnet.server
{
Expand Down Expand Up @@ -38,7 +39,7 @@ internal class CollectionItemObserver
/// <summary>
/// Lock for the status of the observer
/// </summary>
internal ReaderWriterLockSlim ObserverStatusLock { get; } = new();
internal SingleWriterMultiReaderLock ObserverStatusLock;

/// <summary>
/// Semaphore to notify the ResultSet status
Expand All @@ -62,14 +63,16 @@ internal CollectionItemObserver(RespServerSession session, RespCommand command,
/// Safely set the result for the observer
/// </summary>
/// <param name="result"></param>
internal void HandleSetResult(CollectionItemResult result)
/// <param name="isWriteLocked">True if the ObserverStatusLock was write locked by the caller</param>
internal void HandleSetResult(CollectionItemResult result, bool isWriteLocked = false)
{
// If the result is already set or the observer session is disposed
// There is no need to set the result
if (Status != ObserverStatus.WaitingForResult)
return;

ObserverStatusLock.EnterWriteLock();
if (!isWriteLocked)
ObserverStatusLock.WriteLock();
try
{
if (Status != ObserverStatus.WaitingForResult)
Expand All @@ -82,7 +85,8 @@ internal void HandleSetResult(CollectionItemResult result)
}
finally
{
ObserverStatusLock.ExitWriteLock();
if (!isWriteLocked)
ObserverStatusLock.WriteUnlock();
}
}

Expand All @@ -93,7 +97,7 @@ internal bool TryForceUnblock(bool throwError = false)
if (Status != ObserverStatus.WaitingForResult)
return false;

ObserverStatusLock.EnterWriteLock();
ObserverStatusLock.WriteLock();
try
{
if (Status != ObserverStatus.WaitingForResult)
Expand All @@ -107,7 +111,7 @@ internal bool TryForceUnblock(bool throwError = false)
}
finally
{
ObserverStatusLock.ExitWriteLock();
ObserverStatusLock.WriteUnlock();
}
}

Expand All @@ -116,15 +120,15 @@ internal bool TryForceUnblock(bool throwError = false)
/// </summary>
internal void HandleSessionDisposed()
{
ObserverStatusLock.EnterWriteLock();
ObserverStatusLock.WriteLock();
try
{
Status = ObserverStatus.SessionDisposed;
CancellationTokenSource.Cancel();
}
finally
{
ObserverStatusLock.ExitWriteLock();
ObserverStatusLock.WriteUnlock();
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
Items = items;
}

private CollectionItemResult(bool isForceUnblocked)
private CollectionItemResult(bool isForceUnblocked, bool isTypeMismatch)
{
IsForceUnblocked = isForceUnblocked;
IsTypeMismatch = isTypeMismatch;
}

/// <summary>
Expand Down Expand Up @@ -72,16 +73,26 @@ private CollectionItemResult(bool isForceUnblocked)
/// <summary>
/// Gets a value indicating whether the item retrieval was force unblocked.
/// </summary>
internal readonly bool IsForceUnblocked { get; }
internal bool IsForceUnblocked { get; }

/// <summary>
/// Gets a value indicating whether the item retrieval returned a type mismatch.
/// </summary>
internal bool IsTypeMismatch { get; }

/// <summary>
/// Instance of empty result
/// </summary>
internal static readonly CollectionItemResult Empty = new(null, item: null);

/// <summary>
/// Instance representing an Force Unblocked result.
/// Instance representing a Force Unblocked result.
/// </summary>
internal static readonly CollectionItemResult ForceUnblocked = new(isForceUnblocked: true, isTypeMismatch: false);

/// <summary>
/// Instance representing a Type Mismatch result.
/// </summary>
internal static readonly CollectionItemResult ForceUnblocked = new(true);
internal static readonly CollectionItemResult TypeMismatch = new(isForceUnblocked: false, isTypeMismatch: true);
}
}
28 changes: 28 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ private bool ListBlockingPop(RespCommand command)
return true;
}

if (result.IsTypeMismatch)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
while (!RespWriteUtils.TryWriteNullArray(ref dcurr, dend))
Expand Down Expand Up @@ -419,6 +426,20 @@ private bool ListBlockingMove(ArgSlice srcKey, ArgSlice dstKey, ArgSlice srcDir,
var result = storeWrapper.itemBroker.MoveCollectionItemAsync(RespCommand.BLMOVE, srcKey.ToArray(), this, timeout,
cmdArgs).Result;

if (result.IsForceUnblocked)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
return true;
}

if (result.IsTypeMismatch)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
WriteNull();
Expand Down Expand Up @@ -995,6 +1016,13 @@ private unsafe bool ListBlockingPopMultiple()
SendAndReset();
}

if (result.IsTypeMismatch)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
WriteNull();
Expand Down
28 changes: 28 additions & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,20 @@ private unsafe bool SortedSetBlockingPop(RespCommand command)

var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;

if (result.IsForceUnblocked)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
return true;
}

if (result.IsTypeMismatch)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
WriteNull();
Expand Down Expand Up @@ -1668,6 +1682,20 @@ private unsafe bool SortedSetBlockingMPop()

var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BZMPOP, keysBytes, this, timeout, cmdArgs).Result;

if (result.IsForceUnblocked)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
return true;
}

if (result.IsTypeMismatch)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
WriteNull();
Expand Down
Loading