@@ -235,7 +235,7 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys)
235235 // If the key already has a non-empty observer queue, it does not have an item to retrieve
236236 // Otherwise, try to retrieve next available item
237237 if ( ( KeysToObservers . ContainsKey ( key ) && KeysToObservers [ key ] . Count > 0 ) ||
238- ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs ,
238+ ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs , true ,
239239 out _ , out var result ) ) continue ;
240240
241241 // An item was found - set the observer result and return
@@ -291,7 +291,7 @@ private bool TryAssignItemFromKey(byte[] key)
291291 }
292292
293293 // Try to get next available item from object stored in key
294- if ( ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs ,
294+ if ( ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs , false ,
295295 out var currCount , out var result ) )
296296 {
297297 // If unsuccessful getting next item but there is at least one item in the collection,
@@ -436,7 +436,9 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort
436436 }
437437 }
438438
439- private unsafe bool TryGetResult( byte [ ] key, StorageSession storageSession, RespCommand command, ArgSlice[ ] cmdArgs , out int currCount , out CollectionItemResult result )
439+ private unsafe bool TryGetResult( byte [ ] key, StorageSession storageSession,
440+ RespCommand command, ArgSlice[ ] cmdArgs , bool initial ,
441+ out int currCount , out CollectionItemResult result )
440442 {
441443 currCount = default ;
442444 result = default ;
@@ -461,26 +463,65 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
461463 Debug. Assert ( storageSession . txnManager . state == TxnState . None ) ;
462464 createTransaction = true;
463465 var asKey = storageSession. scratchBufferManager . CreateArgSlice ( key ) ;
466+ if ( initial )
467+ storageSession. txnManager . SaveKeyEntryToLock ( asKey , false , LockType . Exclusive ) ;
464468 storageSession. txnManager . SaveKeyEntryToLock ( asKey , true , LockType . Exclusive ) ;
465469
466470 if ( command == RespCommand . BLMOVE )
467471 {
472+ if ( initial )
473+ storageSession. txnManager . SaveKeyEntryToLock ( dstKey , false , LockType . Exclusive ) ;
468474 storageSession. txnManager . SaveKeyEntryToLock ( dstKey , true , LockType . Exclusive ) ;
469475 }
470476
471477 _ = storageSession. txnManager . Run ( true ) ;
472478 }
473479
474480 var objectLockableContext = storageSession. txnManager . ObjectStoreLockableContext ;
481+ IGarnetObject dstObj = null ;
482+ byte [ ] arrDstKey = default ;
475483
476484 try
477485 {
478486 // Get the object stored at key
479487 var statusOp = storageSession . GET ( key , out var osObject , ref objectLockableContext ) ;
480- if ( statusOp == GarnetStatus . NOTFOUND ) return false;
488+ if ( statusOp == GarnetStatus . NOTFOUND )
489+ {
490+ if ( ! initial )
491+ return false;
492+
493+ var context = storageSession . txnManager . LockableContext ;
494+
495+ var keySlice = storageSession . scratchBufferManager . CreateArgSlice ( key ) ;
496+ statusOp = storageSession . GET ( keySlice , out ArgSlice _ , ref context ) ;
497+
498+ if ( statusOp != GarnetStatus . NOTFOUND )
499+ {
500+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
501+ return true;
502+ }
503+
504+ if ( command == RespCommand . BLMOVE )
505+ {
506+ arrDstKey = dstKey . ToArray ( ) ;
507+ var dstStatusOp = storageSession . GET ( arrDstKey , out var osDstObject , ref objectLockableContext ) ;
508+ if ( dstStatusOp != GarnetStatus . NOTFOUND && osDstObject . GarnetObject is not ListObject )
509+ {
510+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
511+ return true;
512+ }
513+
514+ dstStatusOp = storageSession . GET ( dstKey , out ArgSlice _ , ref context ) ;
515+ if ( dstStatusOp != GarnetStatus . NOTFOUND )
516+ {
517+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
518+ return true;
519+ }
520+ }
521+
522+ return false;
523+ }
481524
482- IGarnetObject dstObj = null ;
483- byte [ ] arrDstKey = default ;
484525 if ( command == RespCommand . BLMOVE )
485526 {
486527 arrDstKey = dstKey. ToArray ( ) ;
@@ -494,8 +535,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
494535 {
495536 case ListObject listObj:
496537 currCount = listObj. LnkList . Count ;
497- if ( objectType != GarnetObjectType . List ) return false;
498- if ( currCount == 0 ) return false;
538+ if ( objectType != GarnetObjectType . List )
539+ {
540+ result = new CollectionItemResult( GarnetStatus . WRONGTYPE ) ;
541+ return initial;
542+ }
543+ if ( currCount == 0 )
544+ return false;
499545
500546 switch ( command )
501547 {
@@ -516,7 +562,11 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
516562 {
517563 dstList = tmpDstList ;
518564 }
519- else return false;
565+ else
566+ {
567+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
568+ return initial;
569+ }
520570
521571 isSuccessful = TryMoveNextListItem ( listObj , dstList , ( OperationDirection ) cmdArgs [ 1 ] . ReadOnlySpan [ 0 ] ,
522572 ( OperationDirection ) cmdArgs [ 2 ] . ReadOnlySpan [ 0 ] , out nextItem ) ;
@@ -528,7 +578,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
528578 GarnetStatus . OK ;
529579 }
530580
531- return isSuccessful ;
581+ return false ;
532582 case RespCommand. BLMPOP:
533583 var popDirection = ( OperationDirection ) cmdArgs[ 0 ] . ReadOnlySpan [ 0 ] ;
534584 var popCount = * ( int * ) ( cmdArgs [ 1 ] . ptr ) ;
@@ -543,19 +593,24 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
543593 result = new CollectionItemResult ( key , items ) ;
544594 return true ;
545595 default :
546- return false ;
596+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
597+ return initial ;
547598 }
548599 case SortedSetObject setObj :
549600 currCount = setObj. Count( ) ;
550601 if ( objectType ! = GarnetObjectType . SortedSet )
551- return false ;
602+ {
603+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
604+ return initial;
605+ }
552606 if ( currCount == 0 )
553607 return false ;
554608
555609 return TryGetNextSetObjects( key , setObj , currCount , command , cmdArgs , out result ) ;
556610
557611 default :
558- return false;
612+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
613+ return initial;
559614 }
560615 }
561616 finally
0 commit comments