Class WorkflowBuilder

Fluent builder for deterministic DAG workflows.

Steps are appended in declaration order and connected sequentially. Branch and parallel primitives fan out and automatically rejoin at the next declared step.

Call .compile() to validate the graph (must be acyclic) and obtain a CompiledWorkflow ready for invoke(), stream(), or resume().

Constructors

Methods

  • Declare the input schema for this workflow.

    Accepts a Zod schema or any plain object. The value is forwarded to GraphCompiler, which lowers it to JSON Schema via lowerZodToJsonSchema.

    Parameters

    • schema: any

      Input schema (Zod instance or plain JSON Schema object).

    Returns this

  • Attach a transport backend to this workflow.

    When type is 'voice', the compiled workflow will route graph I/O through the voice transport adapter at runtime. The config values override per-field defaults from agent.config.json.

    The transport config is stored as _transportConfig on the builder instance and is available for inspection or forwarding to the runtime.

    Parameters

    • type: "voice"

      Transport kind; currently only 'voice' is supported.

    • Optional config: Omit<VoiceTransportConfig, "type">

      Optional voice pipeline overrides (STT, TTS, voice, etc.).

    Returns this

    this for fluent chaining.

    Example

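    // inputSchema and outputSchema are assumed to be defined elsewhere
    // (Zod instances or plain JSON Schema objects).
    const wf = workflow('voice-flow')
      .input(inputSchema)
      .returns(outputSchema)
      .transport('voice', { stt: 'deepgram', tts: 'openai', voice: 'alloy' })
      .step('listen', { tool: 'listen_tool' })
      .compile();
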
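
The per-field override described for transport() can be pictured as a shallow merge of the supplied config over a set of defaults. The sketch below is only an illustration under that assumption: `VoiceSettings`, `defaults`, and `resolveVoiceConfig` are hypothetical names, the default values are placeholders, and the real VoiceTransportConfig may contain other fields or merge differently.

```typescript
// Hypothetical subset of voice settings; not the actual VoiceTransportConfig.
type VoiceSettings = { stt: string; tts: string; voice: string };

// Placeholder stand-in for defaults that would come from agent.config.json.
const defaults: VoiceSettings = { stt: "whisper", tts: "openai", voice: "nova" };

// Fields present in `overrides` win; every other field keeps its default.
function resolveVoiceConfig(overrides: Partial<VoiceSettings> = {}): VoiceSettings {
  return { ...defaults, ...overrides };
}

const resolved = resolveVoiceConfig({ stt: "deepgram", voice: "alloy" });
console.log(resolved);
```

Here `stt` and `voice` take the supplied values while `tts` falls back to the placeholder default.
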
  • Append a single named step to the workflow.

    The step is connected from all current tail nodes and becomes the new single-element tail after it is added.

    Parameters

    • id: string

      Unique step identifier within this workflow.

    • config: StepConfig

      Execution and policy configuration for the step.

    Returns this
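
The tail-node rule can be sketched with a small self-contained model. This is not the WorkflowBuilder implementation: `MiniBuilder`, `Edge`, `fanOut`, and the synthetic `START` node are hypothetical names introduced only to show how a step connects from every current tail and then becomes the sole tail.

```typescript
// Hypothetical model of tail tracking; not the actual WorkflowBuilder internals.
type Edge = { from: string; to: string };

class MiniBuilder {
  readonly edges: Edge[] = [];
  // Nodes the next declared step connects from; starts at an assumed START node.
  private tails: string[] = ["START"];

  // Append one step: connect it from every current tail, then make it the only tail.
  step(id: string): this {
    for (const tail of this.tails) {
      this.edges.push({ from: tail, to: id });
    }
    this.tails = [id];
    return this;
  }

  // Fan out to several nodes (as a branch or parallel would): all of them become tails.
  fanOut(ids: string[]): this {
    for (const tail of this.tails) {
      for (const id of ids) {
        this.edges.push({ from: tail, to: id });
      }
    }
    this.tails = [...ids];
    return this;
  }

  currentTails(): string[] {
    return [...this.tails];
  }
}

const b = new MiniBuilder().step("fetch").fanOut(["a", "b"]).step("report");
console.log(b.edges);
console.log(b.currentTails());
```

In this model the final `step("report")` receives one edge from each of the two fan-out nodes, which is the automatic rejoin behaviour described in the class overview.
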

  • Append a conditional branch to the workflow.

    The condition function is evaluated at runtime against GraphState and must return one of the keys of routes. Each route becomes its own branch node; all branches become the collective tail that the next declared step connects from.

    Parameters

    • condition: ((state) => string)

      Routing function; return value must match a key in routes.

        • (state): string

          Parameters

          • state: any

          Returns string

    • routes: Record<string, StepConfig>

      Map of route key → step config for each branch arm.

    Returns this
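
A routing function and its route table might look like the following sketch. The `State` shape, the tool names, and the `resolveRoute` helper are hypothetical. How the runtime itself reacts to an unknown key is not documented here, so the explicit error is an assumption made for illustration.

```typescript
// Hypothetical state shape and route table, for illustration only.
type State = { score: number };

const routes: Record<string, { tool: string }> = {
  approve: { tool: "approve_tool" },
  review: { tool: "review_tool" },
};

// The routing function must return one of the keys of `routes`.
const condition = (state: State): string =>
  state.score >= 0.8 ? "approve" : "review";

// Resolve the branch arm and fail loudly when the key is not in `routes`.
function resolveRoute(state: State): { tool: string } {
  const key = condition(state);
  const arm = routes[key];
  if (arm === undefined) {
    throw new Error(`condition returned unknown route key: ${key}`);
  }
  return arm;
}

console.log(resolveRoute({ score: 0.9 }).tool);
```

Keeping the condition a pure function of state makes each arm easy to test in isolation before the workflow is compiled.
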

  • Append a parallel fan-out to the workflow.

    All steps execute concurrently (subject to runtime scheduling). After all branches complete, their outputs are merged using the join.merge reducers. The parallel branch nodes collectively become the new tail.

    Parameters

    • steps: StepConfig[]

      Array of step configs to execute concurrently.

    • join: {
          strategy: "all" | "any" | "quorum";
          quorumCount?: number;
          merge: Record<string, ReducerFn | BuiltinReducer>;
          timeout?: number;
      }

      Fan-in configuration including merge strategy and reducers.

      • strategy: "all" | "any" | "quorum"
      • Optional quorumCount?: number
      • merge: Record<string, ReducerFn | BuiltinReducer>
      • Optional timeout?: number

    Returns this
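
The fan-in merge can be illustrated with a small reducer loop. This is a sketch under assumptions: the `Reducer` signature and `mergeOutputs` helper are hypothetical, the real ReducerFn and BuiltinReducer types may differ, and join strategies and timeouts are not modelled.

```typescript
// Hypothetical reducer signature; the real ReducerFn type may differ.
type Reducer = (acc: unknown, next: unknown) => unknown;

// Merge branch outputs key by key, using the reducer registered for each key.
// Keys without a registered reducer are ignored in this sketch.
function mergeOutputs(
  outputs: Array<Record<string, unknown>>,
  merge: Record<string, Reducer>,
): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const output of outputs) {
    for (const [key, reducer] of Object.entries(merge)) {
      if (!(key in output)) continue;
      // The first value seeds the accumulator; later values are reduced into it.
      result[key] = key in result ? reducer(result[key], output[key]) : output[key];
    }
  }
  return result;
}

const merged = mergeOutputs(
  [{ hits: ["a"], cost: 2 }, { hits: ["b"], cost: 3 }],
  {
    hits: (acc, next) => [...(acc as string[]), ...(next as string[])],
    cost: (acc, next) => (acc as number) + (next as number),
  },
);
console.log(merged);
```

Here the two branch outputs are combined by concatenating `hits` and summing `cost`.
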

  • Compile the workflow into an executable CompiledWorkflow.

    Compilation steps:

    1. Validate that .input() and .returns() schemas were declared.
    2. Lower each InternalStep into GraphNode + GraphEdge IR objects, threading tailNodeIds to connect steps sequentially.
    3. Connect all final tail nodes to END.
    4. Run GraphCompiler.compile() to produce a CompiledExecutionGraph.
    5. Run GraphValidator.validate() with { requireAcyclic: true }, which throws if the graph contains a cycle.
    6. Wrap in a CompiledWorkflow with a GraphRuntime backed by the given store.

    Parameters

    • Optional options: {
          checkpointStore?: ICheckpointStore;
      }

      Optional compilation options.

      • Optional checkpointStore?: ICheckpointStore

        Custom checkpoint backend; defaults to InMemoryCheckpointStore.

    Returns CompiledWorkflow

    Throws

    When .input() or .returns() was not called.

    Throws

    When the compiled graph contains a cycle (should never happen via this API).
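
An acyclicity check of the kind performed during compilation can be sketched with Kahn's algorithm. This is not GraphValidator's code: the `Edge` type and `isAcyclic` function are hypothetical, and the sketch assumes every edge endpoint appears in `nodes`.

```typescript
type Edge = { from: string; to: string };

// Returns true when the directed graph has no cycle (Kahn's algorithm).
// Assumes every edge endpoint is listed in `nodes`.
function isAcyclic(nodes: string[], edges: Edge[]): boolean {
  const inDegree = new Map<string, number>();
  const adjacency = new Map<string, string[]>();
  for (const n of nodes) {
    inDegree.set(n, 0);
    adjacency.set(n, []);
  }
  for (const { from, to } of edges) {
    adjacency.get(from)?.push(to);
    inDegree.set(to, (inDegree.get(to) ?? 0) + 1);
  }

  // Repeatedly remove nodes with no remaining incoming edges.
  const queue = nodes.filter((n) => inDegree.get(n) === 0);
  let visited = 0;
  while (queue.length > 0) {
    const node = queue.shift() as string;
    visited++;
    for (const next of adjacency.get(node) ?? []) {
      const remaining = (inDegree.get(next) ?? 0) - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }
  // Nodes on a cycle never reach in-degree zero, so they are never visited.
  return visited === nodes.length;
}

const chain: Edge[] = [{ from: "a", to: "b" }, { from: "b", to: "c" }];
console.log(isAcyclic(["a", "b", "c"], chain));
console.log(isAcyclic(["a", "b", "c"], [...chain, { from: "c", to: "a" }]));
```

The first call covers a linear chain and the second adds a back edge from `c` to `a`, the kind of graph that would be rejected.
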