Controller
The controller is the concurrency primitive that makes multi-agent coordination, recursion, and fan-out/fan-in trivial to implement in plain code. Every exec step gets one.
From within a node’s exec, the controller lets you treat the entire graph as a programmable surface. You can spawn agents and wait for them, read any cascade’s state, gate execution on arbitrary conditions, fork history into new branches, or write updates without yielding the current node.
The controller makes it possible to express complex agentic patterns in a few lines of plain code, rather than depending on a dedicated framework feature for each pattern.
A few scenarios follow; the API docs come after.
- Sub-agent delegation is trivial: controller.spawn returns a promise that resolves when the sub-agent cascade is complete. You can then read the sub-agent's state using controller.getCascadeState(subAgentCascadeId).
- Recursion is similarly trivial: same as above, but the spawned agent is the same agent. Implement depth guards using conventions on cascadeId (see conventions section).
- Fan Out / Fan In: spawn N parallel research or data-fetching nodes, waitUntil all are complete using isComplete on each cascadeId, then read all their states and synthesise in a single aggregator node. Classic MapReduce, but expressed in three controller calls.
- Cascade as Queue: a coordinator node reads a task list from state, spawns one worker per task, waits for all to complete, then writes a summary. The cascade state is the job queue and the result store simultaneously.
- Cross-cascade Signalling: one agent writes a flag into a known cascadeId using updateContext. Another agent in a completely separate cascade uses waitUntil to pause until that flag appears. Loosely coupled coordination without a message broker.
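The fan-out / fan-in scenario can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's implementation: ControllerLike mirrors only two method names from this page, while makeFakeController, the researcher node name, the topic and result keys, and the research-N cascade IDs are all hypothetical stand-ins so the snippet runs on its own. It relies on spawn resolving only after the spawned node completes, so no separate waitUntil call is needed here.

```typescript
// Hypothetical, trimmed-down view of the controller: only the calls this sketch uses.
type SpawnContext = { cascadeId?: string; history: any[]; [key: string]: any };
type CascadeState = { history: any[]; status: string; [key: string]: any };

interface ControllerLike {
  spawn(spawns: Record<string, SpawnContext>): Promise<void>;
  getCascadeState(cascadeId: string): CascadeState | undefined;
}

// In-memory stand-in (assumption for illustration): every spawned "node"
// immediately records a result in its cascade.
function makeFakeController(): ControllerLike {
  const cascades = new Map<string, CascadeState>();
  return {
    async spawn(spawns) {
      for (const [nodeName, ctx] of Object.entries(spawns)) {
        if (!ctx.cascadeId) continue; // ephemeral node: nothing is recorded
        cascades.set(ctx.cascadeId, {
          history: ctx.history,
          status: "done",
          result: `${nodeName} handled ${ctx.topic}`,
        });
      }
    },
    getCascadeState: (cascadeId) => cascades.get(cascadeId),
  };
}

async function fanOutFanIn(controller: ControllerLike, topics: string[]): Promise<string[]> {
  const cascadeIds = topics.map((_, i) => `research-${i}`); // hypothetical ID scheme
  // Fan out: one spawn call per topic, all in flight at once.
  await Promise.all(
    topics.map((topic, i) =>
      controller.spawn({ researcher: { cascadeId: cascadeIds[i], history: [], topic } }),
    ),
  );
  // Fan in: every spawn has resolved, so each cascade's state can be read.
  return cascadeIds.map((id) => String(controller.getCascadeState(id)?.result ?? "missing"));
}
```

In a real node, the aggregated array would typically be handed to a single synthesising step rather than returned directly.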
On the client side, we use useWorkflow and useCascade hooks to observe and control graph execution.
The controller is the server-side analogue of these hooks: the API surface is similar, but it is far more powerful.
In this section we will walk through the controller API surface.
Check out the Recursive ReAct Agent tutorial for a concrete use case.
Here is a quick inventory:
| Method | Arguments | Returns | Description |
|---|---|---|---|
| updateContext(updates) | updates: Updates | void | Write state updates without completing the current node |
| spawn(spawns) | spawns: Record<string, SpawnContext> | Promise<void> | Spawn one or more nodes and wait for them all to complete |
| fork(newCascadeId, upToFunctionId, sourceCascadeId) | newCascadeId: string, upToFunctionId: number, sourceCascadeId?: string | Promise<{ status: 'SUCCESS' \| 'FAILED' }> | Branch an existing cascade from a given checkpoint into a new independent cascade |
| waitUntil(predicate) | predicate: (state: ServerRootState) => boolean | Promise<void> | Suspend the current node until an arbitrary condition on the server state is met |
| getCascadeState(cascadeId) | cascadeId: string | CascadeStateResult \| undefined | Read the current state object for a cascade |
| getCascadeNodes(cascadeId) | cascadeId: string | CascadeNode[] | Get the list of currently active nodes in a cascade |
| isComplete(cascadeId) | cascadeId: string | boolean | Check if a cascade has finished (no active nodes + has recorded state) |
| exists(cascadeId) | cascadeId: string | boolean | Check if a cascade is known to the store at all |
| getState() | — | ServerRootState | Read the full store state |
This allows you to freely control and observe the graph from within a node.
API
updateContext(updates)
updateContext(updates: Updates): Promise<void>
Dispatches a context update to the store without completing the current node. Note that this update is not automatically synced to the client; it persists on the server side only.
Types:
type WorkflowStep = {
  history: any[];
  status: string;
  [extraKeys: string]: any; // this should give some freedom for custom keys
};
type Updates = {
  [cascadeId: string]: WorkflowStep;
};
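As a small illustration of the shape, the sketch below builds an Updates value for a single cascade. The buildUpdate helper, the cascade ID research-42, the waiting status string, and the notes key are made-up values for this example, not names the library defines.

```typescript
type WorkflowStep = {
  history: any[];
  status: string;
  [extraKeys: string]: any;
};
type Updates = { [cascadeId: string]: WorkflowStep };

// Build an update for one cascade; extra keys sit next to history and status.
function buildUpdate(cascadeId: string, note: string): Updates {
  return {
    [cascadeId]: {
      history: [],
      status: "waiting", // hypothetical status value
      notes: [note], // custom key, permitted by the index signature
    },
  };
}

const updates = buildUpdate("research-42", "seeded by parent");
```

The resulting object is what would be passed to controller.updateContext.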
Some use cases:
- Write an update into an existing but currently inactive cascade, to seed it for when it restarts or resumes. This is also an alternative way for a sub-agent to pass data back up to the parent before completion.
- Write an update to your own cascade state while waiting for some condition, so that other cascades can read it in the meantime.
Scratchpad Pattern
One interesting pattern is to spawn a dummy node to create a cascade space and then use it as a collaborative scratchpad/coordination layer for an agent swarm without poisoning individual contexts.
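A rough sketch of the scratchpad idea, under stated assumptions: ScratchpadController covers only updateContext and getCascadeState, the in-memory makeFakeController assumes that an update replaces the stored step for that cascade (the real merge behaviour is not specified here), and swarm-scratchpad, postNote, and the notes key are hypothetical names.

```typescript
type WorkflowStep = { history: any[]; status: string; [extraKeys: string]: any };
type Updates = { [cascadeId: string]: WorkflowStep };

// Hypothetical slice of the controller used by this sketch.
interface ScratchpadController {
  updateContext(updates: Updates): Promise<void>;
  getCascadeState(cascadeId: string): WorkflowStep | undefined;
}

// In-memory stand-in. Assumption: an update replaces the stored step for that cascade.
function makeFakeController(): ScratchpadController {
  const store: Updates = {};
  return {
    async updateContext(updates) {
      Object.assign(store, updates);
    },
    getCascadeState: (cascadeId) => store[cascadeId],
  };
}

const SCRATCHPAD_ID = "swarm-scratchpad"; // hypothetical shared cascade ID

// Append a note to the shared scratchpad without touching the caller's own history.
async function postNote(
  controller: ScratchpadController,
  agent: string,
  note: string,
): Promise<void> {
  const current: WorkflowStep =
    controller.getCascadeState(SCRATCHPAD_ID) ?? { history: [], status: "open" };
  // Read-modify-write: keep existing keys and extend the notes list.
  await controller.updateContext({
    [SCRATCHPAD_ID]: { ...current, notes: [...(current.notes ?? []), { agent, note }] },
  });
}
```

Because each agent writes only to the scratchpad cascade, its own history stays free of other agents' notes.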
spawn(spawns)
spawn(spawns: Spawns): Promise<void>
Spawns one or more nodes, optionally as part of named cascades, and waits for all of them to complete before resolving.
Each key in spawns is a node name; the value is a SpawnContext containing any data the node needs, including optional cascadeId and userId fields.
If no cascadeId values are provided, spawn resolves immediately after dispatching. Such a node is ephemeral and has no persistence; this is mostly useful for one-off nodes that run side effects.
Types:
type SpawnContext<T = Record<string, any>> = {
  cascadeId?: string;
  history: any;
  userId?: string;
} & T;
type Spawns = Record<string, SpawnContext>;
Example: spawning a single ephemeral node:
await controller.spawn({
  sendEmail: { userId, to: 'user@example.com', history: [] } // history is the one mandatory part
});
Example: spawning multiple nodes and awaiting all:
await controller.spawn({
  fetchUserProfile: { userId, cascadeId, history },
  fetchPermissions: { userId, cascadeId, history },
});
// both nodes have finished by here
fork(newCascadeId, upToFunctionId, sourceCascadeId)
fork(
  newCascadeId: string,
  upToFunctionId: number,
  sourceCascadeId?: string
): Promise<{ status: 'SUCCESS' | 'FAILED' }>
Creates a new cascade by forking an existing one up to a given function boundary. The new cascade is hydrated with the source cascade’s state up to upToFunctionId, making it an independent branch from that point forward.
Returns { status: 'SUCCESS' } on success, or { status: 'FAILED' } if the fork operation throws.
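Since fork reports failure through its return value rather than by throwing, callers need to check the status. The sketch below wraps that check in a helper. ForkingController reuses only the fork signature shown above; forkOrThrow, fakeController, and the cascade IDs are hypothetical names for illustration.

```typescript
type ForkResult = { status: "SUCCESS" | "FAILED" };

// Hypothetical slice of the controller: just fork, with the signature shown above.
interface ForkingController {
  fork(
    newCascadeId: string,
    upToFunctionId: number,
    sourceCascadeId?: string,
  ): Promise<ForkResult>;
}

// Fork a cascade and turn a FAILED status into an exception so callers cannot ignore it.
async function forkOrThrow(
  controller: ForkingController,
  newCascadeId: string,
  upToFunctionId: number,
  sourceCascadeId?: string,
): Promise<string> {
  const { status } = await controller.fork(newCascadeId, upToFunctionId, sourceCascadeId);
  if (status !== "SUCCESS") {
    throw new Error(`fork into ${newCascadeId} failed`);
  }
  return newCascadeId;
}

// Stand-in for illustration only: it fails when asked to fork from the made-up "missing" cascade.
const fakeController: ForkingController = {
  async fork(_newCascadeId, _upToFunctionId, sourceCascadeId) {
    const status: ForkResult["status"] = sourceCascadeId === "missing" ? "FAILED" : "SUCCESS";
    return { status };
  },
};
```

A caller might use it as `await forkOrThrow(controller, "branch-b", 3, "main")` and then spawn nodes into the new cascade.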
waitUntil(predicate)
waitUntil(predicate: (state: RootState) => boolean): Promise<void>
Suspends execution until the given predicate returns true against the current store state. The predicate is re-evaluated on every state change.
This lets you pause a node until some condition is met, for example until another cascade completes or a flag appears in its state.
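To make the re-evaluation behaviour concrete, here is a sketch of cross-cascade signalling against an in-memory stand-in. Everything except the method names is an assumption: makeFakeController, the signals cascade ID, and the ready flag are hypothetical, and the real store may notify waiters differently.

```typescript
type Step = { history: any[]; status: string; [key: string]: any };

// Hypothetical in-memory controller: predicates are re-checked after every update.
function makeFakeController() {
  const store: Record<string, Step> = {};
  const listeners = new Set<() => void>();
  return {
    getCascadeState: (cascadeId: string): Step | undefined => store[cascadeId],
    async updateContext(updates: Record<string, Step>): Promise<void> {
      Object.assign(store, updates);
      listeners.forEach((notify) => notify()); // state changed: re-evaluate waiters
    },
    waitUntil(predicate: (state: Record<string, Step>) => boolean): Promise<void> {
      return new Promise<void>((resolve) => {
        const check = () => {
          if (!predicate(store)) return;
          listeners.delete(check);
          resolve();
        };
        listeners.add(check);
        check(); // resolve right away if the condition already holds
      });
    },
  };
}

const SIGNAL_ID = "signals"; // hypothetical well-known cascade ID

async function demo(): Promise<string[]> {
  const controller = makeFakeController();
  const log: string[] = [];
  // Consumer: pauses until the producer raises the flag.
  const consumer = controller
    .waitUntil(() => controller.getCascadeState(SIGNAL_ID)?.ready === true)
    .then(() => {
      log.push("consumer resumed");
    });
  log.push("producer working");
  // Producer: writes the flag into the shared cascade.
  await controller.updateContext({ [SIGNAL_ID]: { history: [], status: "done", ready: true } });
  await consumer;
  return log;
}
```

In this stand-in the consumer resumes only after the producer's update, so demo() yields the log in the order "producer working", then "consumer resumed".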
Example: waiting for an external cascade to finish:
await controller.waitUntil(state =>
  controller.isComplete(someCascadeId)
);
getCascadeState(cascadeId)
getCascadeState(cascadeId: string): CascadeState | undefined
Returns the current state object for a cascade, or undefined if the cascade does not exist.
getCascadeNodes(cascadeId)
getCascadeNodes(cascadeId: string): ActiveNode[]
Returns the list of currently active nodes for a cascade.
isComplete(cascadeId)
isComplete(cascadeId: string): boolean
Returns true if the cascade has a recorded state and no active nodes remaining, that is, it has finished executing.
exists(cascadeId)
exists(cascadeId: string): boolean
Returns true if the cascade is known to the store, either because it has active nodes or because it has a recorded state. Returns false if the cascade has never been seen.
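Taken together, exists and isComplete distinguish three situations: a cascade the store has never seen, one that still has active nodes, and one that has finished. The sketch below folds the two checks into a single value. StatusController, cascadePhase, and the in-memory fake are hypothetical; the fake simply encodes the definitions given above.

```typescript
// Hypothetical slice of the controller: the two boolean checks described above.
interface StatusController {
  exists(cascadeId: string): boolean;
  isComplete(cascadeId: string): boolean;
}

type CascadePhase = "unknown" | "running" | "complete";

// Collapse the two checks into a single phase value.
function cascadePhase(controller: StatusController, cascadeId: string): CascadePhase {
  if (!controller.exists(cascadeId)) return "unknown"; // never seen by the store
  return controller.isComplete(cascadeId) ? "complete" : "running";
}

// In-memory stand-in built from the definitions above:
// exists = active nodes or recorded state; isComplete = recorded state and no active nodes.
function makeFakeController(
  cascades: Record<string, { activeNodes: number; hasState: boolean }>,
): StatusController {
  return {
    exists: (id) => {
      const c = cascades[id];
      return !!c && (c.activeNodes > 0 || c.hasState);
    },
    isComplete: (id) => {
      const c = cascades[id];
      return !!c && c.hasState && c.activeNodes === 0;
    },
  };
}
```

A coordinator could use such a helper to spawn a worker only when its cascade is still unknown, which keeps re-runs from duplicating work.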
getState()
getState(): RootState
Returns the full current store state. Use this for one-off reads when you need access to slices outside the cascade-specific selectors.