Skip to content

Commit c952a58

Browse files
authored
CollectionItemBroker Cleanup + Fixes (#1205)
* CollectionItemBroker cleanup * format * bugfix * format * Addressing some comments * Addressing more comments * Addressing comments * format
1 parent 1db1f9e commit c952a58

7 files changed

Lines changed: 903 additions & 131 deletions

File tree

libs/server/Objects/ItemBroker/CollectionItemBroker.cs

Lines changed: 235 additions & 87 deletions
Large diffs are not rendered by default.
Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,83 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
using System.Collections.Concurrent;
4+
using System.Runtime.InteropServices;
55

66
namespace Garnet.server
77
{
88
/// <summary>
9-
/// Base class for events handled by CollectionItemBroker's main loop
9+
/// Event types handled by CollectionItemBroker
1010
/// </summary>
11-
internal abstract class BrokerEventBase
11+
internal enum CollectionItemBrokerEventType : byte
1212
{
13+
NotSet = 0,
14+
NewObserver = 1,
15+
CollectionUpdated = 2,
1316
}
1417

1518
/// <summary>
16-
/// Event to notify CollectionItemBroker that a collection has been updated
19+
/// Struct that holds data for different event types handled by CollectionItemBroker
1720
/// </summary>
18-
internal class CollectionUpdatedEvent : BrokerEventBase
21+
[StructLayout(LayoutKind.Explicit, Size = 17)]
22+
internal struct CollectionItemBrokerEvent
1923
{
2024
/// <summary>
21-
/// Key of updated collection
25+
/// Key of updated collection (for a CollectionUpdated event)
2226
/// </summary>
23-
internal readonly byte[] Key;
27+
[FieldOffset(0)]
28+
internal byte[] Key = null;
2429

2530
/// <summary>
26-
/// Observers
31+
/// The keys that the observer requests to subscribe on (for a NewObserver event)
2732
/// </summary>
28-
internal readonly ConcurrentQueue<CollectionItemObserver> Observers;
33+
[FieldOffset(0)]
34+
internal byte[][] Keys = null;
2935

30-
public CollectionUpdatedEvent(byte[] key, ConcurrentQueue<CollectionItemObserver> observers)
36+
/// <summary>
37+
/// The new observer instance (for a NewObserver event)
38+
/// </summary>
39+
[FieldOffset(8)]
40+
internal CollectionItemObserver Observer = null;
41+
42+
/// <summary>
43+
/// The type of event represented
44+
/// </summary>
45+
[FieldOffset(16)]
46+
internal CollectionItemBrokerEventType EventType = CollectionItemBrokerEventType.NotSet;
47+
48+
public CollectionItemBrokerEvent()
3149
{
32-
Key = key;
33-
Observers = observers;
50+
3451
}
35-
}
3652

37-
/// <summary>
38-
/// Event to notify CollectionItemBroker that a new observer was created
39-
/// </summary>
40-
internal class NewObserverEvent : BrokerEventBase
41-
{
4253
/// <summary>
43-
/// The new observer instance
54+
/// Creates a CollectionUpdated event
4455
/// </summary>
45-
internal CollectionItemObserver Observer { get; }
56+
/// <param name="key">Key of updated collection</param>
57+
public static CollectionItemBrokerEvent CreateCollectionUpdatedEvent(byte[] key)
58+
{
59+
return new CollectionItemBrokerEvent
60+
{
61+
EventType = CollectionItemBrokerEventType.CollectionUpdated,
62+
Key = key
63+
};
64+
}
4665

4766
/// <summary>
48-
/// The keys that the observer requests to subscribe on
67+
/// Creates a NewObserver event
4968
/// </summary>
50-
internal byte[][] Keys { get; }
51-
52-
internal NewObserverEvent(CollectionItemObserver observer, byte[][] keys)
69+
/// <param name="observer">The new observer instance</param>
70+
/// <param name="keys">The keys that the observer requests to subscribe on</param>
71+
public static CollectionItemBrokerEvent CreateNewObserverEvent(CollectionItemObserver observer, byte[][] keys)
5372
{
54-
Observer = observer;
55-
Keys = keys;
73+
return new CollectionItemBrokerEvent
74+
{
75+
EventType = CollectionItemBrokerEventType.NewObserver,
76+
Observer = observer,
77+
Keys = keys,
78+
};
5679
}
80+
81+
public bool IsDefault() => EventType == CollectionItemBrokerEventType.NotSet;
5782
}
5883
}

libs/server/Objects/ItemBroker/CollectionItemObserver.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license.
33

44
using System.Threading;
5+
using Garnet.common;
56

67
namespace Garnet.server
78
{
@@ -38,7 +39,7 @@ internal class CollectionItemObserver
3839
/// <summary>
3940
/// Lock for the status of the observer
4041
/// </summary>
41-
internal ReaderWriterLockSlim ObserverStatusLock { get; } = new();
42+
internal SingleWriterMultiReaderLock ObserverStatusLock;
4243

4344
/// <summary>
4445
/// Semaphore to notify the ResultSet status
@@ -62,14 +63,16 @@ internal CollectionItemObserver(RespServerSession session, RespCommand command,
6263
/// Safely set the result for the observer
6364
/// </summary>
6465
/// <param name="result"></param>
65-
internal void HandleSetResult(CollectionItemResult result)
66+
/// <param name="isWriteLocked">True if the ObserverStatusLock was write locked by the caller</param>
67+
internal void HandleSetResult(CollectionItemResult result, bool isWriteLocked = false)
6668
{
6769
// If the result is already set or the observer session is disposed
6870
// There is no need to set the result
6971
if (Status != ObserverStatus.WaitingForResult)
7072
return;
7173

72-
ObserverStatusLock.EnterWriteLock();
74+
if (!isWriteLocked)
75+
ObserverStatusLock.WriteLock();
7376
try
7477
{
7578
if (Status != ObserverStatus.WaitingForResult)
@@ -82,7 +85,8 @@ internal void HandleSetResult(CollectionItemResult result)
8285
}
8386
finally
8487
{
85-
ObserverStatusLock.ExitWriteLock();
88+
if (!isWriteLocked)
89+
ObserverStatusLock.WriteUnlock();
8690
}
8791
}
8892

@@ -93,7 +97,7 @@ internal bool TryForceUnblock(bool throwError = false)
9397
if (Status != ObserverStatus.WaitingForResult)
9498
return false;
9599

96-
ObserverStatusLock.EnterWriteLock();
100+
ObserverStatusLock.WriteLock();
97101
try
98102
{
99103
if (Status != ObserverStatus.WaitingForResult)
@@ -107,7 +111,7 @@ internal bool TryForceUnblock(bool throwError = false)
107111
}
108112
finally
109113
{
110-
ObserverStatusLock.ExitWriteLock();
114+
ObserverStatusLock.WriteUnlock();
111115
}
112116
}
113117

@@ -116,15 +120,15 @@ internal bool TryForceUnblock(bool throwError = false)
116120
/// </summary>
117121
internal void HandleSessionDisposed()
118122
{
119-
ObserverStatusLock.EnterWriteLock();
123+
ObserverStatusLock.WriteLock();
120124
try
121125
{
122126
Status = ObserverStatus.SessionDisposed;
123127
CancellationTokenSource.Cancel();
124128
}
125129
finally
126130
{
127-
ObserverStatusLock.ExitWriteLock();
131+
ObserverStatusLock.WriteUnlock();
128132
}
129133
}
130134
}

libs/server/Objects/ItemBroker/CollectionItemResult.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
3434
Items = items;
3535
}
3636

