Skip to content

Commit 0176f95

Browse files
fix(#2457): retry MCP toolsets after tool calls within the same turn
When an MCP server is configured but unavailable at session start, the agent now retries automatically after every tool-call batch — within the same user turn — without requiring a new user message. ## Core changes ### MCP double-watcher race (pkg/tools/mcp/mcp.go) Added watcherAlive bool to Toolset. Toolset.Start() only spawns go watchConnection(...) when !watcherAlive. The goroutine clears the flag on all exit paths via defer. This prevents reprobe() from spawning a second watcher while an existing one is mid-backoff (ts.started==false but goroutine alive), which would cause racing doStart() calls and unsafe close/recreate of ts.restarted. ### Failure deduplication + recovery notices (pkg/tools/startable.go) ShouldReportFailure() returns true exactly once per failure streak, suppressing repeated 'start failed' warnings on every retry. ConsumeRecovery() returns true exactly once when a previously-failed toolset successfully starts, triggering a 'now available' warning. Both surface via WarningEvent -> notification.WarningCmd() (persistent TUI notifications that stay until dismissed). ### Reprobe after each tool-call batch (pkg/runtime/loop.go) reprobe() is called after every tool-call batch. It re-runs ensureToolSetsAreStarted() without emitting MCPInitStarted/Finished events (no TUI spinner flicker), emits any pending warnings, and emits a ToolsetInfo event when new tools appear. The updated tool list is picked up by the top-of-loop getTools() on the next iteration, so the model sees new tools in its very next response within the same user turn. ### TUI (pkg/agent/agent.go, pkg/runtime/loop.go, pkg/runtime/event.go) DrainWarnings() now includes both failure and recovery messages. WarningEvent used for all toolset lifecycle notifications. ## Tests - pkg/tools/startable_test.go: ShouldReportFailure/ConsumeRecovery behaviour (one warning per streak, recovery fires once, Stop resets) - pkg/agent/agent_test.go: TestAgentReProbeEmitsWarningThenNotice, TestAgentNoDuplicateStartWarnings - pkg/runtime/runtime_test.go: TestReprobe_NewToolsAvailableAfterToolCall, TestReprobe_NoChangeMeansNoExtraEvents Fixes #2457 Assisted-By: docker-agent
1 parent 15945f4 commit 0176f95

7 files changed

Lines changed: 596 additions & 31 deletions

File tree

pkg/agent/agent.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,24 @@ func (a *Agent) ToolSets() []tools.ToolSet {
260260
func (a *Agent) ensureToolSetsAreStarted(ctx context.Context) {
261261
for _, toolSet := range a.toolsets {
262262
if err := toolSet.Start(ctx); err != nil {
263-
desc := tools.DescribeToolSet(toolSet)
264-
slog.Warn("Toolset start failed; skipping", "agent", a.Name(), "toolset", desc, "error", err)
265-
a.addToolWarning(fmt.Sprintf("%s start failed: %v", desc, err))
263+
// Only warn on the first failure in a streak; suppress duplicate
264+
// warnings for subsequent retries that also fail.
265+
if toolSet.ShouldReportFailure() {
266+
desc := tools.DescribeToolSet(toolSet)
267+
slog.Warn("Toolset start failed; will retry on next turn", "agent", a.Name(), "toolset", desc, "error", err)
268+
a.addToolWarning(fmt.Sprintf("%s start failed: %v", desc, err))
269+
} else {
270+
desc := tools.DescribeToolSet(toolSet)
271+
slog.Debug("Toolset still unavailable; retrying next turn", "agent", a.Name(), "toolset", desc, "error", err)
272+
}
266273
continue
267274
}
275+
// Emit a one-time notice when a previously-failed toolset recovers.
276+
if toolSet.ConsumeRecovery() {
277+
desc := tools.DescribeToolSet(toolSet)
278+
slog.Info("Toolset now available", "agent", a.Name(), "toolset", desc)
279+
a.addToolWarning(desc + " is now available")
280+
}
268281
}
269282
}
270283

pkg/agent/agent_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,34 @@ func (s *stubToolSet) Tools(context.Context) ([]tools.Tool, error) {
4444
return s.tools, nil
4545
}
4646

47+
// flappyToolSet is a ToolSet+Startable that returns a scripted sequence of
48+
// errors from Start(). nil in the sequence means success.
49+
type flappyToolSet struct {
50+
errs []error
51+
callIdx int
52+
stubs []tools.Tool
53+
}
54+
55+
var (
56+
_ tools.ToolSet = (*flappyToolSet)(nil)
57+
_ tools.Startable = (*flappyToolSet)(nil)
58+
)
59+
60+
func (f *flappyToolSet) Start(_ context.Context) error {
61+
if f.callIdx >= len(f.errs) {
62+
return nil
63+
}
64+
err := f.errs[f.callIdx]
65+
f.callIdx++
66+
return err
67+
}
68+
69+
func (f *flappyToolSet) Stop(_ context.Context) error { return nil }
70+
71+
func (f *flappyToolSet) Tools(_ context.Context) ([]tools.Tool, error) {
72+
return f.stubs, nil
73+
}
74+
4775
func TestAgentTools(t *testing.T) {
4876
tests := []struct {
4977
name string
@@ -210,3 +238,61 @@ func TestModelOverride_ConcurrentAccess(t *testing.T) {
210238
<-done
211239
// If we got here without a race condition panic, the test passes
212240
}
241+
242+
// TestAgentReProbeEmitsWarningThenNotice verifies the full retry lifecycle:
243+
// turn 1 fails → warning emitted; turn 2 succeeds → notice emitted; tools available.
244+
func TestAgentReProbeEmitsWarningThenNotice(t *testing.T) {
245+
t.Parallel()
246+
247+
errBoom := errors.New("server unavailable")
248+
stub := &flappyToolSet{
249+
errs: []error{errBoom, nil},
250+
stubs: []tools.Tool{{Name: "mcp_ping", Parameters: map[string]any{}}},
251+
}
252+
a := New("root", "test", WithToolSets(stub))
253+
254+
// Turn 1: start fails → 1 warning, 0 tools.
255+
got, err := a.Tools(t.Context())
256+
require.NoError(t, err)
257+
assert.Empty(t, got, "turn 1: no tools while toolset is unavailable")
258+
warnings := a.DrainWarnings()
259+
require.Len(t, warnings, 1, "turn 1: exactly one warning expected")
260+
assert.Contains(t, warnings[0], "start failed")
261+
262+
// Turn 2: start succeeds → 1 recovery warning, tools available.
263+
got, err = a.Tools(t.Context())
264+
require.NoError(t, err)
265+
assert.Len(t, got, 1, "turn 2: tool should be available after recovery")
266+
recovery := a.DrainWarnings()
267+
require.Len(t, recovery, 1, "turn 2: exactly one recovery warning expected")
268+
assert.Contains(t, recovery[0], "now available", "turn 2: recovery warning must mention availability")
269+
}
270+
271+
// TestAgentNoDuplicateStartWarnings verifies that repeated failures generate
272+
// only one warning (on the first failure), not one per retry.
273+
func TestAgentNoDuplicateStartWarnings(t *testing.T) {
274+
t.Parallel()
275+
276+
errBoom := errors.New("server unavailable")
277+
stub := &flappyToolSet{
278+
errs: []error{errBoom, errBoom, errBoom},
279+
stubs: []tools.Tool{{Name: "mcp_ping", Parameters: map[string]any{}}},
280+
}
281+
a := New("root", "test", WithToolSets(stub))
282+
283+
// Turn 1: first failure → warning.
284+
_, err := a.Tools(t.Context())
285+
require.NoError(t, err)
286+
warnings := a.DrainWarnings()
287+
require.Len(t, warnings, 1, "turn 1: exactly one warning on first failure")
288+
289+
// Turn 2: repeated failure → no new warning.
290+
_, err = a.Tools(t.Context())
291+
require.NoError(t, err)
292+
assert.Empty(t, a.DrainWarnings(), "turn 2: no duplicate warning on repeated failure")
293+
294+
// Turn 3: still failing → still no new warning.
295+
_, err = a.Tools(t.Context())
296+
require.NoError(t, err)
297+
assert.Empty(t, a.DrainWarnings(), "turn 3: no duplicate warning on repeated failure")
298+
}

pkg/runtime/loop.go

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
100100
r.emitAgentWarnings(a, chanSend(events))
101101
r.configureToolsetHandlers(a, events)
102102

103-
agentTools, err := r.getTools(ctx, a, sessionSpan, events)
103+
agentTools, err := r.getTools(ctx, a, sessionSpan, events, true)
104104
if err != nil {
105105
events <- Error(fmt.Sprintf("failed to get tools: %v", err))
106106
return
@@ -163,7 +163,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
163163
r.emitAgentWarnings(a, chanSend(events))
164164
r.configureToolsetHandlers(a, events)
165165

166-
agentTools, err := r.getTools(ctx, a, sessionSpan, events)
166+
agentTools, err := r.getTools(ctx, a, sessionSpan, events, true)
167167
if err != nil {
168168
events <- Error(fmt.Sprintf("failed to get tools: %v", err))
169169
return
@@ -382,6 +382,20 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
382382

383383
r.processToolCalls(ctx, sess, res.Calls, agentTools, events)
384384

385+
// Re-probe toolsets after tool calls: an install/setup tool call may
386+
// have made a previously-unavailable LSP or MCP connectable. reprobe()
387+
// calls ensureToolSetsAreStarted, emits recovery notices, and updates
388+
// the TUI tool-count immediately.
389+
//
390+
// The new tools are picked up by the next iteration's getTools() call
391+
// at the top of this loop, so the model sees them on its very next
392+
// response — within the same user turn, without requiring a new user
393+
// message. reprobe's return value is intentionally discarded here;
394+
// the top-of-loop getTools() is the authoritative source.
395+
if len(res.Calls) > 0 {
396+
r.reprobe(ctx, sess, a, agentTools, sessionSpan, events)
397+
}
398+
385399
// Check for degenerate tool call loops
386400
if loopDetector.record(res.Calls) {
387401
toolName := "unknown"
@@ -575,17 +589,14 @@ func (r *LocalRuntime) compactIfNeeded(
575589
r.Summarize(ctx, sess, "", events)
576590
}
577591

578-
// getTools executes tool retrieval with automatic OAuth handling
579-
func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event) ([]tools.Tool, error) {
580-
shouldEmitMCPInit := len(a.ToolSets()) > 0
581-
if shouldEmitMCPInit {
592+
// getTools executes tool retrieval with automatic OAuth handling.
593+
// emitLifecycleEvents controls whether MCPInitStarted/Finished are emitted;
594+
// pass false when calling from reprobe to avoid spurious TUI spinner flicker.
595+
func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event, emitLifecycleEvents bool) ([]tools.Tool, error) {
596+
if emitLifecycleEvents && len(a.ToolSets()) > 0 {
582597
events <- MCPInitStarted(a.Name())
598+
defer func() { events <- MCPInitFinished(a.Name()) }()
583599
}
584-
defer func() {
585-
if shouldEmitMCPInit {
586-
events <- MCPInitFinished(a.Name())
587-
}
588-
}()
589600

590601
agentTools, err := a.Tools(ctx)
591602
if err != nil {
@@ -616,15 +627,15 @@ func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events chan Even
616627
}
617628
}
618629

619-
// emitAgentWarnings drains and emits any agent initialization warnings.
630+
// emitAgentWarnings drains and emits any pending toolset warnings as persistent
631+
// TUI notifications. Both start failures and recovery notices are emitted as
632+
// warnings so they remain visible until the user dismisses them.
620633
func (r *LocalRuntime) emitAgentWarnings(a *agent.Agent, send func(Event)) {
621634
warnings := a.DrainWarnings()
622-
if len(warnings) == 0 {
623-
return
635+
if len(warnings) > 0 {
636+
slog.Warn("Tool setup partially failed; continuing", "agent", a.Name(), "warnings", warnings)
637+
send(Warning(formatToolWarning(a, warnings), a.Name()))
624638
}
625-
626-
slog.Warn("Tool setup partially failed; continuing", "agent", a.Name(), "warnings", warnings)
627-
send(Warning(formatToolWarning(a, warnings), a.Name()))
628639
}
629640

630641
func formatToolWarning(a *agent.Agent, warnings []string) string {
@@ -669,3 +680,52 @@ func chanSend(ch chan Event) func(Event) {
669680
}
670681
}
671682
}
683+
684+
// reprobe re-runs ensureToolSetsAreStarted after a batch of tool calls.
685+
// If new tools became available (by name-set diff), it emits recovery notices
686+
// and a ToolsetInfo event to update the TUI immediately. The new tools will be
687+
// picked up by the next iteration's getTools() call at the top of the loop.
688+
//
689+
// reprobe deliberately does NOT return the new tool list: the top-of-loop
690+
// getTools() is the single authoritative source for agentTools each iteration.
691+
func (r *LocalRuntime) reprobe(
692+
ctx context.Context,
693+
sess *session.Session,
694+
a *agent.Agent,
695+
currentTools []tools.Tool,
696+
sessionSpan trace.Span,
697+
events chan Event,
698+
) {
699+
updated, err := r.getTools(ctx, a, sessionSpan, events, false)
700+
if err != nil {
701+
slog.Warn("reprobe: getTools failed", "agent", a.Name(), "error", err)
702+
return
703+
}
704+
updated = filterExcludedTools(updated, sess.ExcludedTools)
705+
706+
// Emit any pending warnings/notices that getTools just generated.
707+
r.emitAgentWarnings(a, chanSend(events))
708+
709+
// Compute added tools by comparing name-sets (not just counts), so we
710+
// correctly handle a toolset that replaced one tool with another.
711+
prev := make(map[string]struct{}, len(currentTools))
712+
for _, t := range currentTools {
713+
prev[t.Name] = struct{}{}
714+
}
715+
var added []string
716+
for _, t := range updated {
717+
if _, exists := prev[t.Name]; !exists {
718+
added = append(added, t.Name)
719+
}
720+
}
721+
722+
if len(added) == 0 {
723+
return
724+
}
725+
726+
slog.Info("New tools available after toolset re-probe",
727+
"agent", a.Name(), "added", added)
728+
729+
// Emit updated tool count to the TUI immediately.
730+
chanSend(events)(ToolsetInfo(len(updated), false, a.Name()))
731+
}

0 commit comments

Comments
 (0)