Skip to content

Commit 1f0d677

Browse files
committed
Prevent maxNewRequests from blocking on async producers
Exact `requestsOverLimit` reporting for async iterables currently requires fully draining the producer after the budget is already exhausted. That turns a bounded enqueue operation into a blocking one when the producer stalls. This change keeps exact leftover reporting for materialized inputs (and for callers that explicitly opt into waiting), while letting the default async-iterable path return as soon as the budget is reached. Constraint: preserve exact leftover reporting for arrays and for callers that explicitly set `waitForAllRequestsToBeAdded`. Rejected: removing `requestsOverLimit` entirely for every input type — too broad, and it breaks useful existing behavior for materialized sources. Confidence: medium. Scope-risk: moderate. Reversibility: clean. Directive: if the `requestsOverLimit` contract changes again, keep async-producer liveness and exact leftover reporting as separate concerns. Tested: `node node_modules/vitest/vitest.mjs run test/core/crawlers/basic_crawler.test.ts -t 'addRequestsBatched with maxNewRequests should not wait for an async iterable beyond the remaining budget'`; `node node_modules/vitest/vitest.mjs run test/core/crawlers/basic_crawler.test.ts -t 'addRequestsBatched with maxNewRequests should correctly report requestsOverLimit for generator input'`; `node node_modules/vitest/vitest.mjs run test/core/crawlers/basic_crawler.test.ts -t 'addRequestsBatched with maxNewRequests should correctly report requestsOverLimit for array input'`. Not tested: the full test suite, and behavior for custom iterable implementations beyond the covered array/async-generator cases.
1 parent 1d4f6b9 commit 1f0d677

2 files changed

Lines changed: 54 additions & 8 deletions

File tree

packages/core/src/storages/request_provider.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,8 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
456456
}
457457

458458
const { batchSize = 1000, waitBetweenBatchesMillis = 1000, maxNewRequests } = options;
459+
const shouldCollectExactRequestsOverLimit =
460+
maxNewRequests !== undefined && (!isAsyncIterable(requests) || options.waitForAllRequestsToBeAdded);
459461

460462
let remainingBudget = maxNewRequests ?? Infinity;
461463
const requestsOverLimit: Source[] = [];
@@ -507,8 +509,9 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
507509
};
508510

509511
/**
510-
* Build the final result. When maxNewRequests is set, drains any remaining items
511-
* from the underlying request iterator into requestsOverLimit.
512+
* Build the final result. When maxNewRequests is set for a materialized source (or when the caller
513+
* explicitly asked to wait for all requests), drains any remaining items from the underlying request
514+
* iterator into requestsOverLimit.
512515
*
513516
* We accept the iterator explicitly (rather than closing over it) to make it obvious
514517
* that this is the *same* iterator that `chunkedAsyncIterable` has been consuming —
@@ -521,7 +524,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
521524
waitForAllRequestsToBeAdded: Promise<ProcessedRequest[]>,
522525
unconsumedIterator: AsyncGenerator<RequestOptions>,
523526
): Promise<AddRequestsBatchedResult> => {
524-
if (maxNewRequests !== undefined) {
527+
if (shouldCollectExactRequestsOverLimit) {
525528
for await (const request of unconsumedIterator) {
526529
requestsOverLimit.push(request);
527530
}
@@ -561,8 +564,8 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
561564
this.inProgressRequestBatchCount -= 1;
562565
});
563566

564-
// When maxNewRequests is set, we must wait for all batches so we can accurately report skipped requests.
565-
if (options.waitForAllRequestsToBeAdded || maxNewRequests !== undefined) {
567+
// When exact requestsOverLimit reporting is requested, we need to await the background work.
568+
if (options.waitForAllRequestsToBeAdded || shouldCollectExactRequestsOverLimit) {
566569
addedRequests.push(...(await promise));
567570
}
568571

@@ -1031,8 +1034,10 @@ export interface AddRequestsBatchedOptions extends RequestQueueOperationOptions
10311034
*
10321035
* This is useful in combination with `maxRequestsPerCrawl` to avoid duplicate URLs consuming the budget.
10331036
*
1034-
* **Note:** Setting this option implicitly enables {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`},
1035-
* since all batches must complete before leftover requests can be accurately reported.
1037+
* **Note:** For async iterables, exact `requestsOverLimit` reporting is only available when
1038+
* {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`}
1039+
* is set to `true`. Otherwise, Crawlee returns as soon as the budget is exhausted without draining
1040+
* the producer beyond that point.
10361041
*/
10371042
maxNewRequests?: number;
10381043
}
@@ -1061,6 +1066,10 @@ export interface AddRequestsBatchedResult {
10611066
* Requests from the input that were not added to the queue because the
10621067
* {@apilink AddRequestsBatchedOptions.maxNewRequests|`maxNewRequests`} budget was reached.
10631068
* Empty when `maxNewRequests` is not set.
1069+
*
1070+
* For async iterables, exact leftovers are only reported when
1071+
* {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`}
1072+
* is enabled.
10641073
*/
10651074
requestsOverLimit?: Source[];
10661075
}

test/core/crawlers/basic_crawler.test.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1947,7 +1947,10 @@ describe('BasicCrawler', () => {
19471947
yield { url: 'http://example.com/e' };
19481948
}
19491949

1950-
const result = await queue.addRequestsBatched(urls(), { maxNewRequests: 2 });
1950+
const result = await queue.addRequestsBatched(urls(), {
1951+
maxNewRequests: 2,
1952+
waitForAllRequestsToBeAdded: true,
1953+
});
19511954

19521955
const addedUrls = result.addedRequests.filter((r) => !r.wasAlreadyPresent).map((r) => r.uniqueKey);
19531956

@@ -1957,6 +1960,40 @@ describe('BasicCrawler', () => {
19571960
expect(overLimitUrls).toHaveLength(3);
19581961
});
19591962

1963+
test('addRequestsBatched with maxNewRequests should not wait for an async iterable beyond the remaining budget', async () => {
1964+
const queue = await RequestQueue.open();
1965+
let consumed = 0;
1966+
let releaseBlockedRequest = () => {};
1967+
const blockedRequest = new Promise<void>((resolve) => {
1968+
releaseBlockedRequest = resolve;
1969+
});
1970+
1971+
async function* urls() {
1972+
consumed += 1;
1973+
yield { url: 'http://example.com/a' };
1974+
1975+
consumed += 1;
1976+
await blockedRequest;
1977+
yield { url: 'http://example.com/b' };
1978+
}
1979+
1980+
const pendingResult = queue.addRequestsBatched(urls(), {
1981+
maxNewRequests: 1,
1982+
waitBetweenBatchesMillis: 0,
1983+
});
1984+
1985+
const raced = await Promise.race([
1986+
pendingResult.then(() => 'resolved'),
1987+
new Promise<'timeout'>((resolve) => setTimeout(() => resolve('timeout'), 100)),
1988+
]);
1989+
1990+
expect(raced).toBe('resolved');
1991+
expect(consumed).toBe(1);
1992+
1993+
releaseBlockedRequest();
1994+
await pendingResult;
1995+
});
1996+
19601997
test('should not count duplicate URLs toward maxRequestsPerCrawl limit (enqueueLinks)', async () => {
19611998
const requestQueue = await RequestQueue.open();
19621999

0 commit comments

Comments (0)