Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions packages/mosaic/core/docs/latest-only-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Latest-Only Stream Scheduling

## Problem
Interactive applications often generate high-frequency query streams (e.g., brushing, slider dragging). Sending every intermediate query to the server can cause:
1. **Backlog Latency:** The server queues up stale requests, delaying the "latest" relevant result.
2. **Wasted Work:** Computing results for chart states that the user has already passed.

## API
The `Coordinator.query()` method accepts two new options:

```typescript
coord.query(sql, {
stream: 'my-interaction-id', // Group requests by this ID
latest: true // Enable pruning of stale requests
});
```

## Semantics
When `latest: true` is set:
1. **Queue Pruning:** Any *pending* (queued but not yet sent) requests with the same `stream` ID are removed from the client-side queue.
2. **Stale Suppression:** If a request was already sent (inflight) and returns *after* a newer request was issued, its result is rejected with `"Stale"` to prevent UI jitter.

## Limitations
* **No Server Cancellation:** This feature does not interrupt queries already executing on the database. It only prevents new stale queries from being sent and suppresses stale results.
16 changes: 13 additions & 3 deletions packages/mosaic/core/src/Coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ export class Coordinator {
* @param options.cache If true, cache the query result client-side within the QueryManager.
* @param options.persist If true, request the database server to persist a cached query server-side.
* @param options.priority The query priority, defaults to `Priority.Normal`.
* @param options.stream Stream ID for grouping related requests (e.g., 'brush').
* @param options.latest If true, keeps only the most recent pending request for the stream ID. Older queued requests are pruned.
* @returns A query result promise.
*/
query(
Expand All @@ -175,6 +177,8 @@ export class Coordinator {
cache?: boolean;
persist?: boolean;
priority?: number;
stream?: string;
latest?: boolean;
[key: string]: unknown;
}
): QueryResult<Table>;
Expand All @@ -185,6 +189,8 @@ export class Coordinator {
cache?: boolean;
persist?: boolean;
priority?: number;
stream?: string;
latest?: boolean;
[key: string]: unknown;
}
): QueryResult<unknown>;
Expand All @@ -195,16 +201,20 @@ export class Coordinator {
cache?: boolean;
persist?: boolean;
priority?: number;
stream?: string;
latest?: boolean;
[key: string]: unknown;
} = {}
): QueryResult<any> {
const {
type = 'arrow',
cache = true,
priority = Priority.Normal,
stream,
latest,
...otherOptions
} = options;
return this.manager.request({ type, query, cache, options: otherOptions }, priority);
return this.manager.request({ type, query, cache, stream, latest, options: otherOptions }, priority);
}

/**
Expand All @@ -217,11 +227,11 @@ export class Coordinator {
*/
prefetch(
query: QueryType,
options?: { type?: 'arrow'; [key: string]: unknown }
options?: { type?: 'arrow';[key: string]: unknown }
): QueryResult<Table>
prefetch(
query: QueryType,
options?: { type?: 'json'; [key: string]: unknown }
options?: { type?: 'json';[key: string]: unknown }
): QueryResult<unknown>
prefetch(
query: QueryType,
Expand Down
30 changes: 30 additions & 0 deletions packages/mosaic/core/src/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class QueryManager {
public pendingResults: QueryResult[];
private maxConcurrentRequests: number;
private pendingExec: boolean;
private streamGen = new Map<string, number>();

constructor(maxConcurrentRequests: number = 32) {
this.queue = new PriorityQueue(3);
Expand Down Expand Up @@ -103,6 +104,14 @@ export class QueryManager {

const data = await promise;

if (request.stream && request.latest) {
if (request.streamGen !== this.streamGen.get(request.stream)) {
this._logger.debug('Suppressing stale result', { stream: request.stream, gen: request.streamGen });
result.reject('Stale');
return;
}
}

if (cache) this.clientCache!.set(sql!, data);

this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`);
Expand Down Expand Up @@ -179,6 +188,27 @@ export class QueryManager {
request(request: QueryRequest, priority: number = Priority.Normal): QueryResult {
const result = new QueryResult();
const entry = { request, result };

if (request.stream && request.latest) {
// Latest-Only Scheduling:
// 1. Increment generation counter for this stream.
// 2. Prune any older queued requests for the same stream.
// 3. Mark request with generationID so stale results can be suppressed.
// Note: Inflight requests on the server are NOT cancelled.
const gen = (this.streamGen.get(request.stream) ?? 0) + 1;
this.streamGen.set(request.stream, gen);
request.streamGen = gen;

this.queue.remove(entry => {
const req = entry.request;
if (req.stream === request.stream && req.latest) {
entry.result.reject('Stale');
return true;
}
return false;
});
}

if (this._consolidate) {
this._consolidate.add(entry, priority);
} else {
Expand Down
3 changes: 3 additions & 0 deletions packages/mosaic/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ export interface QueryRequest {
query: string | Query | DescribeQuery;
cache?: boolean;
options?: Record<string, unknown>;
stream?: string;
latest?: boolean;
streamGen?: number;
}

/** Type for an entry within a query manager. */
Expand Down
66 changes: 66 additions & 0 deletions packages/mosaic/core/test/querymanager-latest-only.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { describe, it, expect } from 'vitest';
import { QueryManager, Priority } from '../src/QueryManager.js';
import type { Connector } from '../src/connectors/Connector.js';

// Mock connector that counts how many times query() is actually invoked.
// Adds a small delay so queued requests build up and pruning can occur.
class CountingConnector implements Connector {
public calls: number = 0;

async query(req: any): Promise<any> {
this.calls += 1;
// Delay to ensure queue backlog exists (so pruning matters)
await new Promise(r => setTimeout(r, 50));

// Return something valid for json requests
if (req.type === 'json') return [{ ok: true }];
if (req.type === 'exec') return;
// Arrow is not needed here; if used, return empty
return [];
}
}

describe('QueryManager latest-only scheduling', () => {
it('prunes stale requests and executes only a small number of connector calls', async () => {
const connector = new CountingConnector();

// Concurrency = 1 to force a backlog
const qm = new QueryManager(1);
qm.connector(connector);

// Disable consolidation/caching to isolate latest-only behavior
qm.cache(false);
qm.consolidate(false);

const N = 30;

// Fire N requests quickly: same stream, latest-only.
// The correct implementation should reject stale ones and only fulfill the latest.
const reqs = Array.from({ length: N }, (_, i) => {
return qm.request(
{
type: 'json',
query: `SELECT ${i} AS qid`,
cache: false,
stream: 'ui',
latest: true
// streamGen: (only add if your implementation requires it; Step 13.1 will tell us)
},
Priority.Normal
);
});

const settled = await Promise.allSettled(reqs as any);

const fulfilled = settled.filter(x => x.status === 'fulfilled').length;
const rejected = settled.filter(x => x.status === 'rejected').length;

// Core correctness: only the newest should matter.
expect(fulfilled).toBe(1);
expect(rejected).toBe(N - 1);

// Efficiency: we must not hit the connector N times.
// Be strict but realistic: should be single-digit calls, not 30.
expect(connector.calls).toBeLessThan(10);
});
});