Skip to content

Commit afd35d5

Browse files
feat: add streamPreview API for live streaming output preview
## Problem `streamPreview` only handled `\n`-terminated lines. Programs like `curl` that use `\r` (carriage return) to update progress in-place were not displayed — the data stayed buffered and invisible. ## Changes - Handle `\r` in both completed lines and partial lines — keep content after last `\r` - Flush partial lines to display so `\r`-only updates render immediately - Include partial line content in flush output - Updated example to use `curl` and `ping` (real commands) - Added test: `handles carriage return for in-place updates` - Also includes sort-order fix (stacked on #33) Stacks on: pvtnbr/tasuku#33
1 parent d07755e commit afd35d5

10 files changed

Lines changed: 589 additions & 12 deletions

File tree

.github/media/stream-preview.gif

79.4 KB
Loading

.github/media/stream-preview.tape

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Output .github/media/stream-preview.gif
2+
Set Theme "Dracula"
3+
Set FontFamily "Menlo"
4+
Set FontSize 16
5+
Set Width 950
6+
Set Height 200
7+
Set Padding 20
8+
9+
Hide
10+
Type "node examples/stream-preview.js"
11+
Show
12+
Sleep 500ms
13+
Enter
14+
Sleep 8s

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ type TaskFunction = (taskInnerApi: {
257257
setOutput(output: string | { message: string }): void
258258
setWarning(warning: Error | string): void
259259
setError(error: Error | string): void
260+
streamPreview: Writable
260261
startTime(): void
261262
stopTime(): number
262263
}) => Promise<unknown>
@@ -281,6 +282,28 @@ Call with a string to set the output of the task.
281282
282283
<img src=".github/media/task-output.png">
283284
285+
#### streamPreview
286+
A `Writable` stream for displaying live output below the task. Pipe a child process or any readable stream into it to show a scrolling preview of the output.
287+
288+
Handles both `\n` (newline) and `\r` (carriage return) — programs like `curl` that use `\r` for in-place progress bars work out of the box.
289+
290+
```ts
291+
import { spawn } from 'node:child_process'
292+
import { pipeline } from 'node:stream/promises'
293+
294+
await task('Download', async ({ streamPreview }) => {
295+
const child = spawn('curl', ['-o', '/dev/null', 'https://example.com/file'])
296+
await pipeline(child.stderr, streamPreview)
297+
})
298+
```
299+
300+
<img src=".github/media/stream-preview.gif">
301+
302+
By default, shows the last 5 lines. Use the `previewLines` option to change this. When there are more lines than the limit, a `(+ N lines)` indicator is shown.
303+
304+
> [!NOTE]
305+
> `setOutput()` and `streamPreview` render independently. If both are used, static output appears above the stream preview.
306+
284307
#### setWarning()
285308
Call with a string or Error instance to put the task in a warning state.
286309

