Skip to content

Commit 0fe4bc6

Browse files
TalZaccaivazois
andauthored
Multi-Database Support (#1005)
* Added MaxDatabases option * wip * wip * wip * wip * wip * wip * wip * wip * aof * wip * wip * wip * wip * wip * bugfixes * wip * tests * format * small fix * wip * test * simplify dispose logic for gcs at AofSyncTask * fix * format * fix * wip * test * readding multithreading tests * test * test * wip * test * format * wip * Added FLUSHALL + tests * Ignore LC MT test * format * Added DB ID to client info * SAVE, BGSAVE, LASTSAVE with ID - not tested yet * ensure waitForSync before dispose * wip * wip * wip * Adding HCOLLECT info & docs to GarnetCommandDocs/Info & updating RespCommandDocs/Info * Updated command info & docs for SAVE, BGSAVE, LASTSAVE & COMMITAOF * format * bugfixes * Adding SPUBLISH, SSUBSCRIBE TO GarnetCommandsInfo.json * wip * wip * wip * IDatabaseManager refactor - wip (broken) * wip (broken) * wip * wip * quit code change * wip * wip * Fixing SWAPDB * wip * wip * add explicit fail on fail to dispose * replayStoreWrapper should not create database * wip * wip - tests passing * wip * format * format * some merge fixes * format * bugfix * fix * Added ServerOperations to BDN + allocation improvements * Added website docs * Fixing ACL test * Initialize databaseManager as single database unless multi explicitly needed * merge fix + format * merging from main + removing unnecessary usings * bugfix * Test fix + consolidation of getting directory paths for checkpointing and AOF * Updating DB IDs after swap * Some cleanup * Some cleanup * more cleanup + format * small fixes * Fixed MaxDatabases validation * more cleanup * format + small fix * Calling select & swapdb from lua script * Disabling a flaky assert * Fix * Comment added * format * fix * Addressing comments + adding some tests * format * Added some more tests * fix * Avoiding allocations while acquiring databases lock * couple fixes * Added some website docs for multi-db * Switching GarnetDatabase & GarnetDatabaseSession from scruct to class * format * removed unnecessary benchmarks * Making properties in GD & GDS readonly * cleanup * Switching info to show current-db data, addressing some comments * format * Fix tests * fix * fix * disabling select & swapdb inside a txn for now, added tests. --------- Co-authored-by: Vasileios Zois <vazois@microsoft.com>
1 parent 594b6ab commit 0fe4bc6

66 files changed

Lines changed: 6041 additions & 1005 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

libs/cluster/Server/ClusterProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(Stor
440440
internal bool BumpAndWaitForEpochTransition()
441441
{
442442
BumpCurrentEpoch();
443-
foreach (var server in storeWrapper.TcpServer)
443+
foreach (var server in storeWrapper.Servers)
444444
{
445445
while (true)
446446
{

libs/cluster/Server/Migration/MigrateSessionKeys.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,14 @@ public bool MigrateKeys()
182182
return false;
183183

184184
// Migrate main store keys
185-
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
185+
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
186186
if (!MigrateKeysFromMainStore())
187187
return false;
188188

189189
// Migrate object store keys
190190
if (!clusterProvider.serverOptions.DisableObjects)
191191
{
192-
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequncy);
192+
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
193193
if (!MigrateKeysFromObjectStore())
194194
return false;
195195
}

libs/cluster/Session/RespClusterMigrateCommands.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ private void TrackImportProgress(int keyCount, bool isMainStore, bool completed
2626
{
2727
totalKeyCount += keyCount;
2828
var duration = TimeSpan.FromTicks(Stopwatch.GetTimestamp() - lastLog);
29-
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequncy)
29+
if (completed || lastLog == 0 || duration >= clusterProvider.storeWrapper.loggingFrequency)
3030
{
3131
logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount})", completed ? "COMPLETED" : "IMPORTING", isMainStore, totalKeyCount.ToString("N0"));
3232
lastLog = Stopwatch.GetTimestamp();

libs/cluster/Session/RespClusterReplicationCommands.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ private bool NetworkClusterFlushAll(out bool invalidParameters)
505505
}
506506

507507
// Flush all keys
508-
clusterProvider.storeWrapper.ExecuteFlushDb(RespCommand.FLUSHALL, false, 0);
508+
clusterProvider.storeWrapper.FlushAllDatabases(unsafeTruncateLog: false);
509509

510510
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
511511
SendAndReset();

libs/host/Configuration/Options.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,10 @@ public IEnumerable<string> LuaAllowedFunctions
607607
[Option("unixsocketperm", Required = false, HelpText = "Unix socket permissions in octal (Unix platforms only)")]
608608
public int UnixSocketPermission { get; set; }
609609

610+
[IntRangeValidation(1, 256, isRequired: true)]
611+
[Option("max-databases", Required = false, HelpText = "Max number of logical databases allowed in a single Garnet server instance")]
612+
public int MaxDatabases { get; set; }
613+
610614
/// <summary>
611615
/// This property contains all arguments that were not parsed by the command line argument parser
612616
/// </summary>
@@ -872,7 +876,8 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
872876
SkipRDBRestoreChecksumValidation = SkipRDBRestoreChecksumValidation.GetValueOrDefault(),
873877
LuaOptions = EnableLua.GetValueOrDefault() ? new LuaOptions(LuaMemoryManagementMode, LuaScriptMemoryLimit, LuaScriptTimeoutMs == 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromMilliseconds(LuaScriptTimeoutMs), LuaLoggingMode, LuaAllowedFunctions, logger) : null,
874878
UnixSocketPath = UnixSocketPath,
875-
UnixSocketPermission = unixSocketPermissions
879+
UnixSocketPermission = unixSocketPermissions,
880+
MaxDatabases = MaxDatabases,
876881
};
877882
}
878883

libs/host/Configuration/Redis/RedisOptions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ internal class RedisOptions
9797

9898
[RedisOption("slowlog-max-len", nameof(Options.SlowLogMaxEntries))]
9999
public Option<int> SlowLogMaxLen { get; set; }
100+
101+
[RedisOption("databases", nameof(Options.MaxDatabases))]
102+
public Option<int> Databases { get; set; }
100103
}
101104

