Skip to content

Commit 22cfe41

Browse files
authored
Add --json flag to tq CLI for programmatic queue inspection (#9)
* Add --json flag to tq CLI for programmatic queue inspection Enable agents and status lines to read task queue state by adding structured JSON output to list, logs, and clear commands. The command field is now stored in the queue table and included in JSON output, allowing consumers to see what's actually running (e.g., "./gradlew build"). Changes: - Add --json flag to tq list, logs, and clear subcommands - Add command column to queue schema with migration for existing DBs - Add TestJsonSchemaContracts test class to enforce JSON structure stability - JSON mode for clear skips interactive confirmation (implies --force) * Address PR review feedback - Fix --json help text: say "skip confirmation" instead of "implies --force" - Fix test_list_json_empty_queue to actually test DB-exists-but-empty case
1 parent 756095d commit 22cfe41

3 files changed

Lines changed: 325 additions & 21 deletions

File tree

queue_core.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths":
5252
pid INTEGER,
5353
server_id TEXT,
5454
child_pid INTEGER,
55+
command TEXT,
5556
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
5657
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
5758
)
@@ -62,6 +63,11 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths":
6263
ALTER TABLE queue ADD COLUMN server_id TEXT
6364
"""
6465

66+
# Migration to add command column to existing databases
67+
QUEUE_MIGRATION_COMMAND = """
68+
ALTER TABLE queue ADD COLUMN command TEXT
69+
"""
70+
6571
QUEUE_INDEX = """
6672
CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(queue_name, status)
6773
"""
@@ -91,11 +97,12 @@ def init_db(paths: QueuePaths):
9197
with get_db(paths.db_path) as conn:
9298
conn.execute(QUEUE_SCHEMA)
9399
conn.execute(QUEUE_INDEX)
94-
# Run migration for existing databases without server_id column
95-
try:
96-
conn.execute(QUEUE_MIGRATION_SERVER_ID)
97-
except sqlite3.OperationalError:
98-
pass # Column already exists
100+
# Run migrations for existing databases
101+
for migration in [QUEUE_MIGRATION_SERVER_ID, QUEUE_MIGRATION_COMMAND]:
102+
try:
103+
conn.execute(migration)
104+
except sqlite3.OperationalError:
105+
pass # Column already exists
99106

100107

101108
def ensure_db(paths: QueuePaths):

tests/test_tq_cli.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
"""
55

66
import json
7+
import os
8+
import signal
79
import subprocess
810
import sys
911
import tempfile
12+
import time
1013
from pathlib import Path
1114

1215
import pytest
@@ -153,6 +156,176 @@ def test_list_no_database(self, temp_data_dir):
153156
assert result.returncode == 0
154157
assert "No queue database" in result.stdout or "empty" in result.stdout.lower()
155158

