Skip to content

Commit c8022bd

Browse files
ysyneuclaude
andcommitted
fix: improve log format and content for better debuggability
Restore RFC3339 timestamps, use operation-specific log messages instead of generic "executing task", standardize key names (server/tool), add missing context to error logs (message type, raw_size, worknode_id), and track MCP call goroutines for graceful shutdown. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f6f7d23 commit c8022bd

5 files changed

Lines changed: 72 additions & 56 deletions

File tree

cmd/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ func setupLogging(levelStr string) {
244244
level := parseLogLevel(levelStr)
245245
opts := &slog.HandlerOptions{
246246
Level: level,
247-
// Strip slog's time field — the process manager (systemd/journald)
248-
// already prepends its own timestamp to every line.
249247
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
250248
if a.Key == slog.TimeKey {
251-
return slog.Attr{}
249+
if t, ok := a.Value.Any().(time.Time); ok {
250+
a.Value = slog.StringValue(t.Format(time.RFC3339))
251+
}
252252
}
253253
return a
254254
},

mcp/client.go

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,23 @@ func NewClientManager() *ClientManager {
4444
// GetSession returns or creates an MCP session for the given server.
4545
func (m *ClientManager) GetSession(ctx context.Context, server *protocol.MCPServerConfig, logger *slog.Logger) (*sdk_mcp.ClientSession, error) {
4646
if logger == nil {
47-
logger = slog.Default()
47+
logger = slog.With("server", server.Name)
4848
}
4949

5050
m.mu.Lock()
5151
defer m.mu.Unlock()
5252

5353
serverName := server.Name
5454

55-
logger.Debug("mcp get session",
56-
"server_name", serverName,
55+
logger.Debug("mcp resolving session",
5756
"transport", server.Transport,
5857
"url", server.URL,
5958
"command", server.Command,
6059
)
6160

6261
// Check if session exists and is still valid
6362
if session, ok := m.sessions[serverName]; ok {
64-
logger.Debug("mcp reusing existing session", "server_name", serverName)
63+
logger.Debug("mcp reusing session")
6564
return session, nil
6665
}
6766

@@ -77,51 +76,45 @@ func (m *ClientManager) GetSession(ctx context.Context, server *protocol.MCPServ
7776

7877
// Create transport
7978
logger.Info("mcp creating transport",
80-
"server_name", serverName,
8179
"transport", server.Transport,
8280
"url", server.URL,
8381
"command", server.Command,
8482
)
8583

8684
transport, err := createTransport(server)
8785
if err != nil {
88-
logger.Error("mcp create transport failed", "server_name", serverName, "error", err)
86+
logger.Error("mcp transport creation failed", "error", err)
8987
return nil, err
9088
}
9189

9290
// Connect with timeout
93-
logger.Info("mcp connecting to server", "server_name", serverName, "timeout", DefaultConnectTimeout)
91+
logger.Info("mcp connecting", "timeout", DefaultConnectTimeout)
9492

9593
connectCtx, cancel := context.WithTimeout(ctx, DefaultConnectTimeout)
9694
defer cancel()
9795

9896
session, err := client.Connect(connectCtx, transport, nil)
9997
if err != nil {
100-
logger.Error("mcp connect failed",
101-
"server_name", serverName,
98+
logger.Error("mcp connection failed",
10299
"transport", server.Transport,
103100
"url", server.URL,
104101
"error", err,
105102
)
106103
return nil, fmt.Errorf("failed to connect to MCP server '%s': %w", serverName, err)
107104
}
108105

109-
logger.Info("mcp connected successfully", "server_name", serverName)
106+
logger.Info("mcp connected")
110107
m.sessions[serverName] = session
111108
return session, nil
112109
}
113110

114111
// CallTool executes a single MCP tool call using the manager for session persistence.
115112
func (m *ClientManager) CallTool(ctx context.Context, server *protocol.MCPServerConfig, toolName string, arguments map[string]any, logger *slog.Logger) (*sdk_mcp.CallToolResult, error) {
116113
if logger == nil {
117-
logger = slog.Default()
114+
logger = slog.With("server", server.Name, "tool", toolName)
118115
}
119116

120-
logger.Debug("mcp call tool",
121-
"server_name", server.Name,
122-
"tool_name", toolName,
123-
"arguments", arguments,
124-
)
117+
logger.Debug("mcp preparing tool call", "arguments", arguments)
125118

126119
session, err := m.GetSession(ctx, server, logger)
127120
if err != nil {
@@ -131,34 +124,22 @@ func (m *ClientManager) CallTool(ctx context.Context, server *protocol.MCPServer
131124
callCtx, callCancel := context.WithTimeout(ctx, DefaultCallTimeout)
132125
defer callCancel()
133126

134-
logger.Debug("mcp calling tool", "server_name", server.Name, "tool_name", toolName, "timeout", DefaultCallTimeout)
127+
logger.Debug("mcp invoking tool", "timeout", DefaultCallTimeout)
135128

136129
result, err := session.CallTool(callCtx, &sdk_mcp.CallToolParams{
137130
Name: toolName,
138131
Arguments: arguments,
139132
})
140133
if err != nil {
141-
logger.Error("mcp call tool failed",
142-
"server_name", server.Name,
143-
"tool_name", toolName,
144-
"error", err,
145-
)
134+
logger.Error("mcp tool call failed", "error", err)
146135
m.invalidateSession(server.Name)
147136
return nil, fmt.Errorf("failed to call tool '%s': %w", toolName, err)
148137
}
149138

150139
if result.IsError {
151-
errorContent := ExtractContent(result)
152-
logger.Warn("mcp tool returned error",
153-
"server_name", server.Name,
154-
"tool_name", toolName,
155-
"error_content", errorContent,
156-
)
140+
logger.Warn("mcp tool returned error", "error_content", ExtractContent(result))
157141
} else {
158-
logger.Info("mcp call tool success",
159-
"server_name", server.Name,
160-
"tool_name", toolName,
161-
)
142+
logger.Info("mcp tool call succeeded")
162143
}
163144

164145
return result, nil

mcp/transport.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,23 @@ func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
7070
req.Header.Set(k, v)
7171
}
7272

73+
// Reject standalone SSE GET immediately. Per MCP spec §2.2.3, servers
74+
// that don't support standalone SSE streams MUST return 405. Some servers
75+
// ignore the GET instead, causing the SDK to retry for 50+ seconds. We
76+
// short-circuit with a synthetic 405 so Connect() completes quickly.
77+
if req.Method == http.MethodGet && strings.Contains(req.Header.Get("Accept"), "text/event-stream") {
78+
slog.Debug("mcp blocking standalone SSE GET",
79+
"url", req.URL.String(),
80+
)
81+
return &http.Response{
82+
StatusCode: http.StatusMethodNotAllowed,
83+
Status: "405 Method Not Allowed",
84+
Header: http.Header{},
85+
Body: http.NoBody,
86+
Request: req,
87+
}, nil
88+
}
89+
7390
slog.Debug("mcp http request",
7491
"method", req.Method,
7592
"url", req.URL.String(),

ws/client.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ func (c *Client) RunWithReconnect(ctx context.Context) error {
210210

211211
slog.Warn("connection failed, retrying",
212212
"attempt", attempt,
213+
"max_attempts", maxReconnectAttempts,
213214
"delay", delay,
214215
"error", err,
215216
)
@@ -240,7 +241,8 @@ func (c *Client) RunWithReconnect(ctx context.Context) error {
240241

241242
// Run (blocking until disconnect)
242243
if err := c.Run(ctx); err != nil {
243-
slog.Warn("connection lost",
244+
slog.Warn("connection lost, will reconnect",
245+
"worknode_id", c.worknodeID,
244246
"error", err,
245247
)
246248
}
@@ -356,14 +358,15 @@ func (c *Client) readLoop(ctx context.Context) error {
356358
if err := json.Unmarshal(data, &msg); err != nil {
357359
slog.Warn("failed to unmarshal message",
358360
"error", err,
361+
"raw_size", len(data),
359362
)
360363
continue
361364
}
362365

363366
// Handle message in goroutine to not block read loop
364367
go func(m protocol.Message) {
365368
if err := c.handler(ctx, &m); err != nil {
366-
slog.Error("failed to handle message",
369+
slog.Error("message handling failed",
367370
"type", m.Type,
368371
"error", err,
369372
)
@@ -387,20 +390,23 @@ func (c *Client) sendLoop(ctx context.Context) {
387390
c.mu.Unlock()
388391

389392
if conn == nil {
393+
slog.Warn("dropping message, not connected", "type", msg.Type)
390394
continue
391395
}
392396

393397
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
394398
data, err := json.Marshal(msg)
395399
if err != nil {
396-
slog.Error("failed to marshal message",
400+
slog.Error("failed to marshal outgoing message",
401+
"type", msg.Type,
397402
"error", err,
398403
)
399404
continue
400405
}
401406

402407
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
403-
slog.Error("failed to send message",
408+
slog.Error("failed to write message",
409+
"type", msg.Type,
404410
"error", err,
405411
)
406412
}

ws/handler.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,15 @@ func (h *Handler) executeTask(ctx context.Context, req *protocol.TaskRequestPayl
150150
if err != nil {
151151
return nil, fmt.Errorf("invalid read args: %w", err)
152152
}
153-
logger.Info("executing task", "path", args.Path)
153+
logger.Info("reading file", "path", args.Path)
154154
return h.ws.Read(ctx, args)
155155

156156
case protocol.TaskOpWrite:
157157
args, err := parseArgs[protocol.WriteArgs](req.Args)
158158
if err != nil {
159159
return nil, fmt.Errorf("invalid write args: %w", err)
160160
}
161-
logger.Info("executing task", "path", args.Path)
161+
logger.Info("writing file", "path", args.Path)
162162
if err := h.ws.Write(ctx, args); err != nil {
163163
return nil, err
164164
}
@@ -169,50 +169,50 @@ func (h *Handler) executeTask(ctx context.Context, req *protocol.TaskRequestPayl
169169
if err != nil {
170170
return nil, fmt.Errorf("invalid list args: %w", err)
171171
}
172-
logger.Info("executing task", "path", args.Path, "recursive", args.Recursive)
172+
logger.Info("listing directory", "path", args.Path, "recursive", args.Recursive)
173173
return h.ws.List(ctx, args)
174174

175175
case protocol.TaskOpGlob:
176176
args, err := parseArgs[protocol.GlobArgs](req.Args)
177177
if err != nil {
178178
return nil, fmt.Errorf("invalid glob args: %w", err)
179179
}
180-
logger.Info("executing task", "pattern", args.Pattern)
180+
logger.Info("globbing files", "pattern", args.Pattern)
181181
return h.ws.Glob(ctx, args)
182182

183183
case protocol.TaskOpGrep:
184184
args, err := parseArgs[protocol.GrepArgs](req.Args)
185185
if err != nil {
186186
return nil, fmt.Errorf("invalid grep args: %w", err)
187187
}
188-
logger.Info("executing task", "pattern", args.Pattern, "include", args.Include)
188+
logger.Info("grepping content", "pattern", args.Pattern, "include", args.Include)
189189
return h.ws.Grep(ctx, args)
190190

191191
case protocol.TaskOpBash:
192192
args, err := parseArgs[protocol.BashArgs](req.Args)
193193
if err != nil {
194194
return nil, fmt.Errorf("invalid bash args: %w", err)
195195
}
196-
logger.Info("executing task", "command", args.Command, "workdir", args.Workdir)
196+
logger.Info("executing command", "command", args.Command, "workdir", args.Workdir)
197197
return h.ws.Bash(ctx, args)
198198

199199
case protocol.TaskOpWebFetch:
200200
args, err := parseArgs[protocol.WebFetchArgs](req.Args)
201201
if err != nil {
202202
return nil, fmt.Errorf("invalid webfetch args: %w", err)
203203
}
204-
logger.Info("executing task", "url", args.URL, "format", args.Format)
204+
logger.Info("fetching url", "url", args.URL, "format", args.Format)
205205
return h.ws.WebFetch(ctx, args)
206206

207207
case protocol.TaskOpMCPCall:
208208
args, err := parseArgs[protocol.MCPCallArgs](req.Args)
209209
if err != nil {
210210
return nil, fmt.Errorf("invalid mcp_call args: %w", err)
211211
}
212-
logger.Info("executing task",
213-
"server_name", args.Server.Name,
214-
"server_transport", args.Server.Transport,
215-
"tool_name", args.ToolName,
212+
logger.Info("calling mcp tool",
213+
"server", args.Server.Name,
214+
"transport", args.Server.Transport,
215+
"tool", args.ToolName,
216216
)
217217
return h.ws.MCPCall(ctx, args, logger)
218218

@@ -221,9 +221,9 @@ func (h *Handler) executeTask(ctx context.Context, req *protocol.TaskRequestPayl
221221
if err != nil {
222222
return nil, fmt.Errorf("invalid mcp_list_tools args: %w", err)
223223
}
224-
logger.Info("executing task",
225-
"server_name", args.Server.Name,
226-
"server_transport", args.Server.Transport,
224+
logger.Info("listing mcp tools",
225+
"server", args.Server.Name,
226+
"transport", args.Server.Transport,
227227
)
228228
return h.ws.MCPListTools(ctx, args, logger)
229229

@@ -232,7 +232,7 @@ func (h *Handler) executeTask(ctx context.Context, req *protocol.TaskRequestPayl
232232
if err != nil {
233233
return nil, fmt.Errorf("invalid sync_skill args: %w", err)
234234
}
235-
logger.Info("executing task", "skill_name", args.SkillName, "skill_dir", args.SkillDir)
235+
logger.Info("syncing skill", "skill", args.SkillName, "dir", args.SkillDir)
236236
return h.ws.SyncSkill(ctx, args)
237237

238238
default:
@@ -281,8 +281,10 @@ func (h *Handler) handleTaskCancel(ctx context.Context, msg *protocol.Message) e
281281
h.mu.RUnlock()
282282

283283
if ok {
284-
slog.Info("cancelling task", "task_id", payload.TaskID)
284+
slog.Info("cancelling task per request", "task_id", payload.TaskID)
285285
cancel()
286+
} else {
287+
slog.Debug("cancel requested for completed or unknown task", "task_id", payload.TaskID)
286288
}
287289
return nil
288290
}
@@ -293,17 +295,27 @@ func (h *Handler) handleMCPCall(ctx context.Context, msg *protocol.Message) erro
293295
return fmt.Errorf("failed to unmarshal mcp call: %w", err)
294296
}
295297

296-
logger := slog.With("call_id", payload.CallID)
298+
logger := slog.With(
299+
"call_id", payload.CallID,
300+
"server", payload.Server.Name,
301+
"tool", payload.ToolName,
302+
)
303+
304+
logger.Info("handling mcp call")
297305

306+
h.taskWg.Add(1)
298307
go func() {
308+
defer h.taskWg.Done()
299309
result, err := h.ws.MCPCall(ctx, &protocol.MCPCallArgs{
300310
Server: payload.Server,
301311
ToolName: payload.ToolName,
302312
Args: payload.Arguments,
303313
}, logger)
304314
if err != nil {
315+
logger.Error("mcp call failed", "error", err)
305316
h.sendMCPResult(payload.CallID, false, nil, err.Error())
306317
} else {
318+
logger.Info("mcp call completed")
307319
h.sendMCPResult(payload.CallID, true, result, "")
308320
}
309321
}()

0 commit comments

Comments
 (0)