102105
/// <summary>

libs/host/GarnetServer.cs

Lines changed: 90 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,6 @@ static string GetVersion()
4747

4848
private readonly GarnetServerOptions opts;
4949
private IGarnetServer[] servers;
50-
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> store;
51-
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> objectStore;
52-
private IDevice aofDevice;
53-
private TsavoriteLog appendOnlyFile;
5450
private SubscribeBroker subscribeBroker;
5551
private KVSettings<SpanByte, SpanByte> kvSettings;
5652
private KVSettings<byte[], IGarnetObject> objKvSettings;
@@ -233,24 +229,14 @@ private void InitializeServer()
233229
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");
234230

235231
opts.Initialize(loggerFactory);
236-
CreateMainStore(clusterFactory, out var checkpointDir);
237-
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);
232+
StoreWrapper.DatabaseCreatorDelegate createDatabaseDelegate = (int dbId) =>
233+
CreateDatabase(dbId, opts, clusterFactory, customCommandManager);
238234

239235
if (!opts.DisablePubSub)
240236
subscribeBroker = new SubscribeBroker(null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true, logger);
241237

242-
CreateAOF();
243-
244238
logger?.LogTrace("TLS is {tlsEnabled}", opts.TlsOptions == null ? "disabled" : "enabled");
245239

246-
if (logger != null)
247-
{
248-
var configMemoryLimit = (store.IndexSize * 64) + store.Log.MaxMemorySizeBytes + (store.ReadCache?.MaxMemorySizeBytes ?? 0) + (appendOnlyFile?.MaxMemorySizeBytes ?? 0);
249-
if (objectStore != null)
250-
configMemoryLimit += objectStore.IndexSize * 64 + objectStore.Log.MaxMemorySizeBytes + (objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) + (objectStoreSizeTracker?.TargetSize ?? 0) + (objectStoreSizeTracker?.ReadCacheTargetSize ?? 0);
251-
logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
252-
}
253-
254240
// Create Garnet TCP server if none was provided.
255241
if (servers == null)
256242
{
@@ -268,8 +254,28 @@ private void InitializeServer()
268254
}
269255
}
270256

271-
storeWrapper = new StoreWrapper(version, RedisProtocolVersion, servers, store, objectStore, objectStoreSizeTracker,
272-
customCommandManager, appendOnlyFile, opts, subscribeBroker, clusterFactory: clusterFactory, loggerFactory: loggerFactory);
257+
storeWrapper = new StoreWrapper(version, RedisProtocolVersion, servers, customCommandManager, opts, subscribeBroker,
258+
createDatabaseDelegate: createDatabaseDelegate,
259+
clusterFactory: clusterFactory,
260+
loggerFactory: loggerFactory);
261+
262+
if (logger != null)
263+
{
264+
var configMemoryLimit = (storeWrapper.store.IndexSize * 64) +
265+
storeWrapper.store.Log.MaxMemorySizeBytes +
266+
(storeWrapper.store.ReadCache?.MaxMemorySizeBytes ?? 0) +
267+
(storeWrapper.appendOnlyFile?.MaxMemorySizeBytes ?? 0);
268+
if (storeWrapper.objectStore != null)
269+
configMemoryLimit += (storeWrapper.objectStore.IndexSize * 64) +
270+
storeWrapper.objectStore.Log.MaxMemorySizeBytes +
271+
(storeWrapper.objectStore.ReadCache?.MaxMemorySizeBytes ?? 0) +
272+
(storeWrapper.objectStoreSizeTracker?.TargetSize ?? 0) +
273+
(storeWrapper.objectStoreSizeTracker?.ReadCacheTargetSize ?? 0);
274+
logger.LogInformation("Total configured memory limit: {configMemoryLimit}", configMemoryLimit);
275+
}
276+
277+
var maxDatabases = opts.EnableCluster ? 1 : opts.MaxDatabases;
278+
logger?.LogInformation("Max number of logical databases allowed on server: {maxDatabases}", maxDatabases);
273279