159+
def test_list_json_empty_queue(self, temp_data_dir):
160+
"""Test list --json command with DB exists but queue is empty."""
161+
# Initialize DB by running a task that completes
162+
run_tq("echo", "init", data_dir=temp_data_dir)
163+
164+
result = run_tq("list", "--json", data_dir=temp_data_dir)
165+
166+
assert result.returncode == 0
167+
output = json.loads(result.stdout)
168+
assert output == {
169+
"tasks": [],
170+
"summary": {"total": 0, "running": 0, "waiting": 0}
171+
}
172+
173+
def test_list_json_no_database(self, temp_data_dir):
174+
"""Test list --json command when database doesn't exist."""
175+
result = run_tq("list", "--json", data_dir=temp_data_dir)
176+
177+
assert result.returncode == 0
178+
output = json.loads(result.stdout)
179+
assert output["tasks"] == []
180+
assert output["summary"]["total"] == 0
181+
182+
183+
class TestJsonSchemaContracts:
184+
"""
185+
Schema contract tests for JSON output.
186+
187+
These tests ensure the JSON structure remains stable for programmatic consumers
188+
(e.g., Claude Code status lines). Any changes to these schemas should be
189+
intentional and backward-compatible.
190+
"""
191+
192+
# Expected fields for each schema - used to enforce contracts
193+
LIST_REQUIRED_KEYS = {"tasks", "summary"}
194+
LIST_SUMMARY_REQUIRED_KEYS = {"total", "running", "waiting"}
195+
LIST_TASK_REQUIRED_KEYS = {"id", "queue_name", "status", "command", "pid", "child_pid", "created_at", "updated_at"}
196+
197+
LOGS_REQUIRED_KEYS = {"entries"}
198+
LOGS_ENTRY_REQUIRED_KEYS = {"event", "timestamp"} # Base keys all entries must have
199+
200+
CLEAR_REQUIRED_KEYS = {"cleared", "success"}
201+
202+
def test_list_json_schema_empty(self, temp_data_dir):
203+
"""Verify list --json schema structure when empty."""
204+
result = run_tq("list", "--json", data_dir=temp_data_dir)
205+
output = json.loads(result.stdout)
206+
207+
# Top-level keys
208+
assert set(output.keys()) == self.LIST_REQUIRED_KEYS, \
209+
f"list --json must have exactly keys {self.LIST_REQUIRED_KEYS}"
210+
211+
# Summary keys
212+
assert set(output["summary"].keys()) == self.LIST_SUMMARY_REQUIRED_KEYS, \
213+
f"list --json summary must have exactly keys {self.LIST_SUMMARY_REQUIRED_KEYS}"
214+
215+
# Type checks
216+
assert isinstance(output["tasks"], list)
217+
assert isinstance(output["summary"]["total"], int)
218+
assert isinstance(output["summary"]["running"], int)
219+
assert isinstance(output["summary"]["waiting"], int)
220+
221+
def test_list_json_schema_with_running_task(self, temp_data_dir):
222+
"""Verify list --json task schema with an active task."""
223+
# Start a long-running task
224+
proc = subprocess.Popen(
225+
[sys.executable, str(TQ_PATH), f"--data-dir={temp_data_dir}", "sleep", "30"],
226+
stdout=subprocess.PIPE,
227+
stderr=subprocess.PIPE,
228+
start_new_session=True,
229+
)
230+
231+
try:
232+
# Wait for it to start
233+
time.sleep(0.5)
234+
235+
result = run_tq("list", "--json", data_dir=temp_data_dir)
236+
output = json.loads(result.stdout)
237+
238+
# Verify structure
239+
assert set(output.keys()) == self.LIST_REQUIRED_KEYS
240+
assert len(output["tasks"]) >= 1
241+
242+
# Verify task object schema
243+
task = output["tasks"][0]
244+
assert set(task.keys()) == self.LIST_TASK_REQUIRED_KEYS, \
245+
f"Task object must have exactly keys {self.LIST_TASK_REQUIRED_KEYS}, got {set(task.keys())}"
246+
247+
# Verify task field types
248+
assert isinstance(task["id"], int)
249+
assert isinstance(task["queue_name"], str)
250+
assert task["status"] in ("running", "waiting")
251+
assert task["command"] is None or isinstance(task["command"], str)
252+
assert task["pid"] is None or isinstance(task["pid"], int)
253+
assert task["child_pid"] is None or isinstance(task["child_pid"], int)
254+
assert task["created_at"] is None or isinstance(task["created_at"], str)
255+
assert task["updated_at"] is None or isinstance(task["updated_at"], str)
256+
257+
# Verify command is populated for the running task
258+
assert task["command"] == "sleep 30", f"Expected command 'sleep 30', got {task['command']}"
259+
260+
# Verify summary counts are accurate
261+
assert output["summary"]["total"] == len(output["tasks"])
262+
running_count = sum(1 for t in output["tasks"] if t["status"] == "running")
263+
waiting_count = sum(1 for t in output["tasks"] if t["status"] == "waiting")
264+
assert output["summary"]["running"] == running_count
265+
assert output["summary"]["waiting"] == waiting_count
266+
267+
finally:
268+
# Clean up
269+
try:
270+
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
271+
except Exception:
272+
proc.terminate()
273+
proc.wait(timeout=5)
274+
275+
def test_logs_json_schema_empty(self, temp_data_dir):
276+
"""Verify logs --json schema structure when empty."""
277+
result = run_tq("logs", "--json", data_dir=temp_data_dir)
278+
output = json.loads(result.stdout)
279+
280+
assert set(output.keys()) == self.LOGS_REQUIRED_KEYS, \
281+
f"logs --json must have exactly keys {self.LOGS_REQUIRED_KEYS}"
282+
assert isinstance(output["entries"], list)
283+
284+
def test_logs_json_schema_with_entries(self, temp_data_dir):
285+
"""Verify logs --json entry schema with actual log entries."""
286+
# Generate some logs
287+
run_tq("echo", "test", data_dir=temp_data_dir)
288+
289+
result = run_tq("logs", "--json", data_dir=temp_data_dir)
290+
output = json.loads(result.stdout)
291+
292+
assert set(output.keys()) == self.LOGS_REQUIRED_KEYS
293+
assert len(output["entries"]) >= 3 # queued, started, completed
294+
295+
# Verify each entry has required base keys
296+
for entry in output["entries"]:
297+
assert self.LOGS_ENTRY_REQUIRED_KEYS.issubset(set(entry.keys())), \
298+
f"Log entry must have at least keys {self.LOGS_ENTRY_REQUIRED_KEYS}, got {set(entry.keys())}"
299+
assert isinstance(entry["event"], str)
300+
assert isinstance(entry["timestamp"], str)
301+
302+
# Verify specific event schemas
303+
for entry in output["entries"]:
304+
if entry["event"] == "task_queued":
305+
assert "task_id" in entry
306+
assert "queue_name" in entry
307+
elif entry["event"] == "task_started":
308+
assert "task_id" in entry
309+
assert "queue_name" in entry
310+
assert "wait_time_seconds" in entry
311+
elif entry["event"] == "task_completed":
312+
assert "task_id" in entry
313+
assert "queue_name" in entry
314+
assert "exit_code" in entry
315+
assert "duration_seconds" in entry
316+
317+
def test_clear_json_schema(self, temp_data_dir):
318+
"""Verify clear --json schema structure."""
319+
result = run_tq("clear", "--json", data_dir=temp_data_dir)
320+
output = json.loads(result.stdout)
321+
322+
assert set(output.keys()) == self.CLEAR_REQUIRED_KEYS, \
323+
f"clear --json must have exactly keys {self.CLEAR_REQUIRED_KEYS}"
324+
assert isinstance(output["cleared"], int)
325+
assert isinstance(output["success"], bool)
326+
assert output["cleared"] >= 0
327+
assert output["success"] is True
328+
156329

