
Streaming

Streaming matters for user experience in AI applications. LLMs can take a while to finish generating output; streaming lets users start reading while generation is still in progress.

In cascaide, you can implement nodes that stream results back to the client. First, some context on the approach taken.

Dealing With Competing API Standards

There are many LLM API providers out there and no single unified response shape. The APIs are genuinely different and switching them out is not as straightforward as it should be. Even within the same provider, there are multiple APIs — chat completions vs the responses API in OpenAI, or interactions API vs generateContentStream in Gemini — that differ wildly. Clearly, provider alone is not a sufficient identifier.

The most common method of dealing with this is to pick a canonical form of Messages and normalize different API responses to it. This comes with the benefit that the shapes are predictable for use elsewhere. However, it comes with tradeoffs:

  • When APIs change, things break and developers are forced to patch around it or wait for maintainers.
  • The fix might be tiny, but difficult to spot as the normalization logic lives inside the framework.
  • Every time you call the API you transform in and out of the canonical form.
  • These transformations may be lossy, and the data loss is silent.

To avoid some of those drawbacks, cascaide itself does not know that any Message shape exists. No part of the packages expects a particular shape. A canonical shape then becomes a matter of convenience for the end developer rather than a framework requirement.

The goal is to contain the blast radius when APIs change.

  • When APIs change, it should not break cascaide
  • Only message assembly in streaming and transformation helper functions should be affected
  • Developers should be able to quickly write their own assembly functions and swap until we support the new API
  • Taking it one step further, developers should be able to fully override the stream assembly and create their own canonical Message shapes
  • Doing so should be straightforward and easy

Nothing that ships with the packages expects a particular shape. Only helpers shipped separately expect the canonical shape, and these are kept deliberately small, covering delegation, recursion, and agent creation.

How it works

When a node returns a StreamConfig from exec(), the framework passes it to handleProviderStream, which drives a loop over the raw provider stream. As chunks arrive, a mapper translates each raw chunk into one or more deltas — small, named pieces of data. Each delta carries an identity (the field it belongs to, e.g. "content", "thinking", "tool_calls") and a value.

Two parallel objects are assembled throughout the loop:

  • canonical — the full assembled message, always complete, never filtered. This is what gets persisted as LLM history.
  • uiMessage — only what passed through your filter, with any replacements applied. This is what the client actually sees.

Each delta is sent to the client as a { cascadeId, identity, value } chunk. On the client, streamChunkReceived assembles those chunks into the assistant message using the same blind-accumulation pattern: if the field is a string it appends, otherwise it replaces.

The pipeline in order:

    provider stream
      └─> mapper (raw chunk → ChunkDelta[])
            └─> canonical assembly (always)
                  └─> filter (optional)
                        └─> send to client
                              └─> client: streamChunkReceived → assistant message
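The server-side half of this pipeline can be sketched roughly as follows. This is a simplified illustration, not cascaide's actual handleProviderStream: it works on an already-mapped array of deltas instead of an async provider stream, and it ignores buffering, silent deltas, and cascadeId. The names accumulate and assemble are hypothetical.

```typescript
// Hypothetical sketch of canonical/uiMessage assembly; not the framework's code.
interface ChunkDelta {
  identity: string;
  value: any;
}

type StreamFilter = (identity: string, value: any) => any | false;
type Message = Record<string, any>;

// Default accumulation as described above: append to strings, otherwise replace.
function accumulate(target: Message, identity: string, value: any): void {
  if (typeof target[identity] === 'string') {
    target[identity] += value;
  } else {
    target[identity] = value;
  }
}

function assemble(
  deltas: ChunkDelta[],
  filter?: StreamFilter,
): { canonical: Message; uiMessage?: Message; sent: ChunkDelta[] } {
  const canonical: Message = {};
  const uiMessage: Message = {};
  const sent: ChunkDelta[] = []; // stands in for "send to client"

  for (const { identity, value } of deltas) {
    accumulate(canonical, identity, value); // always, never filtered

    const outgoing = filter ? filter(identity, value) : value;
    if (outgoing === false) continue; // suppressed by the filter

    accumulate(uiMessage, identity, outgoing);
    sent.push({ identity, value: outgoing });
  }

  return filter ? { canonical, uiMessage, sent } : { canonical, sent };
}

// Usage: thinking is kept in canonical but hidden from the client.
const result = assemble(
  [
    { identity: 'thinking', value: 'Let me check. ' },
    { identity: 'content', value: 'Hel' },
    { identity: 'content', value: 'lo' },
  ],
  (identity, value) => (identity === 'thinking' ? false : value),
);
```

Here result.canonical holds both thinking and content, while result.uiMessage and result.sent contain only the content deltas.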

Concepts

Identity

Every piece of streamed data has an identity — a string key that names what the value represents. The built-in mappers emit these identities:

| Identity   | Type                | Description                                                                   |
|------------|---------------------|-------------------------------------------------------------------------------|
| role       | string              | Always 'assistant'. Silent: never sent to the frontend.                       |
| content    | string              | The text response. Streamed as deltas and appended.                           |
| thinking   | string              | Extended thinking / reasoning. Streamed as deltas.                            |
| tool_calls | CanonicalToolCall[] | Tool calls. Buffered and sent once complete.                                  |
| extensions | object              | Provider-specific data (e.g. Gemini’s thoughtSignature). Silent and buffered. |

Your filter and your post() handler both work with these identity names.

Canonical vs uiMessage

canonical is always complete. It includes thinking, extensions, and everything else regardless of what your filter suppresses. It is what you pass to assistantMessage in post() and what the framework writes to history.

uiMessage only exists when you provide a filter. It contains only what was actually sent to the client, with any replacements your filter applied. Pass it to uiAssistantMessage in post() when your persistence layer needs to store a separate client-facing view.

Buffered identities

Some identities are marked buffer: true in the mapper. Buffered deltas accumulate silently during the stream and are flushed as a single send after the stream ends. tool_calls is always buffered because tool call data arrives piecemeal across many chunks and is only meaningful once complete.

Silent identities

Deltas marked silent: true are assembled into canonical but never sent to the frontend. role and extensions are always silent.
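To make the two flags concrete, here is a minimal sketch of how deltas might be routed. It is an illustration under assumptions, not the framework's implementation: routeDeltas is a hypothetical name, and buffered values are assumed to replace the previous value for their identity rather than being merged.

```typescript
// Hypothetical sketch of buffer/silent routing; canonical assembly is omitted.
interface ChunkDelta {
  identity: string;
  value: any;
  buffer?: boolean;
  silent?: boolean;
}

interface Outgoing {
  identity: string;
  value: any;
}

function routeDeltas(deltas: ChunkDelta[]): { live: Outgoing[]; flushed: Outgoing[] } {
  const live: Outgoing[] = []; // sent while the stream is running
  const buffered = new Map<string, any>(); // held until the stream ends

  for (const delta of deltas) {
    if (delta.silent) continue; // never sent, even if also buffered
    if (delta.buffer) {
      // Assumption: keep only the latest value per identity.
      buffered.set(delta.identity, delta.value);
      continue;
    }
    live.push({ identity: delta.identity, value: delta.value });
  }

  // One send per buffered identity after the stream ends.
  const flushed: Outgoing[] = [];
  buffered.forEach((value, identity) => flushed.push({ identity, value }));
  return { live, flushed };
}

const routed = routeDeltas([
  { identity: 'role', value: 'assistant', silent: true },
  { identity: 'content', value: 'Hi' },
  { identity: 'tool_calls', value: [{ name: 'search' }], buffer: true },
  { identity: 'tool_calls', value: [{ name: 'search' }, { name: 'fetch' }], buffer: true },
  { identity: 'extensions', value: { note: 'provider data' }, silent: true, buffer: true },
]);
```

In this run only content is sent live, tool_calls is flushed once with its final value, and role and extensions are never sent.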


Configuring streaming from a node

Your node’s exec() returns a StreamConfig. The framework takes it from there.

    async exec(prepOutput) {
      const stream = await anthropic.messages.stream({ /* ... */ });
      return {
        stream,
        provider: 'anthropic',
      };
    }

StreamConfig

    interface StreamConfig {
      stream: AsyncIterable<any>;
      provider: string;
      mapper?: ChunkMapper | (() => ChunkMapper);
      filter?: StreamFilter;
    }

stream — The raw async iterable returned by the provider SDK. Pass it directly; do not iterate it yourself.

provider — Selects the built-in mapper. Supported values: 'anthropic', 'openai', 'gemini-genai', 'openai-responses'.

mapper (optional) — A custom mapper that overrides the built-in one. Use this when you need to handle a provider variant or a non-standard API shape. See Writing a custom mapper.

filter (optional) — Controls what is sent to the frontend. See Filtering streamed output.


Built-in providers

'anthropic'

Handles Anthropic’s SSE streaming format (content_block_start, content_block_delta, content_block_stop, etc.).

Supports: content, thinking (extended thinking models), tool_calls.

Provider-specific data stored in canonical.extensions.anthropic: { signature } — the thinking block signature, required when replaying extended thinking history.

'openai'

Handles OpenAI Chat Completions streaming (choices[0].delta).

Supports: content, thinking (from reasoning_content on o3/o4-mini), tool_calls.

No extensions — OpenAI Chat Completions is the simplest shape and needs no extra data for replay.

'gemini-genai'

Handles Google GenAI’s generateContentStream format (candidates[0].content.parts).

Supports: content, thinking (thought parts), tool_calls (function call parts).

Provider-specific data stored in canonical.extensions.gemini: { parts, thoughtSignature } — the native parts array and thought signature, needed to reconstruct Gemini-native history for toProviderHistory().

'openai-responses'

Handles the OpenAI Responses API event format (response.output_item.added, response.output_text.delta, etc.).

Supports: content, thinking (from reasoning items), tool_calls (function call items).

Provider-specific data stored in canonical.extensions.openai_responses: { output } — the full output items array, needed for faithful Responses API history replay.


Filtering streamed output

The filter function runs on every delta before it is sent to the client. Use it to suppress, replace, or redact specific identities.

type StreamFilter = (identity: string, value: any) => any | false;
  • Return false → suppress this identity entirely. It will not appear in uiMessage.
  • Return value → pass through unchanged.
  • Return anything else → send the replacement instead. The replacement is what gets accumulated into uiMessage.

Examples

Suppress thinking from the frontend:

filter: (identity, value) => identity === 'thinking' ? false : value

Redact tool call arguments:

    filter: (identity, value) => {
      if (identity === 'tool_calls') {
        return value.map((tc: any) => ({ ...tc, args: { _redacted: true } }));
      }
      return value;
    }

Replace tool calls with a lightweight UI indicator:

    filter: (identity, value) => {
      if (identity === 'tool_calls') {
        return { type: 'tool_indicator', names: value.map((tc: any) => tc.name) };
      }
      return value;
    }
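Since StreamConfig takes a single filter, several rules like the ones above have to be combined into one function. One possible way to do that is a small composition helper. composeFilters is a hypothetical helper written for this example and is not part of cascaide.

```typescript
type StreamFilter = (identity: string, value: any) => any | false;

// Hypothetical helper: runs filters left to right, feeding each result into the next.
function composeFilters(...filters: StreamFilter[]): StreamFilter {
  return (identity, value) => {
    let current = value;
    for (const filter of filters) {
      current = filter(identity, current);
      if (current === false) return false; // suppressed: skip the remaining filters
    }
    return current;
  };
}

const hideThinking: StreamFilter = (identity, value) =>
  identity === 'thinking' ? false : value;

const redactArgs: StreamFilter = (identity, value) =>
  identity === 'tool_calls'
    ? value.map((tc: any) => ({ ...tc, args: { _redacted: true } }))
    : value;

const combinedFilter = composeFilters(hideThinking, redactArgs);

const redacted = combinedFilter('tool_calls', [{ name: 'search', args: { q: 'secret' } }]);
const hidden = combinedFilter('thinking', 'internal reasoning');
const passed = combinedFilter('content', 'Hello');
```

One consequence of the filter contract is that a literal false can never be sent as a value, because false always means "suppress".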

When a filter is applied, handleProviderStream returns both canonical and uiMessage. When no filter is set, only canonical is returned and the persistence middleware treats it as the UI source of truth as well.


Using the result in post()

handleProviderStream is called by the framework between exec() and post(). The result is passed into your post() as assistantMessage and optionally uiAssistantMessage.

    async post({ assistantMessage, uiAssistantMessage, history, cascadeId, userId }) {
      // assistantMessage is always the full canonical message
      // uiAssistantMessage is only present when you used a filter
      return {
        updates: {
          [cascadeId]: {
            history: [...history, assistantMessage],
            status: 'completed',
          },
        },
      };
    }

StreamResult

    interface StreamResult {
      canonical: Record<string, any>;
      uiMessage?: Record<string, any>;
    }

canonical — The fully assembled message. All identities, no filtering. Always present. Use this as the LLM history entry.

uiMessage — Only present when a filter was applied. Contains only what was sent to the client, with replacements applied.
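If your application keeps a separate client-facing transcript, the two results could be stored side by side. The sketch below is only an illustration: the uiHistory key and the buildUpdates helper are hypothetical and not part of cascaide. It falls back to the canonical message when no filter was used, mirroring the rule that canonical then doubles as the UI source of truth.

```typescript
type Message = Record<string, any>;

interface PostInput {
  assistantMessage: Message; // full canonical message
  uiAssistantMessage?: Message; // only present when a filter was used
  history: Message[];
  uiHistory: Message[]; // hypothetical: a client-facing transcript you maintain yourself
  cascadeId: string;
}

// Hypothetical helper that a post() implementation could delegate to.
function buildUpdates(input: PostInput) {
  const { assistantMessage, uiAssistantMessage, history, uiHistory, cascadeId } = input;
  return {
    updates: {
      [cascadeId]: {
        history: [...history, assistantMessage],
        uiHistory: [...uiHistory, uiAssistantMessage ?? assistantMessage],
        status: 'completed',
      },
    },
  };
}

const filteredRun = buildUpdates({
  assistantMessage: { role: 'assistant', content: 'Hi', thinking: 'internal' },
  uiAssistantMessage: { content: 'Hi' },
  history: [],
  uiHistory: [],
  cascadeId: 'c1',
});

const unfilteredRun = buildUpdates({
  assistantMessage: { role: 'assistant', content: 'Hi' },
  history: [],
  uiHistory: [],
  cascadeId: 'c2',
});
```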


Writing a custom mapper

A mapper is a function that takes a raw provider chunk and returns zero or more deltas. If no relevant data is in the chunk, return null.

type ChunkMapper = (chunk: any) => ChunkDelta | ChunkDelta[] | null;

Because mappers often need to track state across chunks (e.g. which tool call index is active), they are written as factory functions that return a stateful mapper:

    const myMapper = (): ChunkMapper => {
      let roleEmitted = false;

      return (chunk): ChunkDelta | ChunkDelta[] | null => {
        const deltas: ChunkDelta[] = [];

        if (!roleEmitted) {
          roleEmitted = true;
          deltas.push({ identity: 'role', value: 'assistant', silent: true });
        }

        if (chunk.output?.text) {
          deltas.push({ identity: 'content', value: chunk.output.text });
        }

        return deltas.length === 0 ? null : deltas;
      };
    };

Pass it in StreamConfig:

    return {
      stream,
      provider: 'my-custom-provider', // still required, but mapper overrides it
      mapper: myMapper,
    };
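Because a mapper is a plain function, it can be exercised against hand-written chunks before it is wired into a node. The sketch below repeats the mapper from this section so that it is self-contained. runMapper is a hypothetical test helper, and the chunk shape ({ output: { text } }) is the same made-up provider shape used in the mapper.

```typescript
interface ChunkDelta {
  identity: string;
  value: any;
  silent?: boolean;
}

type ChunkMapper = (chunk: any) => ChunkDelta | ChunkDelta[] | null;

// Same mapper as above, repeated so this example runs on its own.
const myMapper = (): ChunkMapper => {
  let roleEmitted = false;
  return (chunk) => {
    const deltas: ChunkDelta[] = [];
    if (!roleEmitted) {
      roleEmitted = true;
      deltas.push({ identity: 'role', value: 'assistant', silent: true });
    }
    if (chunk.output?.text) {
      deltas.push({ identity: 'content', value: chunk.output.text });
    }
    return deltas.length === 0 ? null : deltas;
  };
};

// Hypothetical helper: feeds chunks through a mapper and flattens the results.
function runMapper(mapper: ChunkMapper, chunks: any[]): ChunkDelta[] {
  const out: ChunkDelta[] = [];
  for (const chunk of chunks) {
    const result = mapper(chunk);
    if (result === null) continue; // chunk carried nothing relevant
    out.push(...(Array.isArray(result) ? result : [result]));
  }
  return out;
}

// Call the factory once per stream so each run starts with fresh state.
const mappedDeltas = runMapper(myMapper(), [
  { output: { text: 'Hel' } },
  {}, // e.g. a keep-alive chunk with no text
  { output: { text: 'lo' } },
]);
```

The run yields one silent role delta followed by two content deltas; the empty chunk maps to null and is skipped.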

ChunkDelta

    interface ChunkDelta {
      identity: string;
      value: any;
      accumulate?: (current: any, incoming: any) => any;
      buffer?: boolean;
      silent?: boolean;
    }

identity — The field name this delta belongs to (e.g. 'content', 'tool_calls').

value — The data for this chunk. For streaming text, this is the incremental string.

accumulate (optional) — Custom accumulation logic. If omitted, string values are concatenated and non-string values replace. Use this for complex assembly such as building a tool_calls array across many chunks.

buffer (optional) — If true, the delta is held and not sent until the stream ends. Use this for data that is only meaningful in its complete form (e.g. tool_calls, extensions).

silent (optional) — If true, the delta is assembled into canonical but never sent to the frontend. Use this for provider-specific metadata.
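As an illustration of accumulate, the sketch below merges tool call fragments that arrive across several chunks. The fragment shape (index, id, name, argsText) is hypothetical and does not correspond to CanonicalToolCall or to any specific provider. It also assumes that accumulate receives undefined as current for the first delta of an identity.

```typescript
interface ChunkDelta {
  identity: string;
  value: any;
  accumulate?: (current: any, incoming: any) => any;
  buffer?: boolean;
  silent?: boolean;
}

// Hypothetical shapes, for illustration only.
interface ToolCallFragment {
  index: number;
  id?: string;
  name?: string;
  argsText?: string; // partial JSON text for the arguments
}

interface PartialToolCall {
  id?: string;
  name?: string;
  argsText: string;
}

// Merges one fragment into the array assembled so far, without mutating it.
function mergeToolCalls(
  current: PartialToolCall[] | undefined,
  incoming: ToolCallFragment,
): PartialToolCall[] {
  const next = current ? [...current] : [];
  const previous = next[incoming.index] ?? { argsText: '' };
  next[incoming.index] = {
    id: incoming.id ?? previous.id,
    name: incoming.name ?? previous.name,
    argsText: previous.argsText + (incoming.argsText ?? ''),
  };
  return next;
}

function toolCallDelta(fragment: ToolCallFragment): ChunkDelta {
  return { identity: 'tool_calls', value: fragment, accumulate: mergeToolCalls, buffer: true };
}

// Usage: fold two fragments of the same call the way an assembly loop might.
const fragments: ToolCallFragment[] = [
  { index: 0, id: 'call_1', name: 'get_weather', argsText: '{"city":' },
  { index: 0, argsText: '"Paris"}' },
];

let assembled: PartialToolCall[] | undefined;
for (const fragment of fragments) {
  const delta = toolCallDelta(fragment);
  assembled = delta.accumulate ? delta.accumulate(assembled, delta.value) : assembled;
}
```

After both fragments are folded in, assembled holds a single tool call whose argsText is complete, parseable JSON.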


Client-side: how chunks are reassembled

On the client, the stream arrives as a sequence of parsed JSON lines. The Redux listener dispatches each line to one of a few handlers:

  • type: 'init' — initializes the assistant message placeholder in state with status: 'streaming'.
  • type: 'sync' — replaces context with the final history and sets status: 'completed'. This arrives after the stream ends.
  • Everything else — dispatched as streamChunkReceived.

streamChunkReceived

streamChunkReceived performs blind assembly directly on the last assistant message in state. It has no knowledge of providers or identities — it simply applies incoming values to whatever field name it receives:

    // If the field is already a string, append.
    if (typeof lastMessage[identity] === 'string') {
      lastMessage[identity] += value;
    } else {
      // Otherwise replace.
      lastMessage[identity] = value;
    }

This means the client-side shape of the assistant message during streaming matches the identity names the mapper emits: content, thinking, tool_calls, etc. Your UI reads directly from those fields.

Because the client assembles blindly by identity name, switching providers does not require any client-side changes — only the server-side mapper changes.
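The three handlers can be pictured as a single reducer-style function. The sketch below uses no Redux and is not cascaide's client code: CascadeState, handleLine, the history field on the sync line, and the placeholder message shape are all assumptions made for illustration.

```typescript
type Message = Record<string, any>;

interface CascadeState {
  messages: Message[];
  status: 'idle' | 'streaming' | 'completed';
}

// Hypothetical wire shapes for the parsed JSON lines.
type Line =
  | { type: 'init' }
  | { type: 'sync'; history: Message[] }
  | { cascadeId: string; identity: string; value: any };

function handleLine(state: CascadeState, line: Line): CascadeState {
  if ('type' in line) {
    if (line.type === 'init') {
      // Placeholder assistant message that later chunks will fill in.
      return { messages: [...state.messages, { role: 'assistant' }], status: 'streaming' };
    }
    // 'sync': the final history replaces whatever was assembled during streaming.
    return { messages: line.history, status: 'completed' };
  }

  // Everything else is a stream chunk: blind assembly on the last message.
  if (state.messages.length === 0) return state;
  const messages = [...state.messages];
  const last: Message = { ...messages[messages.length - 1] };
  if (typeof last[line.identity] === 'string') {
    last[line.identity] += line.value; // append to an existing string
  } else {
    last[line.identity] = line.value; // otherwise replace
  }
  messages[messages.length - 1] = last;
  return { ...state, messages };
}

const lines: Line[] = [
  { type: 'init' },
  { cascadeId: 'c1', identity: 'content', value: 'Hel' },
  { cascadeId: 'c1', identity: 'content', value: 'lo' },
  { cascadeId: 'c1', identity: 'tool_calls', value: [{ name: 'search' }] },
];

let clientState: CascadeState = { messages: [], status: 'idle' };
for (const line of lines) {
  clientState = handleLine(clientState, line);
}
```

Nothing in handleLine mentions a provider or a specific identity, which is why swapping the server-side mapper leaves the client untouched.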


Full example: a streaming LLM node

    import Anthropic from '@anthropic-ai/sdk';

    const anthropic = new Anthropic();

    const myLLMNode: ServerNodeDefinition = {
      name: 'myLLMNode',
      isUINode: false,
      env: 'server',
      isStreaming: true,

      async prep(context, initialContext) {
        const cascadeHistory = context[initialContext.cascadeId] ?? [];
        const latest = cascadeHistory[cascadeHistory.length - 1];
        return {
          history: latest?.history ?? [],
          cascadeId: initialContext.cascadeId,
        };
      },

      async exec(prepOutput) {
        const stream = anthropic.messages.stream({
          model: 'claude-opus-4-5',
          max_tokens: 8096,
          messages: prepOutput.history,
        });
        return {
          stream,
          provider: 'anthropic',
          // Suppress extended thinking from the client
          filter: (identity, value) => identity === 'thinking' ? false : value,
        };
      },

      async post({ assistantMessage, uiAssistantMessage, history, cascadeId }) {
        return {
          updates: {
            [cascadeId]: {
              history: [...history, assistantMessage],
              status: 'completed',
            },
          },
        };
      },
    };