@@ -311,10 +334,17 @@ await task('Multi-phase', async ({ startTime, stopTime, setStatus }) => {
311334
```
312335

313336
#### options
314-
Type: `{ showTime?: boolean }`
337+
Type: `{ showTime?: boolean, previewLines?: number }`
315338

316339
Optional task options.
317340

341+
##### previewLines
342+
Type: `number`
343+
344+
Default: `5`
345+
346+
Maximum number of lines to display in the `streamPreview` output (minimum 1). When the stream produces more lines, older lines scroll off and a `(+ N lines)` indicator shows the total.
347+
318348
##### showTime
319349
When `true`, automatically starts the elapsed time counter when the task begins. Equivalent to calling `startTime()` at the start of the task function.
320350

examples/stream-preview.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { spawn } from 'node:child_process';
2+
import { pipeline } from 'node:stream/promises';
3+
import task from '#tasuku';
4+
5+
// Rate-limited to 2MB/s so progress is visible (~4 seconds for ~9MB file)
6+
await task('Download TypeScript', async ({ streamPreview }) => {
7+
const child = spawn('curl', [
8+
'--limit-rate',
9+
'2m',
10+
'-o',
11+
'/dev/null',
12+
'https://cdn.jsdelivr.net/npm/typescript@latest/lib/typescript.js',
13+
]);
14+
await pipeline(child.stderr, streamPreview);
15+
});

src/index.ts

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import { Writable } from 'node:stream';
12
import pMap from 'p-map';
3+
import stripAnsi from 'strip-ansi';
24
import { createRenderer, type Renderer } from './renderer.js';
35
import { reactive, setRenderCallback } from './reactive.js';
46
import {
@@ -16,7 +18,95 @@ import {
1618
runSymbol,
1719
} from './types.js';
1820

19-
const createTaskInnerApi = (taskState: TaskObject) => {
21+
const defaultPreviewLines = 5;
22+
23+
const createStreamPreview = (
24+
taskState: TaskObject,
25+
maxLines: number,
26+
): Writable => {
27+
const lines: string[] = [];
28+
let totalLines = 0;
29+
let partialLine = '';
30+
31+
// Resolve \r within a string: keep content after the last \r.
32+
// For trailing \r (nothing after), keep the last non-empty segment.
33+
const resolveCarriageReturn = (text: string) => {
34+
const segments = text.split('\r');
35+
return segments.reverse().find(Boolean) ?? '';
36+
};
37+
38+
const flush = () => {
39+
const displayPartial = partialLine.includes('\r')
40+
? resolveCarriageReturn(partialLine)
41+
: partialLine;
42+
const output = displayPartial
43+
? [...lines, displayPartial].join('\n')
44+
: lines.join('\n');
45+
taskState.streamOutput = output;
46+
taskState.streamTruncatedLines = Math.max(0, totalLines - maxLines);
47+
};
48+
49+
return new Writable({
50+
write(chunk: Buffer, _encoding, callback) {
51+
const text = stripAnsi(partialLine + chunk.toString());
52+
const parts = text.split(/\r?\n/);
53+
54+
// Last element is either empty (if chunk ended with \n) or a partial line
55+
partialLine = parts.pop()!;
56+
57+
for (const rawLine of parts) {
58+
// Handle \r (carriage return) — keep content after last \r
59+
const line = rawLine.includes('\r')
60+
? resolveCarriageReturn(rawLine)
61+
: rawLine;
62+
lines.push(line);
63+
totalLines += 1;
64+
if (lines.length > maxLines) {
65+
lines.shift();
66+
}
67+
}
68+
69+
// Trim accumulated \r segments to prevent unbounded growth
70+
if (partialLine.includes('\r')) {
71+
const resolved = resolveCarriageReturn(partialLine);
72+
// Keep trailing \r as boundary marker for next chunk
73+
partialLine = partialLine.endsWith('\r')
74+
? `${resolved}\r`
75+
: resolved;
76+
}
77+
78+
if (parts.length > 0 || partialLine) {
79+
flush();
80+
}
81+
82+
callback();
83+
},
84+
85+
final(callback) {
86+
// Flush any remaining partial line
87+
if (partialLine) {
88+
const line = partialLine.includes('\r')
89+
? resolveCarriageReturn(partialLine)
90+
: partialLine;
91+
lines.push(line);
92+
totalLines += 1;
93+
if (lines.length > maxLines) {
94+
lines.shift();
95+
}
96+
partialLine = '';
97+
flush();
98+
}
99+
callback();
100+
},
101+
});
102+
};
103+
104+
const createTaskInnerApi = (
105+
taskState: TaskObject,
106+
options?: TaskOptions,
107+
) => {
108+
let stream: Writable | undefined;
109+
20110
const api: TaskInnerAPI = {
21111
task: createTaskFunction(taskState.children),
22112
setTitle(title) {
@@ -36,6 +126,15 @@ const createTaskInnerApi = (taskState: TaskObject) => {
36126
)
37127
);
38128
},
129+
get streamPreview() {
130+
if (!stream) {
131+
stream = createStreamPreview(
132+
taskState,
133+
Math.max(1, Math.trunc(options?.previewLines ?? defaultPreviewLines)),
134+
);
135+
}
136+
return stream;
137+
},
39138
setWarning(warning) {
40139
taskState.state = 'warning';
41140

@@ -63,7 +162,10 @@ const createTaskInnerApi = (taskState: TaskObject) => {
63162
return taskState.elapsedMs;
64163
},
65164
};
66-
return api;
165+
return {
166+
api,
167+
destroyStream: () => stream?.destroy(),
168+
};
67169
};
68170

69171
let renderer: Renderer | undefined;
@@ -90,7 +192,7 @@ const registerTask = <T>(
90192
return {
91193
task,
92194
[runSymbol]: async () => {
93-
const api = createTaskInnerApi(task);
195+
const { api, destroyStream } = createTaskInnerApi(task, options);
94196

95197
task.state = 'loading';
96198

@@ -106,13 +208,15 @@ const registerTask = <T>(
106208
// Auto-stop timer on error
107209
api.stopTime();
108210
api.setError(error as Error);
211+
destroyStream();
109212
// Flush render before throwing to prevent overwriting subsequent output
110213
renderer?.flushRender();
111214
throw error;
112215
}
113216

114217
// Auto-stop timer on completion
115218
api.stopTime();
219+
destroyStream();
116220

117221
if (task.state === 'loading') {
118222
task.state = 'success';

src/renderer.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,35 @@ export const createRenderer = (
170170

171171
line += '\n';
172172

173-
if (task.output) {
174-
const outputIndent = `${indent} `;
175-
const styleText = (text: string) => (useColors
176-
? gray(text)
177-
: text);
173+
const outputIndent = `${indent} `;
174+
const styleText = (text: string) => (useColors
175+
? gray(text)
176+
: text);
178177

178+
// Static output: → prefix
179+
if (task.output) {
179180
line += `${task.output
180181
.split('\n')
181182
.map((outputLine, index) => `${outputIndent}${styleText(index === 0 ? `→ ${outputLine}` : outputLine)}`)
182183
.join('\n')}\n`;
183184
}
184185

186+
// Stream preview: ⎿ prefix with aligned continuation
187+
if (task.streamOutput) {
188+
const continuationIndent = `${outputIndent} `;
189+
line += `${task.streamOutput
190+
.split('\n')
191+
.map((outputLine, index) => (index === 0
192+
? `${outputIndent}${styleText(outputLine)}`
193+
: `${continuationIndent}${styleText(outputLine)}`))
194+
.join('\n')}\n`;
195+
196+
if (task.streamTruncatedLines) {
197+
const truncatedText = `(+ ${task.streamTruncatedLines} lines)`;
198+
line += `${continuationIndent}${styleText(truncatedText)}\n`;
199+
}
200+
}
201+
185202
// Render children recursively
186203
if (hasChildren) {
187204
line += renderTaskList(task.children, depth + 1);

src/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { Writable } from 'node:stream';
12
import type { Options as PMapOptions } from 'p-map';
23

34
type State = 'pending' | 'loading' | 'error' | 'warning' | 'success';
@@ -46,12 +47,15 @@ export type TaskObject = {
4647
children: TaskObject[];
4748
status?: string;
4849
output?: string;
50+
streamOutput?: string;
51+
streamTruncatedLines?: number;
4952
startedAt?: number;
5053
elapsedMs?: number;
5154
};
5255

5356
export type TaskOptions = {
5457
showTime?: boolean;
58+
previewLines?: number;
5559
};
5660

5761
export type TaskList = TaskObject[] & {
@@ -65,6 +69,7 @@ export type TaskInnerAPI = {
6569
setWarning(warning?: Error | string): void;
6670
setError(error?: Error | string): void;
6771
setOutput(output: string | { message: string }): void;
72+
streamPreview: Writable;
6873
startTime(): void;
6974
stopTime(): number;
7075
};

tests/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ await describe('tasuku', ({ runTestSuite }) => {
88
// Task behavior
99
runTestSuite(import('./specs/task-states.spec.js'));
1010
runTestSuite(import('./specs/elapsed-time.spec.js'));
11+
runTestSuite(import('./specs/stream-preview.spec.js'));
1112

1213
// Rendering
1314
runTestSuite(import('./specs/rendering.spec.js'));

tests/specs/max-visible.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ export default testSuite(({ describe }) => {
275275
);
276276
277277
console.log('TASKS_DONE');
278-
await setTimeout(200);
278+
await setTimeout(2000);
279279
`,
280280
}, { tempDir });
281281

@@ -322,7 +322,7 @@ export default testSuite(({ describe }) => {
322322
);
323323
324324
console.log('ALL_DONE');
325-
await setTimeout(200);
325+
await setTimeout(2000);
326326
`,
327327
}, { tempDir });
328328

@@ -379,7 +379,7 @@ export default testSuite(({ describe }) => {
379379
);
380380
381381
console.log('TASKS_DONE');
382-
await setTimeout(200);
382+
await setTimeout(2000);
383383
`,
384384
}, { tempDir });
385385

0 commit comments

Comments
 (0)