Advanced Streaming Patterns
Streaming is the foundation of responsive AI applications. In this lesson you will go beyond basic streaming to master the Web Streams API, implement backpressure handling, and integrate streaming seamlessly with CoFounder's useAgent hook.
Understanding the ReadableStream API
The Web Streams API gives you fine-grained control over how data flows from your LLM provider to the browser. CoFounder's core package exposes raw ReadableStream instances so you can compose custom pipelines. A ReadableStream consists of an underlying source that produces chunks and a controller that manages the queue.
import { createAgent } from '@waymakerai/aicofounder-core';
const agent = createAgent({
  model: 'gpt-4o',
  streaming: true,
});

// Get the raw ReadableStream from a completion
const stream = await agent.stream('Explain quantum computing');
const reader = stream.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const text = decoder.decode(value, { stream: true });
  process.stdout.write(text);
}

The key insight is that reader.read() returns a promise that resolves only when data is available. This natural backpressure means your consumer never gets overwhelmed: if your processing is slow, the stream simply pauses until you call read() again.
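The read loop above is common enough to wrap once. Here is a small, library-free helper (the streamChunks name is our own, not a CoFounder export) that turns any ReadableStream into an async iterable and releases its lock when the consumer exits the loop:

```javascript
// Wrap a ReadableStream in an async generator so consumers can use
// for await...of; the finally block releases the reader's lock even if
// the consumer breaks out of the loop early.
async function* streamChunks(stream) {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

// Usage with a toy in-memory stream
const demoStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello, ');
    controller.enqueue('world');
    controller.close();
  },
});

(async () => {
  const parts = [];
  for await (const chunk of streamChunks(demoStream)) parts.push(chunk);
  console.log(parts.join('')); // "Hello, world"
})();
```

Because the generator only calls read() when the consumer asks for the next value, the backpressure behavior described above is preserved.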
TransformStreams for Data Processing
TransformStreams sit between a ReadableStream and a WritableStream, letting you modify chunks in flight. This is ideal for parsing SSE data, extracting tool calls, or transforming tokens before they reach the UI.
// A TransformStream that extracts JSON tool calls from the stream
function createToolCallExtractor() {
  let buffer = '';
  // Reuse one decoder so multi-byte characters split across chunks decode correctly
  const decoder = new TextDecoder();
  return new TransformStream({
    transform(chunk, controller) {
      buffer += decoder.decode(chunk, { stream: true });
      // Emit every complete {{tool:...}} marker, preserving the order of
      // the surrounding text
      const toolCallRegex = /\{\{tool:(.*?)\}\}/g;
      let match;
      let lastIndex = 0;
      while ((match = toolCallRegex.exec(buffer)) !== null) {
        if (match.index > lastIndex) {
          controller.enqueue({ type: 'text', data: buffer.slice(lastIndex, match.index) });
        }
        controller.enqueue({
          type: 'tool_call',
          data: JSON.parse(match[1]),
        });
        lastIndex = toolCallRegex.lastIndex;
      }
      buffer = buffer.slice(lastIndex);
      // Pass through remaining text, holding back anything that may be the
      // start of a partial marker at the end of the buffer
      const partial = buffer.lastIndexOf('{{');
      const safeEnd = partial === -1 ? buffer.length : partial;
      if (safeEnd > 0) {
        controller.enqueue({ type: 'text', data: buffer.slice(0, safeEnd) });
        buffer = buffer.slice(safeEnd);
      }
    },
    flush(controller) {
      if (buffer.length > 0) {
        controller.enqueue({ type: 'text', data: buffer });
      }
    },
  });
}

Backpressure Handling
Backpressure occurs when the consumer cannot keep up with the producer. Without proper handling, memory grows unboundedly. The Streams API has built-in backpressure via the internal queue's high water mark, but you need to be aware of it when building custom pipelines.
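You can watch the built-in mechanism at work with nothing but the standard API. In this minimal sketch (no CoFounder involved), desiredSize reports how much room the internal queue has left, and pull() stops being called once it reaches zero:

```javascript
// Standard Web Streams backpressure: desiredSize falls as the internal queue
// fills, and pull() is no longer invoked once the high water mark is reached.
const pulls = [];
const stream = new ReadableStream(
  {
    pull(controller) {
      pulls.push(controller.desiredSize); // room left in the queue when pulled
      controller.enqueue('chunk');
    },
  },
  new CountQueuingStrategy({ highWaterMark: 2 }) // buffer at most 2 chunks ahead
);

async function observe() {
  await new Promise((r) => setTimeout(r, 0));
  console.log('pulls with no reader:', pulls); // [ 2, 1 ] — pulled until full, then paused
  await stream.getReader().read(); // consuming one chunk frees queue space
  await new Promise((r) => setTimeout(r, 0));
  console.log('pulls after one read:', pulls); // [ 2, 1, 1 ] — one more pull to refill
}
observe();
```

The same principle drives every pipeline in this lesson: the producer is only asked for data when there is room downstream.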
CoFounder provides a BufferedStream utility that lets you set explicit watermarks and get notified when the queue is filling up. This is critical when streaming to multiple clients simultaneously or when your UI rendering is expensive.
import { createAgent, BufferedStream } from '@waymakerai/aicofounder-core';
const agent = createAgent({ model: 'gpt-4o', streaming: true });
// Create a buffered stream with a high water mark of 10 chunks
const buffered = new BufferedStream({
  highWaterMark: 10,
  onPressure: (queueSize) => {
    console.warn(`Stream queue at ${queueSize} chunks`);
  },
});
const response = await agent.stream('Write a long essay');
const processed = response.pipeThrough(buffered);

// The useAgent hook handles this automatically,
// but for custom implementations you control the flow
const reader = processed.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  await renderToDOM(value); // slow operation; backpressure applies naturally
}

Streaming with the useAgent Hook
CoFounder's useAgent hook abstracts away the stream management while still giving you control over the rendering lifecycle. Under the hood it uses a ReadableStream with automatic cancellation when the component unmounts.
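Mechanically, that cancellation is just the standard ReadableStream contract, which you can observe without any framework. In this sketch the unmount is simulated, and the source's cancel() callback is where a real implementation would abort the upstream provider request:

```javascript
// Cancelling a reader invokes the underlying source's cancel() callback --
// the hook's teardown path. The aborted flag stands in for aborting an
// in-flight provider request.
let aborted = false;
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('partial token');
  },
  cancel(reason) {
    aborted = true; // e.g. abortController.abort() for the upstream fetch
    console.log('source cancelled:', reason);
  },
});

const reader = stream.getReader();
reader
  .read()
  .then(() => {
    // Component unmounted: stop consuming and tear down the source
    return reader.cancel('unmount');
  })
  .then(() => {
    console.log('aborted:', aborted); // true
  });
```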
The hook exposes onChunk, onComplete, and onError callbacks that let you process streaming data at each stage. The onChunk callback fires for each token, letting you implement typewriter effects, syntax highlighting, or progressive rendering.
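The hook itself is framework territory, but the accumulation logic an onChunk handler typically drives can be sketched without React. The createTypewriter name below is our own illustration, not a CoFounder API:

```javascript
// A framework-free typewriter effect driven by onChunk-style callbacks:
// tokens are queued as they stream in and revealed one character at a time,
// decoupling network speed from render speed.
function createTypewriter(render, intervalMs = 20) {
  let pending = ''; // characters received but not yet revealed
  let shown = '';
  let timer = null;

  function tick() {
    if (pending.length === 0) {
      clearInterval(timer);
      timer = null;
      return;
    }
    shown += pending[0];
    pending = pending.slice(1);
    render(shown);
  }

  return {
    // Wire this as the onChunk callback
    push(token) {
      pending += token;
      if (timer === null) timer = setInterval(tick, intervalMs);
    },
  };
}

// Usage: feed tokens as they arrive; render receives the growing text
const typewriter = createTypewriter((text) => console.log(text), 5);
typewriter.push('Hel');
typewriter.push('lo');
```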
Server-Sent Events Setup
For Next.js applications, Server-Sent Events provide a clean protocol for streaming from your API routes to the client. Unlike WebSockets, SSE works over standard HTTP, passes through proxies and CDNs without configuration, and automatically reconnects on failure.
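The wire format itself is simple: each event is one or more data: lines followed by a blank line. A minimal sketch of the framing (our own illustration, not a CoFounder API):

```javascript
// Minimal SSE framing: each event is one or more "data:" lines terminated
// by a blank line; multi-line payloads become multiple data: lines.
function sseFrame(payload) {
  return (
    payload
      .split('\n')
      .map((line) => `data: ${line}`)
      .join('\n') + '\n\n'
  );
}

// A TransformStream that frames streamed tokens as SSE events,
// ready to pipe into an HTTP response body
const sseEncoder = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(sseFrame(chunk));
  },
});

console.log(JSON.stringify(sseFrame('hello'))); // "data: hello\n\n"
```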
In the next lesson we will take a deep dive into the SSE protocol and build a production-grade SSE endpoint. For now, understand that CoFounder's streaming layer can target either a raw ReadableStream (for server components and edge functions) or an SSE endpoint (for traditional client-server architectures).