37-
private CollectionItemResult(bool isForceUnblocked)
37+
private CollectionItemResult(bool isForceUnblocked, bool isTypeMismatch)
3838
{
3939
IsForceUnblocked = isForceUnblocked;
40+
IsTypeMismatch = isTypeMismatch;
4041
}
4142

4243
/// <summary>
@@ -72,16 +73,26 @@ private CollectionItemResult(bool isForceUnblocked)
7273
/// <summary>
7374
/// Gets a value indicating whether the item retrieval was force unblocked.
7475
/// </summary>
75-
internal readonly bool IsForceUnblocked { get; }
76+
internal bool IsForceUnblocked { get; }
77+
78+
/// <summary>
79+
/// Gets a value indicating whether the item retrieval returned a type mismatch.
80+
/// </summary>
81+
internal bool IsTypeMismatch { get; }
7682

7783
/// <summary>
7884
/// Instance of empty result
7985
/// </summary>
8086
internal static readonly CollectionItemResult Empty = new(null, item: null);
8187

8288
/// <summary>
83-
/// Instance representing an Force Unblocked result.
89+
/// Instance representing a Force Unblocked result.
90+
/// </summary>
91+
internal static readonly CollectionItemResult ForceUnblocked = new(isForceUnblocked: true, isTypeMismatch: false);
92+
93+
/// <summary>
94+
/// Instance representing a Type Mismatch result.
8495
/// </summary>
85-
internal static readonly CollectionItemResult ForceUnblocked = new(true);
96+
internal static readonly CollectionItemResult TypeMismatch = new(isForceUnblocked: false, isTypeMismatch: true);
8697
}
8798
}

