fix(infra): move data service seed to worker-level config#1210
Merged
Code Review
This pull request refactors the random seeding mechanism in the data service, moving from per-dataset seeds to a global worker-level seed set during initialization. It also introduces a datasets_lock in the worker application to synchronize dataset management. Feedback was provided regarding the concurrency of the worker, specifically suggesting that the datasets_lock should not be held during the long-running dataset loading process to avoid blocking other management operations.
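The concern in this review can be reproduced in a few lines. The sketch below is not the worker's code: the function names, the dataset ids, and the `asyncio.sleep` that stands in for slow dataset loading are all assumptions made for illustration. It shows that when a single lock is held across the slow load, an unrelated unload cannot finish until the load does.

```python
import asyncio


async def _demo():
    datasets = {"b": [0]}            # dataset "b" is already loaded
    datasets_lock = asyncio.Lock()   # one coarse lock guarding the dict
    events = []

    async def load_holding_lock(dataset_id):
        # Anti-pattern under review: the lock stays held during slow work.
        async with datasets_lock:
            events.append(f"load {dataset_id} start")
            await asyncio.sleep(0.05)  # hypothetical stand-in for slow loading
            datasets[dataset_id] = [1, 2, 3]
            events.append(f"load {dataset_id} done")

    async def unload(dataset_id):
        # A cheap management operation that needs the same lock.
        async with datasets_lock:
            datasets.pop(dataset_id)
            events.append(f"unload {dataset_id} done")

    # Both requests arrive together; the unload has to wait for the load.
    await asyncio.gather(load_holding_lock("a"), unload("b"))
    return events


def run_demo():
    return asyncio.run(_demo())


if __name__ == "__main__":
    print(run_demo())
```

The unload of "b" is logged only after the load of "a" completes, even though the two operations touch different datasets.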
Set random seed once at worker startup instead of per-request during dataset load and epoch reset. This prevents seed re-initialization from interfering with data shuffling across multiple datasets.

Key changes:
- Add seed field to DataServiceConfig and DataWorkerConfig
- Pass seed as CLI arg to worker process, set once in lifespan
- Remove seed from WorkerLoadDatasetRequest and _DatasetState
- Add datasets_lock for thread-safe dataset load/unload
- Update all trainers to pass seed via DataServiceConfig
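To illustrate the kind of interference this commit describes, here is a minimal sketch that uses Python's standard `random` module as a stand-in for the project's seeding helper. The `shuffled_indices` function is hypothetical, and the exact failure mode in the data service may differ. The point is only that re-seeding a shared RNG before each load rewinds its stream, so every dataset gets the same permutation, while seeding once lets the stream advance.

```python
import random


def shuffled_indices(n, seed=None):
    """Return a shuffled list of range(n) using the process-wide RNG.

    Passing `seed` re-seeds the RNG first, which mimics the old
    per-request behavior. Leaving it as None mimics the new behavior,
    where the seed is set once elsewhere (at worker startup).
    """
    if seed is not None:
        random.seed(seed)
    indices = list(range(n))
    random.shuffle(indices)
    return indices


# Old behavior: each dataset load re-seeds, so the stream is rewound
# and both datasets receive the identical permutation.
per_load_a = shuffled_indices(50, seed=1)
per_load_b = shuffled_indices(50, seed=1)

# New behavior: seed once, then let successive loads consume the stream.
random.seed(1)
worker_level_a = shuffled_indices(50)
worker_level_b = shuffled_indices(50)

print(per_load_a == per_load_b)          # identical orders under re-seeding
print(worker_level_a == worker_level_b)  # orders differ when seeded once
```

Runs remain reproducible in both cases; the difference is that the worker-level seed no longer resets the RNG state between datasets or epochs.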
Prevent races between load/unload and stateful endpoints (fetch, reset, save, load) on the data service worker.

Key changes:
- Add _loading_ids reservation set so load_dataset does not hold datasets_lock across slow I/O (asyncio.to_thread)
- Add unloading flag to _DatasetState; unload_dataset drains in-flight state ops via state.lock before dict removal
- Introduce _locked_active_state context manager that checks the unloading flag; apply to fetch, reset, save, load
- Add 4 deterministic concurrency regression tests covering duplicate-load rejection, unload drain, stale-fetch 409, and cross-dataset non-blocking
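The pattern in this commit can be sketched as follows. This is a simplified, self-contained approximation, not the code from `worker/app.py`: the `Worker` and `DatasetState` classes, the `Conflict` exception (standing in for an HTTP 409), and the method signatures are assumptions made for illustration. Only the ideas come from the commit message: reserve the id under a short lock, load outside the lock, flag the state as unloading, and drain in-flight operations through the per-dataset lock.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


class Conflict(Exception):
    """Hypothetical stand-in for an HTTP 409 response."""


@dataclass
class DatasetState:
    data: list
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    unloading: bool = False


class Worker:
    def __init__(self):
        self.datasets = {}
        self.loading_ids = set()          # ids reserved by in-progress loads
        self.datasets_lock = asyncio.Lock()

    async def load_dataset(self, dataset_id, loader):
        async with self.datasets_lock:    # short critical section: reserve only
            if dataset_id in self.datasets or dataset_id in self.loading_ids:
                raise Conflict(f"{dataset_id} is already loaded or loading")
            self.loading_ids.add(dataset_id)
        try:
            data = await asyncio.to_thread(loader)  # slow I/O, lock not held
            async with self.datasets_lock:
                self.datasets[dataset_id] = DatasetState(data)
        finally:
            self.loading_ids.discard(dataset_id)    # release the reservation

    async def unload_dataset(self, dataset_id):
        async with self.datasets_lock:
            state = self.datasets.get(dataset_id)
            if state is None or state.unloading:
                raise Conflict(f"{dataset_id} is not loaded")
            state.unloading = True        # later operations are rejected
        async with state.lock:            # wait for the in-flight operation
            pass
        async with self.datasets_lock:
            self.datasets.pop(dataset_id, None)

    @asynccontextmanager
    async def locked_active_state(self, dataset_id):
        state = self.datasets.get(dataset_id)
        if state is None:
            raise Conflict(f"{dataset_id} is not loaded")
        async with state.lock:
            if state.unloading:           # checked after acquiring the lock
                raise Conflict(f"{dataset_id} is being unloaded")
            yield state

    async def fetch(self, dataset_id, index):
        async with self.locked_active_state(dataset_id) as state:
            return state.data[index]
```

Because the reservation is taken and released in short critical sections, a slow load of one dataset does not block loads, unloads, or fetches on another, and a duplicate load of the same id is rejected while the first is still in progress.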
Description
Move the random seed from a per-request parameter (set on each load_dataset call and re-set on epoch reset) to a worker-level configuration set once at startup. This prevents repeated seed re-initialization from interfering with data shuffling correctness when multiple datasets are loaded on the same worker. Also adds a datasets_lock to eliminate race conditions on concurrent dataset load/unload operations.
Related Issue
Type of Change
Checklist
- pre-commit run --all-files
- ./docs/build_all.sh
- main
- /review-pr command
- /create-pr
Breaking Change Details (if applicable):
N/A
Additional Context
Key changes:
- DataServiceConfig gains a seed field; from_dataset_config() now accepts seed from trainer config
- Pass --seed CLI arg to worker subprocess, set once in lifespan() via seeding.set_random_seed
- Remove seed from WorkerLoadDatasetRequest, _DatasetState, and RDataset.register()
- Add datasets_lock (asyncio.Lock) for atomic dataset load/unload; prevents TOCTOU race on the datasets dict
- All trainers (PPOTrainer, RWTrainer, SFTTrainer) updated to pass seed through DataServiceConfig instead of per-dataset

Files changed:
- areal/infra/data_service/controller/config.py: add seed field + from_dataset_config param
- areal/infra/data_service/controller/controller.py: pass --seed arg, remove seed from load request
- areal/infra/data_service/rdataset.py: remove seed param from register()
- areal/infra/data_service/types.py: remove seed from WorkerLoadDatasetRequest
- areal/infra/data_service/worker/__main__.py: add --seed CLI arg
- areal/infra/data_service/worker/app.py: seed at startup, add datasets_lock
- areal/infra/data_service/worker/config.py: add seed field
- areal/trainer/rl_trainer.py: pass seed via DataServiceConfig
- areal/trainer/rw_trainer.py: pass seed via DataServiceConfig
- areal/trainer/sft_trainer.py: pass seed via DataServiceConfig
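For readers who want a picture of how the seed travels from config to worker, the sketch below traces the path described under Key changes: a config field, a `--seed` CLI argument, and a single seeding call at startup. Everything here is an assumption apart from the `seed` field and the `--seed` flag named in this PR. The dataclass shape, the helper names, the default value, and the use of `random.seed` in place of `seeding.set_random_seed` are illustrative only, and the real `lifespan()` is likely tied to the worker's web framework.

```python
import argparse
import asyncio
import random
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class DataServiceConfig:
    # Hypothetical shape; only the existence of a `seed` field is from the PR.
    seed: int = 1


def build_worker_argv(config):
    """Controller side: turn the config seed into worker CLI arguments."""
    return ["--seed", str(config.seed)]


def parse_worker_args(argv):
    """Worker side: parse the --seed argument passed by the controller."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--seed", type=int, default=1)
    return parser.parse_args(argv)


@asynccontextmanager
async def lifespan(seed):
    # Seed exactly once at startup. random.seed is a stand-in for the
    # project's seeding.set_random_seed, whose signature is not shown here.
    random.seed(seed)
    yield
    # No per-request or per-epoch re-seeding happens after this point.


async def first_draw(seed):
    async with lifespan(seed):
        return random.random()


if __name__ == "__main__":
    args = parse_worker_args(build_worker_argv(DataServiceConfig(seed=42)))
    print(args.seed, asyncio.run(first_draw(args.seed)))
```

With this layout the trainers only set the seed on the service config, and nothing in a load request or dataset state carries a seed.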