274280
// Create session provider for Garnet
275281
Provider = new GarnetProvider(storeWrapper, subscribeBroker);
@@ -285,6 +291,17 @@ private void InitializeServer()
285291
LoadModules(customCommandManager);
286292
}
287293

294+
private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOptions, ClusterFactory clusterFactory,
295+
CustomCommandManager customCommandManager)
296+
{
297+
var store = CreateMainStore(dbId, clusterFactory, out var epoch, out var stateMachineDriver);
298+
var objectStore = CreateObjectStore(dbId, clusterFactory, customCommandManager, epoch, stateMachineDriver, out var objectStoreSizeTracker);
299+
var (aofDevice, aof) = CreateAOF(dbId);
300+
return new GarnetDatabase(dbId, store, objectStore, epoch, stateMachineDriver, objectStoreSizeTracker,
301+
aofDevice, aof, serverOptions.AdjustedIndexMaxCacheLines == 0,
302+
serverOptions.AdjustedObjectStoreIndexMaxCacheLines == 0);
303+
}
304+
288305
private void LoadModules(CustomCommandManager customCommandManager)
289306
{
290307
if (opts.LoadModuleCS == null)
@@ -309,82 +326,82 @@ private void LoadModules(CustomCommandManager customCommandManager)
309326
}
310327
}
311328

312-
private void CreateMainStore(IClusterFactory clusterFactory, out string checkpointDir)
329+
private TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator> CreateMainStore(int dbId, IClusterFactory clusterFactory,
330+
out LightEpoch epoch, out StateMachineDriver stateMachineDriver)
313331
{
314-
kvSettings = opts.GetSettings(loggerFactory, out logFactory);
332+
epoch = new LightEpoch();
333+
stateMachineDriver = new StateMachineDriver(epoch, loggerFactory?.CreateLogger($"StateMachineDriver"));
315334

316-
checkpointDir = opts.CheckpointDir ?? opts.LogDir;
335+
kvSettings = opts.GetSettings(loggerFactory, epoch, stateMachineDriver, out logFactory);
317336

318337
// Run checkpoint on its own thread to control p99
319338
kvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
320339

321-
if (opts.EnableCluster)
322-
{
323-
kvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator,
324-
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), isMainStore: true, logger);
325-
}
326-
else
327-
{
328-
kvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
329-
new DefaultCheckpointNamingScheme(checkpointDir + "/Store/checkpoints"), removeOutdated: true);
330-
}
340+
var baseName = opts.GetMainStoreCheckpointDirectory(dbId);
341+
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);
342+
343+
kvSettings.CheckpointManager = opts.EnableCluster ?
344+
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: true, logger) :
345+
new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, removeOutdated: true);
331346