157330
class TestTqLogs:
158331
"""Tests for the tq logs command."""
@@ -189,6 +362,43 @@ def test_logs_n_option(self, temp_data_dir):
189362
lines = [line for line in result.stdout.strip().split("\n") if line]
190363
assert len(lines) == 3
191364

365+
def test_logs_json_no_file(self, temp_data_dir):
366+
"""Test logs --json command when no log file exists."""
367+
result = run_tq("logs", "--json", data_dir=temp_data_dir)
368+
369+
assert result.returncode == 0
370+
output = json.loads(result.stdout)
371+
assert output == {"entries": []}
372+
373+
def test_logs_json_shows_activity(self, temp_data_dir):
374+
"""Test logs --json command shows task activity."""
375+
# Run a task first to generate logs
376+
run_tq("echo", "test", data_dir=temp_data_dir)
377+
378+
result = run_tq("logs", "--json", data_dir=temp_data_dir)
379+
380+
assert result.returncode == 0
381+
output = json.loads(result.stdout)
382+
assert "entries" in output
383+
assert len(output["entries"]) >= 3 # queued, started, completed
384+
385+
events = [e["event"] for e in output["entries"]]
386+
assert "task_queued" in events
387+
assert "task_started" in events
388+
assert "task_completed" in events
389+
390+
def test_logs_json_n_option(self, temp_data_dir):
391+
"""Test logs --json -n option to limit entries."""
392+
# Run multiple tasks
393+
for i in range(5):
394+
run_tq("echo", f"test {i}", data_dir=temp_data_dir)
395+
396+
result = run_tq("logs", "--json", "-n", "3", data_dir=temp_data_dir)
397+
398+
assert result.returncode == 0
399+
output = json.loads(result.stdout)
400+
assert len(output["entries"]) == 3
401+
192402

193403
class TestTqClear:
194404
"""Tests for the tq clear command."""
@@ -203,6 +413,25 @@ def test_clear_empty_queue(self, temp_data_dir):
203413
assert result.returncode == 0
204414
assert "already empty" in result.stdout.lower()
205415

416+
def test_clear_json_empty_queue(self, temp_data_dir):
417+
"""Test clear --json command with empty queue."""
418+
# Initialize database by running a task that completes
419+
run_tq("echo", "init", data_dir=temp_data_dir)
420+
421+
result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5)
422+
423+
assert result.returncode == 0
424+
output = json.loads(result.stdout)
425+
assert output == {"cleared": 0, "success": True}
426+
427+
def test_clear_json_no_database(self, temp_data_dir):
428+
"""Test clear --json command when no database exists."""
429+
result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5)
430+
431+
assert result.returncode == 0
432+
output = json.loads(result.stdout)
433+
assert output == {"cleared": 0, "success": True}
434+
206435

207436
class TestTqHelp:
208437
"""Tests for help output."""

0 commit comments

Comments
 (0)