Add mid-turn message steering for running agent sessions #2363

Open
trungutt wants to merge 11 commits into docker:main from trungutt:steer-mid-turn-messages

Conversation

trungutt (Contributor) commented on Apr 9, 2026

Summary

Addresses #2223.

  • Add POST /sessions/:id/steer API endpoint that injects user messages into a running agent session mid-turn
  • Add POST /sessions/:id/followup API endpoint that queues messages for end-of-turn processing (one per turn)
  • Agent loop drains steered messages after tool execution and before the stop-condition check, so the LLM sees them on the next iteration
  • Follow-up messages are popped one at a time after the model stops and stop-hooks run, giving each a full undivided agent turn
  • Emits user_message SSE events when messages are picked up, so API clients can confirm delivery to the user
  • Guard RunSession against concurrent streams on the same session

This is a common pattern in agentic coding tools where the user can steer or provide follow-up context while the agent is executing tool calls. No behavioral change to the TUI — the existing client-side message queue continues to work as before. The TUI can adopt mid-turn steering in a follow-up PR.
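From an API client's point of view, the steer endpoint is a plain HTTP POST. The sketch below shows the shape of such a call; the JSON field name (`message`) is an assumption for illustration, as the actual request schema lives in the server handler.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// steerURL builds the path for the mid-turn steering endpoint.
func steerURL(baseURL, sessionID string) string {
	return baseURL + "/sessions/" + sessionID + "/steer"
}

// steer posts a message to a running session. The JSON field name
// ("message") is a guess for illustration; per the PR, the endpoint
// responds with 202 Accepted once the message is queued.
func steer(baseURL, sessionID, message string) (int, error) {
	body := bytes.NewBufferString(fmt.Sprintf(`{"message":%q}`, message))
	resp, err := http.Post(steerURL(baseURL, sessionID), "application/json", body)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	fmt.Println(steerURL("http://localhost:8080", "abc123"))
}
```

The follow-up endpoint would be called the same way with `/followup` in place of `/steer`.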

Current flow (before this PR)

Messages sent while the agent is busy are held client-side and only processed after the entire turn finishes. Each queued message starts a completely new turn.

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant Agent Loop

    User->>Client: "fix the bug"
    Client->>Agent Loop: POST /sessions/:id/agent (start stream)
    activate Agent Loop

    Note over Agent Loop: LLM call → tool calls → LLM call → ...

    User->>Client: "also update the tests"
    Note over Client: Message held in<br/>client-side queue

    User->>Client: "use pytest not unittest"
    Note over Client: Message held in<br/>client-side queue

    Agent Loop-->>Client: StreamStopped
    deactivate Agent Loop

    Client->>Agent Loop: POST /sessions/:id/agent ("also update the tests")
    activate Agent Loop
    Agent Loop-->>Client: StreamStopped
    deactivate Agent Loop

    Client->>Agent Loop: POST /sessions/:id/agent ("use pytest not unittest")
    activate Agent Loop
    Agent Loop-->>Client: StreamStopped
    deactivate Agent Loop
```

The problem

  • The agent finishes its work without knowing the user wanted tests updated or pytest used — it may have already written unittest-based tests
  • Each queued message starts a separate turn, so the agent does redundant work (e.g. rewrites tests it just wrote)
  • The user has no way to course-correct the agent while it's working

New flow (this PR)

Two distinct mechanisms for two distinct intents:

Steer — urgent mid-turn injection

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant Steer Queue
    participant Agent Loop

    User->>Client: "fix the bug"
    Client->>Agent Loop: POST /sessions/:id/agent (start stream)
    activate Agent Loop

    Note over Agent Loop: LLM call → tool calls executing...

    User->>Client: "use pytest not unittest"
    Client->>Steer Queue: POST /sessions/:id/steer
    Note over Steer Queue: Message enqueued

    Note over Agent Loop: Tool calls finish

    Steer Queue-->>Agent Loop: Drain all (mid-turn)
    Note over Agent Loop: Injects message in<br/>system-reminder tags,<br/>forces loop to continue

    Note over Agent Loop: LLM call (sees steered message) → ...

    Agent Loop-->>Client: StreamStopped
    deactivate Agent Loop
```

The agent sees "use pytest not unittest" before writing any tests — no wasted work.
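The drain-and-inject step can be sketched as below. The `<system-reminder>` wrapping and the "drained messages force the loop to continue" behavior come from the PR description; the function and field names are illustrative, not the actual implementation.

```go
package main

import "fmt"

// QueuedMessage stands in for the PR's queued-message type (fields assumed).
type QueuedMessage struct {
	Content string
}

// drainSteered mirrors the mid-turn drain described above: after tool calls
// finish, every pending steer message is taken at once and wrapped in
// system-reminder tags so the next LLM call sees them. If anything was
// drained, the loop must continue instead of stopping.
func drainSteered(pending []QueuedMessage) (injected []string, keepGoing bool) {
	for _, m := range pending {
		injected = append(injected, "<system-reminder>"+m.Content+"</system-reminder>")
	}
	return injected, len(injected) > 0
}

func main() {
	msgs, keepGoing := drainSteered([]QueuedMessage{{Content: "use pytest not unittest"}})
	fmt.Println(msgs[0])
	fmt.Println(keepGoing) // true: the loop runs another LLM call
}
```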

Follow-up — end-of-turn queue

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant FollowUp Queue
    participant Agent Loop

    User->>Client: "fix the bug"
    Client->>Agent Loop: POST /sessions/:id/agent (start stream)
    activate Agent Loop

    User->>Client: "then write a README"
    Client->>FollowUp Queue: POST /sessions/:id/followup
    Note over FollowUp Queue: Message enqueued

    User->>Client: "and add a changelog entry"
    Client->>FollowUp Queue: POST /sessions/:id/followup
    Note over FollowUp Queue: Message enqueued

    Note over Agent Loop: Model stops (bug fixed)

    FollowUp Queue-->>Agent Loop: Pop one ("write a README")
    Note over Agent Loop: New turn starts<br/>(plain user message)
    Note over Agent Loop: Model stops (README written)

    FollowUp Queue-->>Agent Loop: Pop one ("add a changelog entry")
    Note over Agent Loop: New turn starts
    Note over Agent Loop: Model stops (changelog added)

    Agent Loop-->>Client: StreamStopped
    deactivate Agent Loop
```

Each follow-up gets a full undivided turn — the agent finishes one task before starting the next.
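The one-at-a-time semantics can be sketched as a loop; here `runTurn` is a placeholder for a full agent turn (LLM calls, tool execution, stop-hooks), and the function name is illustrative rather than taken from the PR's code.

```go
package main

import "fmt"

// runTurns sketches the end-of-turn follow-up handling described above:
// after each turn finishes, exactly one queued follow-up is popped and
// becomes the plain user message that starts the next full turn.
func runTurns(initial string, followUps []string, runTurn func(msg string)) int {
	turns := 0
	msg := initial
	for {
		runTurn(msg) // stand-in for: LLM calls + tools until the model stops
		turns++
		if len(followUps) == 0 {
			return turns
		}
		msg, followUps = followUps[0], followUps[1:] // pop exactly one
	}
}

func main() {
	var order []string
	total := runTurns("fix the bug",
		[]string{"write a README", "add a changelog entry"},
		func(m string) { order = append(order, m) })
	fmt.Println(total, order)
}
```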

Steer vs Follow-up — two distinct scenarios

|  | Steer (`POST /steer`) | Follow-up (`POST /followup`) |
| --- | --- | --- |
| When | Injected at the next safe point mid-turn (after tool execution, before the next LLM call) | Processed after the current turn finishes completely |
| Drain semantics | Drain all pending messages every check | Pop exactly one message per turn boundary |
| Injection format | Wrapped in `<system-reminder>` tags (mid-stream context) | Plain user message (fresh turn) |
| Effect on loop | Forces `res.Stopped = false` — loop continues | `continue` — starts a new iteration of the loop |
| Use case | "The agent needs to see this NOW" — course corrections, additional context | "Run this next" — follow-up questions that can wait |

MessageQueue interface with Lock + Confirm/Cancel

The queue storage is behind a MessageQueue interface so callers can provide their own implementation (e.g. persistent/distributed store). The interface uses a Lock + Confirm/Cancel pattern for safe dequeue: Dequeue locks a message, Confirm permanently removes it after sess.AddMessage succeeds, and Cancel releases it back to the queue on failure. This prevents message loss in persistent implementations when the process crashes between dequeue and session persistence.

```go
type MessageQueue interface {
    Enqueue(ctx context.Context, msg QueuedMessage) bool
    Dequeue(ctx context.Context) (QueuedMessage, bool)  // locks the message
    Confirm(ctx context.Context) error                  // removes after successful processing
    Cancel(ctx context.Context) error                   // releases back to queue on failure
    Drain(ctx context.Context) []QueuedMessage          // lock + auto-confirm all
    Len(ctx context.Context) int
}
```

The default in-memory implementation (backed by a buffered channel) treats Confirm/Cancel as no-ops. Custom implementations can be injected via WithSteerQueue and WithFollowUpQueue options:

```go
rt, err := runtime.New(team,
    runtime.WithSteerQueue(myPersistentQueue),
    runtime.WithFollowUpQueue(myPersistentQueue),
)
```
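For reference, a channel-backed queue satisfying the interface might look like this sketch. The type and constructor names here are illustrative, not the package's actual unexported types; only the no-op Confirm/Cancel behavior is taken from the PR description.

```go
package main

import (
	"context"
	"fmt"
)

type QueuedMessage struct{ Content string }

// inMemoryQueue sketches a channel-backed implementation in the spirit of
// the default one: Confirm/Cancel are no-ops because Dequeue already
// consumes the message from the channel.
type inMemoryQueue struct{ ch chan QueuedMessage }

func newInMemoryQueue(size int) *inMemoryQueue {
	return &inMemoryQueue{ch: make(chan QueuedMessage, size)}
}

func (q *inMemoryQueue) Enqueue(_ context.Context, m QueuedMessage) bool {
	select {
	case q.ch <- m:
		return true
	default: // queue full
		return false
	}
}

func (q *inMemoryQueue) Dequeue(_ context.Context) (QueuedMessage, bool) {
	select {
	case m := <-q.ch:
		return m, true
	default:
		return QueuedMessage{}, false
	}
}

func (q *inMemoryQueue) Confirm(context.Context) error { return nil } // no-op
func (q *inMemoryQueue) Cancel(context.Context) error  { return nil } // no-op

func (q *inMemoryQueue) Drain(ctx context.Context) []QueuedMessage {
	var out []QueuedMessage
	for {
		m, ok := q.Dequeue(ctx)
		if !ok {
			return out
		}
		out = append(out, m)
	}
}

func (q *inMemoryQueue) Len(context.Context) int { return len(q.ch) }

// demo enqueues two messages and drains them all, mirroring the steer path.
func demo() int {
	ctx := context.Background()
	q := newInMemoryQueue(8)
	q.Enqueue(ctx, QueuedMessage{Content: "a"})
	q.Enqueue(ctx, QueuedMessage{Content: "b"})
	return len(q.Drain(ctx))
}

func main() {
	fmt.Println(demo()) // 2
}
```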

Addresses docker#2223.

Allow API clients to inject user messages into an active agent session
without waiting for the current turn to finish. This is a common pattern
in agentic coding tools where the user can steer or provide follow-up
context while the agent is executing tool calls.

New API endpoint: POST /sessions/:id/steer

Runtime changes:
- SteeredMessage type + buffered channel on LocalRuntime
- Steer() enqueues, DrainSteeredMessages() batch-drains
- Agent loop injects steered messages after tool execution and before
  the stop-condition check; emits user_message events so clients know
  when the LLM actually picks them up
- Messages wrapped in <system-reminder> tags for clear LLM attribution

Server changes:
- POST /sessions/:id/steer endpoint (202 Accepted)
- SteerSession() on SessionManager with GetLocalRuntime() helper for
  PersistentRuntime unwrapping
- Concurrent stream guard on RunSession (rejects if already streaming)
- Proper defer ordering: streaming flag cleared before channel close

No behavioral change to the TUI — the existing client-side message
queue continues to work as before. The TUI can adopt mid-turn steering
in a future change by calling LocalRuntime.Steer() directly.
@trungutt force-pushed the steer-mid-turn-messages branch from 2bb7953 to e6f7898 on April 9, 2026 at 15:35
Introduce a SteerQueue interface (Enqueue/Drain) so that callers can
provide their own storage implementation for steered messages. The
default InMemorySteerQueue uses a buffered channel and is created
automatically. Custom implementations can be injected via the
WithSteerQueue option on LocalRuntime.
@trungutt force-pushed the steer-mid-turn-messages branch from e6f7898 to 717c63a on April 9, 2026 at 15:36
@trungutt marked this pull request as ready for review on April 9, 2026 at 20:31
@trungutt requested a review from a team as a code owner on April 9, 2026 at 20:31
Add SteerSession to the RemoteClient interface and implement it on the
HTTP Client (POST /sessions/:id/steer). RemoteRuntime.Steer() delegates
to the remote server so the TUI works identically regardless of whether
the runtime is local or remote.

App.Steer() now tries GetLocalRuntime first, then falls back to the
Steerer interface so both PersistentRuntime and RemoteRuntime are
handled.
@trungutt requested review from a team and rumpl on April 10, 2026 at 06:12
Implement the two-queue design proposed by rumpl: steering (urgent mid-
turn injection) and follow-up (end-of-turn, one-at-a-time processing)
are fundamentally different intents that need separate queues.

- Rename SteeredMessage → QueuedMessage (shared by both queues)
- Replace SteerQueue with MessageQueue interface: adds context.Context
  to all methods, adds Dequeue (pop one) and Len
- Add followUpQueue to LocalRuntime with WithFollowUpQueue option
- Split agent loop: steer drains ALL mid-turn, follow-up pops ONE
  after stop-hooks, then continues the loop for a new turn
- Follow-up messages are plain user messages (no system-reminder wrap)
- Add POST /sessions/:id/followup endpoint
- Add FollowUpSession to RemoteClient, Client, RemoteRuntime, and App
GetLocalRuntime is no longer needed: PersistentRuntime inherits Steer()
and FollowUp() from embedded *LocalRuntime, and RemoteRuntime implements
them directly. All call sites now use the MessageInjector interface for
dispatch, which is cleaner and doesn't require knowledge of concrete
runtime wrapper types.
Add Confirm and Cancel methods to the MessageQueue interface so that
persistent queue implementations can use a transactional dequeue pattern:
Dequeue locks a message (in-flight), Confirm permanently removes it
after sess.AddMessage succeeds, Cancel releases it back to the queue
on failure.

This prevents message loss when the process crashes or the context is
cancelled between dequeue and session persistence. The in-memory
implementation treats Confirm/Cancel as no-ops since the message is
already consumed from the channel on Dequeue.

The agent loop now calls Confirm after successfully adding a follow-up
message to the session. Drain (used for steer messages) auto-confirms
all messages in a batch.
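The transactional calling pattern this enables for persistent implementations can be sketched as follows. The interface is trimmed to the relevant methods, and the fake queue exists only for the demo; the real agent loop no longer calls Confirm/Cancel itself, as a later commit explains.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type QueuedMessage struct{ Content string }

// MessageQueue is trimmed to the methods the transactional pattern uses.
type MessageQueue interface {
	Dequeue(ctx context.Context) (QueuedMessage, bool)
	Confirm(ctx context.Context) error
	Cancel(ctx context.Context) error
}

// processOne sketches the pattern: the dequeued message stays locked
// (in-flight) until Confirm; a processing error releases it with Cancel
// so it can be re-delivered instead of being lost.
func processOne(ctx context.Context, q MessageQueue, handle func(QueuedMessage) error) error {
	msg, ok := q.Dequeue(ctx)
	if !ok {
		return nil // nothing queued
	}
	if err := handle(msg); err != nil {
		_ = q.Cancel(ctx) // release the lock; message goes back to the queue
		return err
	}
	return q.Confirm(ctx) // permanently remove the message
}

// fakeQueue is a demo-only implementation that records which path ran.
type fakeQueue struct {
	msgs []QueuedMessage
	last string
}

func (f *fakeQueue) Dequeue(context.Context) (QueuedMessage, bool) {
	if len(f.msgs) == 0 {
		return QueuedMessage{}, false
	}
	return f.msgs[0], true
}

func (f *fakeQueue) Confirm(context.Context) error {
	f.msgs = f.msgs[1:]
	f.last = "confirm"
	return nil
}

func (f *fakeQueue) Cancel(context.Context) error {
	f.last = "cancel"
	return nil
}

// demo runs one failing and one successful handler over the same message.
func demo() (afterFail, afterOK string) {
	q := &fakeQueue{msgs: []QueuedMessage{{Content: "hi"}}}
	_ = processOne(context.Background(), q, func(QueuedMessage) error { return errors.New("boom") })
	afterFail = q.last // message is still in the queue
	_ = processOne(context.Background(), q, func(QueuedMessage) error { return nil })
	afterOK = q.last // message has been removed
	return
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // cancel confirm
}
```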
…turn error

- Move QueuedMessage, MessageQueue interface, and inMemoryMessageQueue
  to dedicated message_queue.go file
- Add Steer() and FollowUp() to the Runtime interface — all runtimes
  implement them, no need for a separate MessageInjector interface
- Return error instead of bool from Steer/FollowUp for richer failure
  information (queue full, no active session, network errors)
- Simplify App and SessionManager: call runtime.Steer/FollowUp directly
  without type assertions
As rumpl pointed out, calling Confirm after sess.AddMessage does not
protect against anything: AddMessage is an in-memory operation, not a
store write. If the process dies between Dequeue and AddMessage, the
in-memory session is lost regardless.

The Confirm/Cancel methods remain on the MessageQueue interface for
implementations that integrate with their own persistence layer, but
the agent loop does not call them since it has no durable persistence
point between dequeue and the next LLM call.
@trungutt requested review from a team and rumpl on April 10, 2026 at 13:46
Cleaner and consistent with the follow-up case: explicitly re-enter the
loop rather than relying on the reader understanding that setting
res.Stopped = false will make the subsequent check pass through.
…queueFollowUp

Slim down the MessageQueue interface to Enqueue, Dequeue, and Drain —
the three methods actually called. Remove wrapper methods on
LocalRuntime (DrainSteeredMessages, DequeueFollowUp) and call the
queues directly from loop.go.