332-
store = new(kvSettings
347+
return new TsavoriteKV<SpanByte, SpanByte, MainStoreFunctions, MainStoreAllocator>(kvSettings
333348
, StoreFunctions<SpanByte, SpanByte>.Create()
334349
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
335350
}
336351

337-
private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandManager customCommandManager, string CheckpointDir, out CacheSizeTracker objectStoreSizeTracker)
352+
private TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator> CreateObjectStore(int dbId, IClusterFactory clusterFactory, CustomCommandManager customCommandManager,
353+
LightEpoch epoch, StateMachineDriver stateMachineDriver, out CacheSizeTracker objectStoreSizeTracker)
338354
{
339355
objectStoreSizeTracker = null;
340-
if (!opts.DisableObjects)
341-
{
342-
objKvSettings = opts.GetObjectStoreSettings(loggerFactory,
343-
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
356+
if (opts.DisableObjects)
357+
return null;
344358

345-
// Run checkpoint on its own thread to control p99
346-
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
359+
objKvSettings = opts.GetObjectStoreSettings(loggerFactory, epoch, stateMachineDriver,
360+
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
361+
362+
// Run checkpoint on its own thread to control p99
363+
objKvSettings.ThrottleCheckpointFlushDelayMs = opts.CheckpointThrottleFlushDelayMs;
364+
365+
var baseName = opts.GetObjectStoreCheckpointDirectory(dbId);
366+
var defaultNamingScheme = new DefaultCheckpointNamingScheme(baseName);
367+
368+
objKvSettings.CheckpointManager = opts.EnableCluster ?
369+
clusterFactory.CreateCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, isMainStore: false, logger) :
370+
new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator, defaultNamingScheme, removeOutdated: true);
371+
372+
var objStore = new TsavoriteKV<byte[], IGarnetObject, ObjectStoreFunctions, ObjectStoreAllocator>(
373+
objKvSettings,
374+
StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
375+
() => new ByteArrayBinaryObjectSerializer(),
376+
() => new GarnetObjectSerializer(customCommandManager)),
377+
(allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
378+
379+
if (objHeapMemorySize > 0 || objReadCacheHeapMemorySize > 0)
380+
objectStoreSizeTracker = new CacheSizeTracker(objStore, objKvSettings, objHeapMemorySize, objReadCacheHeapMemorySize,
381+
this.loggerFactory);
382+
383+
return objStore;
347384

348-
if (opts.EnableCluster)
349-
objKvSettings.CheckpointManager = clusterFactory.CreateCheckpointManager(
350-
opts.DeviceFactoryCreator,
351-
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
352-
isMainStore: false, logger);
353-
else
354-
objKvSettings.CheckpointManager = new DeviceLogCommitCheckpointManager(opts.DeviceFactoryCreator,
355-
new DefaultCheckpointNamingScheme(CheckpointDir + "/ObjectStore/checkpoints"),
356-
removeOutdated: true);
357-
358-
objectStore = new(objKvSettings
359-
, StoreFunctions<byte[], IGarnetObject>.Create(new ByteArrayKeyComparer(),
360-
() => new ByteArrayBinaryObjectSerializer(),
361-
() => new GarnetObjectSerializer(customCommandManager))
362-
, (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions));
363-
364-
if (objHeapMemorySize > 0 || objReadCacheHeapMemorySize > 0)
365-
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objKvSettings, objHeapMemorySize, objReadCacheHeapMemorySize,
366-
this.loggerFactory);
367-
}
368385
}
369386

370-
private void CreateAOF()
387+
private (IDevice, TsavoriteLog) CreateAOF(int dbId)
371388
{
372-
if (opts.EnableAOF)
389+
if (!opts.EnableAOF)
373390
{
374-
if (opts.FastAofTruncate && opts.CommitFrequencyMs != -1)
375-
throw new Exception("Need to set CommitFrequencyMs to -1 (manual commits) with MainMemoryReplication");
376-
377-
opts.GetAofSettings(out var aofSettings);
378-
aofDevice = aofSettings.LogDevice;
379-
appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
380-
381-
if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
382-
throw new Exception("Cannot use CommitWait with manual commits");
383-
return;
391+
if (opts.CommitFrequencyMs != 0 || opts.WaitForCommit)
392+
throw new Exception("Cannot use CommitFrequencyMs or CommitWait without EnableAOF");
393+
return (null, null);
384394
}
385395

386-
if (opts.CommitFrequencyMs != 0 || opts.WaitForCommit)
387-
throw new Exception("Cannot use CommitFrequencyMs or CommitWait without EnableAOF");
396+
if (opts.FastAofTruncate && opts.CommitFrequencyMs != -1)
397+
throw new Exception("Need to set CommitFrequencyMs to -1 (manual commits) with FastAofTruncate");
398+
399+
opts.GetAofSettings(dbId, out var aofSettings);
400+
var aofDevice = aofSettings.LogDevice;
401+
var appendOnlyFile = new TsavoriteLog(aofSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteLog [aof]"));
402+
if (opts.CommitFrequencyMs < 0 && opts.WaitForCommit)
403+
throw new Exception("Cannot use CommitWait with manual commits");
404+
return (aofDevice, appendOnlyFile);
388405
}
389406

390407
/// <summary>
@@ -432,13 +449,9 @@ private void InternalDispose()
432449
for (var i = 0; i < servers.Length; i++)
433450
servers[i]?.Dispose();
434451
subscribeBroker?.Dispose();
435-
store.Dispose();
436-
appendOnlyFile?.Dispose();
437-
aofDevice?.Dispose();
438452
kvSettings.LogDevice?.Dispose();
439453
if (!opts.DisableObjects)
440454
{
441-
objectStore.Dispose();
442455
objKvSettings.LogDevice?.Dispose();
443456
objKvSettings.ObjectLogDevice?.Dispose();
444457
}

libs/host/defaults.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,5 +399,8 @@
399399
"UnixSocketPath": null,
400400

401401
/* Unix socket permissions in octal (Unix platforms only) */
402-
"UnixSocketPermission": 0
402+
"UnixSocketPermission": 0,
403+
404+
/* Max number of logical databases allowed in a single Garnet server instance */
405+
"MaxDatabases": 16
403406
}

0 commit comments

Comments
 (0)