libs/server/Resp/Objects/ListCommands.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,13 @@ private bool ListBlockingPop(RespCommand command)
325325
return true;
326326
}
327327

328+
if (result.IsTypeMismatch)
329+
{
330+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
331+
SendAndReset();
332+
return true;
333+
}
334+
328335
if (!result.Found)
329336
{
330337
while (!RespWriteUtils.TryWriteNullArray(ref dcurr, dend))
@@ -419,6 +426,20 @@ private bool ListBlockingMove(ArgSlice srcKey, ArgSlice dstKey, ArgSlice srcDir,
419426
var result = storeWrapper.itemBroker.MoveCollectionItemAsync(RespCommand.BLMOVE, srcKey.ToArray(), this, timeout,
420427
cmdArgs).Result;
421428

429+
if (result.IsForceUnblocked)
430+
{
431+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
432+
SendAndReset();
433+
return true;
434+
}
435+
436+
if (result.IsTypeMismatch)
437+
{
438+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
439+
SendAndReset();
440+
return true;
441+
}
442+
422443
if (!result.Found)
423444
{
424445
WriteNull();
@@ -995,6 +1016,13 @@ private unsafe bool ListBlockingPopMultiple()
9951016
SendAndReset();
9961017
}
9971018

1019+
if (result.IsTypeMismatch)
1020+
{
1021+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
1022+
SendAndReset();
1023+
return true;
1024+
}
1025+
9981026
if (!result.Found)
9991027
{
10001028
WriteNull();

libs/server/Resp/Objects/SortedSetCommands.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,6 +1565,20 @@ private unsafe bool SortedSetBlockingPop(RespCommand command)
15651565

15661566
var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;
15671567

1568+
if (result.IsForceUnblocked)
1569+
{
1570+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
1571+
SendAndReset();
1572+
return true;
1573+
}
1574+
1575+
if (result.IsTypeMismatch)
1576+
{
1577+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
1578+
SendAndReset();
1579+
return true;
1580+
}
1581+
15681582
if (!result.Found)
15691583
{
15701584
WriteNull();
@@ -1668,6 +1682,20 @@ private unsafe bool SortedSetBlockingMPop()
16681682

16691683
var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BZMPOP, keysBytes, this, timeout, cmdArgs).Result;
16701684

1685+
if (result.IsForceUnblocked)
1686+
{
1687+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
1688+
SendAndReset();
1689+
return true;
1690+
}
1691+
1692+
if (result.IsTypeMismatch)
1693+
{
1694+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
1695+
SendAndReset();
1696+
return true;
1697+
}
1698+
16711699
if (!result.Found)
16721700
{
16731701
WriteNull();

0 commit comments

Comments
 (0)