[Refactor] Support concurrent inference across tasks. #2403
Conversation
Pull request overview
This PR refactors OpenCompass inference/evaluation orchestration to enable concurrent inference across multiple datasets (tasks) and to allow evaluation to “watch” inference progress and start as soon as results are ready.
Changes:
- Added filesystem-based inference status tracking and a heartbeat mechanism for coordinating infer/eval across processes.
- Introduced OpenICLInferTaskConcurrent (concurrent dataset inference) and OpenICLEvalWatchTask (evaluation that monitors infer progress).
- Added parallel (per-sample) chat/gen inferencers and updated OpenAI API wrappers + CLI flow to support concurrency.
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| opencompass/utils/run.py | Minor formatting/line alignment change. |
| opencompass/utils/infer_status.py | New status manager with file-lock based read/write for infer progress. |
| opencompass/utils/heartbeat.py | New heartbeat writer/reader for eval-watch coordination. |
| opencompass/utils/__init__.py | Exposes HeartBeatManager / InferStatusManager from utils. |
| opencompass/tasks/openicl_infer_concurrent.py | New concurrent inference task that runs multiple datasets with shared API concurrency tokens. |
| opencompass/tasks/openicl_eval_watch.py | New eval task that waits for infer completion via status + heartbeat timeout. |
| opencompass/tasks/__init__.py | Registers the new tasks for discovery/import. |
| opencompass/openicl/icl_inferencer/icl_gen_inferencer_parallel.py | New per-sample parallel gen inferencer using a thread pool. |
| opencompass/openicl/icl_inferencer/icl_chat_inferencer_parallel.py | New per-sample parallel chat inferencer using a thread pool. |
| opencompass/openicl/icl_inferencer/__init__.py | Exports the new parallel inferencers. |
| opencompass/models/openai_streaming.py | Refactors streaming client creation to use a persistent OpenAI client and acquire/release gating. |
| opencompass/models/openai_api.py | Adds key-rotation lock and switches rate limiting to acquire/release for concurrency control. |
| opencompass/cli/main.py | Runs infer and eval-watch concurrently with a heartbeat thread when configured. |
```python
def safe_write(file: Path, content: str, work_dir: Path):
    sig = '--'.join(file.resolve().relative_to(work_dir.resolve()).parts)
    with SoftFileLock(work_dir / '.locks' / sig):
        file.write_text(content)
```
safe_write also uses a lock file in work_dir / '.locks' without ensuring the parent directory exists. This can cause status writes to fail (and silently stall eval-watch logic). Create work_dir/.locks before taking the lock.
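A minimal sketch of the suggested fix. To keep the snippet self-contained, `_soft_lock` is a stdlib stand-in for `filelock.SoftFileLock` (atomic `O_EXCL` create plus retry); the actual fix in the PR would keep `SoftFileLock` and only add the `mkdir` before the lock is taken:

```python
import os
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def _soft_lock(lock_path: Path, poll: float = 0.05):
    # Stand-in for filelock.SoftFileLock: create the lock file atomically
    # with O_EXCL, retrying until it becomes available.
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            break
        except FileExistsError:
            time.sleep(poll)
    try:
        yield
    finally:
        lock_path.unlink(missing_ok=True)


def safe_write(file: Path, content: str, work_dir: Path):
    lock_dir = work_dir.resolve() / '.locks'
    lock_dir.mkdir(parents=True, exist_ok=True)  # the missing step
    sig = '--'.join(file.resolve().relative_to(work_dir.resolve()).parts)
    with _soft_lock(lock_dir / sig):
        file.write_text(content)
```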
```python
# Load next dataset with 5% overhead.
max_pending_samples = max(1, int(max_workers * 1.05))
remaining_total = self._remaining_total(running, max_pending_samples)
while pending and remaining_total < max_pending_samples:
    dataset_cfg = pending.pop(0)
    task_name = task_abbr_from_cfg({
```
The dataset scheduling logic uses progress.remaining() (total samples left) to decide whether to start another dataset. Since remaining is typically the full dataset size (often >> max_pending_samples), this effectively prevents multiple datasets from running concurrently until the current dataset is nearly finished, defeating the purpose of “concurrent inference across tasks”. Consider tracking expected in-flight capacity per dataset (e.g., min(max_workers, remaining)), or simply bounding concurrent datasets explicitly (e.g., max_parallel_ds = ...) and removing the remaining_total gating.
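One way the suggestion could look, sketched as a hypothetical `schedule` helper (dataset configs reduced to dicts with a `remaining` count; `max_parallel_ds` is an assumed knob, not part of the PR): each running dataset is charged for at most `max_workers` in-flight samples, never its full remaining size, so a large first dataset no longer blocks the rest.

```python
def schedule(pending, running, max_workers, max_parallel_ds=4):
    """Start more datasets while expected in-flight work leaves spare
    capacity, and cap how many datasets run at once."""
    started = []
    while pending and len(running) + len(started) < max_parallel_ds:
        # Expected in-flight samples: a dataset can occupy at most
        # max_workers slots, and never more than it has samples left.
        in_flight = sum(min(max_workers, ds['remaining'])
                        for ds in running + started)
        if in_flight >= max_workers:
            break
        started.append(pending.pop(0))
    return started
```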
```python
abbr_counts = {}
for dataset_cfg in pending:
    abbr = dataset_cfg.get('abbr', 'task')
    abbr_counts[abbr] = abbr_counts.get(abbr, 0) + 1
```
abbr_counts is computed but never used. This adds noise and suggests incomplete logic; remove it or use it to disambiguate task names when there are duplicate dataset abbreviations.
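If the counts are kept rather than removed, they could be used roughly like this hypothetical helper, which suffixes only the abbreviations that occur more than once (`disambiguate_abbrs` is illustrative, not the PR's code):

```python
def disambiguate_abbrs(pending):
    """Return a unique name per dataset config, appending an index
    suffix only where the abbreviation is duplicated."""
    abbr_counts = {}
    for cfg in pending:
        abbr = cfg.get('abbr', 'task')
        abbr_counts[abbr] = abbr_counts.get(abbr, 0) + 1
    seen, names = {}, []
    for cfg in pending:
        abbr = cfg.get('abbr', 'task')
        if abbr_counts[abbr] > 1:
            seen[abbr] = seen.get(abbr, 0) + 1
            names.append(f'{abbr}_{seen[abbr]}')
        else:
            names.append(abbr)
    return names
```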
```python
if self.orgs:
    with Lock():
        self.org_ctr += 1
        if self.org_ctr == len(self.orgs):
            self.org_ctr = 0
    header['OpenAI-Organization'] = self.orgs[self.org_ctr]
```
org_ctr rotation is guarded by with Lock():, but Lock() creates a new lock instance each call, so this provides no mutual exclusion across threads. Under concurrent inference this can corrupt org_ctr updates. Define a dedicated instance lock (e.g., self._org_lock = Lock() in __init__) and use it here.
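A sketch of the suggested fix, reduced to a standalone class for clarity (`OrgRotator` is illustrative; in the PR the lock would live on the model wrapper): the lock is created once in `__init__` and reused, so every caller contends on the same object.

```python
import threading


class OrgRotator:
    def __init__(self, orgs):
        self.orgs = orgs
        self.org_ctr = -1
        # One shared lock, created once -- not Lock() per call, which
        # would hand each thread a fresh, uncontended lock.
        self._org_lock = threading.Lock()

    def next_org(self):
        with self._org_lock:
            self.org_ctr = (self.org_ctr + 1) % len(self.orgs)
            return self.orgs[self.org_ctr]
```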
```python
from opencompass.models.openai_api import OpenAISDK
from opencompass.registry import ICL_INFERENCERS
from opencompass.utils import batched
```
OpenAISDK is imported but not used in this module. Removing the unused import avoids unnecessary dependencies and keeps linting clean.
```python
def safe_read(file: Path, work_dir: Path):
    sig = '--'.join(file.resolve().relative_to(work_dir.resolve()).parts)
    with SoftFileLock(work_dir / '.locks' / sig):
        content = file.read_text()
    return content
```
safe_read acquires a lock under work_dir / '.locks', but this directory is never created. If .locks doesn't exist, SoftFileLock(...) will fail and status reads will break. Ensure work_dir/.locks is created (e.g., (...).mkdir(parents=True, exist_ok=True)) before acquiring the lock, or create it lazily inside safe_read.
```python
def __init__(self, work_dir: str | Path, fname: str = 'infer_heartbeat'):
    self.hb_file = Path(work_dir) / fname
    self._lock = SoftFileLock(Path(work_dir) / '.locks' / fname)

def start_heartbeat(self, write_interval: float = 5.):
    Path(self.hb_file).parent.mkdir(exist_ok=True)
    stop_event = threading.Event()
```
HeartBeatManager creates a lock at work_dir/.locks/<fname> but start_heartbeat() only creates the heartbeat file's parent dir (and not .locks, nor with parents=True). If .locks doesn't exist, lock acquisition will fail and the heartbeat will never be written, causing OpenICLEvalWatchTask to time out. Create work_dir/.locks (parents=True) before using SoftFileLock.
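A sketch of a heartbeat writer that creates every directory it needs up front; the class and method names are illustrative, not the PR's API, and the file-lock step is omitted so the snippet stays self-contained:

```python
import threading
import time
from pathlib import Path


class HeartBeat:
    def __init__(self, work_dir, fname='infer_heartbeat'):
        self.hb_file = Path(work_dir) / fname
        self.lock_dir = Path(work_dir) / '.locks'

    def start(self, write_interval=5.0):
        # Create both the heartbeat dir and the lock dir (parents=True)
        # before any lock is acquired, so SoftFileLock-style paths
        # always resolve.
        self.hb_file.parent.mkdir(parents=True, exist_ok=True)
        self.lock_dir.mkdir(parents=True, exist_ok=True)
        stop_event = threading.Event()

        def _beat():
            # Write a fresh timestamp until asked to stop.
            while not stop_event.is_set():
                self.hb_file.write_text(str(time.time()))
                stop_event.wait(write_interval)

        threading.Thread(target=_beat, daemon=True).start()
        return stop_event
```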
```diff
 self.openai_client = OpenAI(
     base_url=self.openai_api_base,
-    api_key=current_key,
-    http_client=httpx.Client(**http_client_cfg,
-                             timeout=httpx.Timeout(self.timeout))
-    if http_client_cfg or True else None,
+    api_key=self.key,
+    http_client=http_client,
 )
```
OpenAISDKStreaming.__init__ builds an OpenAI client with api_key=self.key, but OpenAI/OpenAISDK store keys in self.keys and do not define self.key. This will raise AttributeError at runtime. Use the key argument (string), or select a key from self.keys (optionally with rotation) when constructing self.openai_client.
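One possible shape for the fix, sketched as a standalone helper (`KeyPool` is hypothetical): since the wrappers store keys in `self.keys` (a list), pick a key with thread-safe round-robin rotation and pass the result as `api_key` when constructing the client, instead of reading a nonexistent `self.key`.

```python
import threading
from itertools import cycle


class KeyPool:
    """Thread-safe round-robin over a list of API keys."""

    def __init__(self, keys):
        self.keys = list(keys)
        self._it = cycle(self.keys)
        self._lock = threading.Lock()

    def next_key(self):
        with self._lock:
            return next(self._it)


# Usage sketch (client construction shown for illustration only):
#   pool = KeyPool(self.keys)
#   self.openai_client = OpenAI(base_url=self.openai_api_base,
#                               api_key=pool.next_key(),
#                               http_client=http_client)
```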
```python
def __init__(
    self,
    model,
    max_out_len: int,
    stopping_criteria: List[str] = [],
    max_seq_len: Optional[int] = None,
    min_out_len: Optional[int] = None,
```
stopping_criteria defaults to a mutable list ([]). If the list is ever mutated, instances can unintentionally share state. Use None as the default and replace with an empty list inside __init__.
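The suggested pattern, as a minimal sketch (`Inferencer` stands in for the real class): default to `None` and build a fresh list per instance, so mutating one instance's criteria cannot leak into another.

```python
from typing import List, Optional


class Inferencer:
    def __init__(self, stopping_criteria: Optional[List[str]] = None):
        # Each instance gets its own list; no shared mutable default.
        self.stopping_criteria = list(stopping_criteria or [])
```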