Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,29 @@ type ActualState struct {
}

// Agent bundles the agent's state machine, its wake-up channels, and the
// collaborators (HTTP client, reconciler, log collectors, builder) it drives.
// Each field is declared exactly once; duplicate field names do not compile.
type Agent struct {
	// state is the current state-machine state (NewAgent starts it at StateIdle).
	// NOTE(review): presumably guarded by stateMutex via GetState/SetState — confirm.
	state      AgentState
	stateMutex sync.RWMutex

	// reconcileRequested wakes the Run loop for an immediate Tick. RequestReconcile
	// sends on it without blocking.
	reconcileRequested chan struct{}
	// statusReportRequested carries the reason for an immediate status report to
	// StatusReportLoop. RequestStatusReport sends on it without blocking.
	statusReportRequested chan string
	// refreshMutex guards pendingExpectedStateRefresh.
	refreshMutex sync.Mutex
	// pendingExpectedStateRefresh records that a reconcile was requested while a
	// previous snapshot was still being processed; transitionToIdle consumes it.
	pendingExpectedStateRefresh bool

	Client     *agenthttp.Client
	Reconciler *reconcile.Reconciler
	Config     *Config
	PublicIP   string
	PrivateIP  string
	DataDir    string

	// expectedState is the snapshot handleProcessing compares against the actual state.
	expectedState *agenthttp.ExpectedState
	// processingStart is compared against ProcessingTimeout in handleProcessing.
	processingStart time.Time

	LogCollector        *logs.Collector
	TraefikLogCollector *logs.TraefikCollector
	Builder             *build.Builder

	// NOTE(review): isBuilding/currentBuildID look like they are protected by
	// buildMutex — confirm against the build code.
	isBuilding     bool
	buildMutex     sync.Mutex
	currentBuildID string

	IsProxy    bool
	dnsInSync  bool
	DisableDNS bool
}

func NewAgent(
Expand All @@ -92,18 +96,20 @@ func NewAgent(
disableDNS bool,
) *Agent {
return &Agent{
state: StateIdle,
Client: client,
Reconciler: reconciler,
Config: config,
PublicIP: publicIP,
PrivateIP: privateIP,
DataDir: dataDir,
LogCollector: logCollector,
TraefikLogCollector: traefikLogCollector,
Builder: builder,
IsProxy: isProxy,
DisableDNS: disableDNS,
state: StateIdle,
reconcileRequested: make(chan struct{}, 1),
statusReportRequested: make(chan string, 1),
Client: client,
Reconciler: reconciler,
Config: config,
PublicIP: publicIP,
PrivateIP: privateIP,
DataDir: dataDir,
LogCollector: logCollector,
TraefikLogCollector: traefikLogCollector,
Builder: builder,
IsProxy: isProxy,
DisableDNS: disableDNS,
}
}

Expand Down
52 changes: 48 additions & 4 deletions agent/internal/agent/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,48 @@ func (a *Agent) Tick() {
}
}

// RequestReconcile asks the run loop to reconcile as soon as possible.
//
// If the agent is currently processing, the request is also recorded as a
// pending expected-state refresh so it is picked up once processing ends.
// reason is only used for logging.
func (a *Agent) RequestReconcile(reason string) {
	switch a.GetState() {
	case StateProcessing:
		a.requestExpectedStateRefresh()
		log.Printf("[reconcile] refresh requested during processing: %s", reason)
	default:
		log.Printf("[reconcile] immediate reconcile requested: %s", reason)
	}

	// Non-blocking wake-up: if a signal is already queued, a second one adds
	// nothing, so it is dropped instead of blocking the caller.
	select {
	case a.reconcileRequested <- struct{}{}:
	default:
	}
}

// requestExpectedStateRefresh flags, under refreshMutex, that the expected
// state should be fetched again.
func (a *Agent) requestExpectedStateRefresh() {
	a.refreshMutex.Lock()
	a.pendingExpectedStateRefresh = true
	a.refreshMutex.Unlock()
}

// consumeExpectedStateRefresh clears the pending-refresh flag and reports
// whether it was set. The read and the reset happen under refreshMutex, so a
// pending request is handed to exactly one caller.
func (a *Agent) consumeExpectedStateRefresh() bool {
	a.refreshMutex.Lock()
	wasPending := a.pendingExpectedStateRefresh
	a.pendingExpectedStateRefresh = false
	a.refreshMutex.Unlock()

	return wasPending
}

// transitionToIdle moves the agent to StateIdle and, when a refresh was
// requested in the meantime, immediately runs one idle pass.
func (a *Agent) transitionToIdle() {
	a.SetState(StateIdle)

	if !a.consumeExpectedStateRefresh() {
		return
	}

	log.Printf("[processing] fetching latest expected state after pending refresh")
	// A deploy wake can arrive while a previous snapshot is still being
	// processed; running handleIdle right away picks up the latest expected
	// state.
	a.handleIdle()
}

func (a *Agent) handleIdle() {
expected, fromCache, err := a.Client.GetExpectedStateWithFallback()
if err != nil {
Expand Down Expand Up @@ -61,31 +103,33 @@ func (a *Agent) handleIdle() {
// handleProcessing performs one pass of the processing state.
//
// It leaves processing via transitionToIdle when the pass has exceeded
// ProcessingTimeout, when the actual state cannot be read, when no changes
// remain between expected and actual state, or when a reconcile step fails.
// Otherwise it applies one reconcile step and requests an immediate status
// report.
//
// transitionToIdle already sets StateIdle, so no separate SetState call is
// made before it; the state is written once per exit path.
func (a *Agent) handleProcessing() {
	if time.Since(a.processingStart) > ProcessingTimeout {
		log.Printf("[processing] timeout after %v, forcing transition to IDLE", ProcessingTimeout)
		a.transitionToIdle()
		return
	}

	actual, err := a.getActualState()
	if err != nil {
		log.Printf("[processing] failed to get actual state: %v", err)
		a.transitionToIdle()
		return
	}

	a.updateDnsInSync(a.expectedState, actual)

	if len(a.detectChanges(a.expectedState, actual)) == 0 {
		log.Printf("[processing] state converged, transitioning to IDLE")
		a.transitionToIdle()
		return
	}

	err = a.reconcileOne(actual)
	if err != nil {
		log.Printf("[processing] reconciliation failed: %v, transitioning to IDLE", err)
		a.transitionToIdle()
		return
	}

	// The step succeeded: queue a status report now instead of waiting for the
	// periodic ticker in StatusReportLoop.
	a.RequestStatusReport("reconcile completed")
}

func (a *Agent) updateDnsInSync(expected *agenthttp.ExpectedState, actual *ActualState) {
Expand Down
37 changes: 29 additions & 8 deletions agent/internal/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func (a *Agent) Run(ctx context.Context) {
return
case <-ticker.C:
a.Tick()
case <-a.reconcileRequested:
if a.GetState() == StateProcessing {
a.requestExpectedStateRefresh()
} else {
a.consumeExpectedStateRefresh()
}
a.Tick()
case <-logTickerC:
a.CollectLogs()
case <-cleanupTickerC:
Expand All @@ -65,10 +72,7 @@ func (a *Agent) Run(ctx context.Context) {
}

func (a *Agent) StatusReportLoop(ctx context.Context) {
report := a.BuildStatusReport(true)
if err := a.Client.ReportStatus(report); err != nil {
log.Printf("[status] failed to report: %v", err)
}
a.reportStatus("startup")

ticker := time.NewTicker(WorkQueueStatusInterval)
defer ticker.Stop()
Expand All @@ -78,14 +82,31 @@ func (a *Agent) StatusReportLoop(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
report := a.BuildStatusReport(true)
if err := a.Client.ReportStatus(report); err != nil {
log.Printf("[status] failed to report: %v", err)
}
a.reportStatus("periodic")
case reason := <-a.statusReportRequested:
a.reportStatus(reason)
}
}
}

// RequestStatusReport queues an immediate status report tagged with reason.
// The send never blocks: if a request is already queued, this one is dropped
// and the drop is logged.
func (a *Agent) RequestStatusReport(reason string) {
	log.Printf("[status] immediate report requested: %s", reason)

	queued := false
	select {
	case a.statusReportRequested <- reason:
		queued = true
	default:
	}

	if !queued {
		log.Printf("[status] immediate report already queued, dropping reason: %s", reason)
	}
}

// reportStatus builds a status report, sends it through the client, and logs
// the outcome. reason is included in both the success and the failure log line.
func (a *Agent) reportStatus(reason string) {
	err := a.Client.ReportStatus(a.BuildStatusReport(true))
	if err == nil {
		log.Printf("[status] reported (%s)", reason)
		return
	}

	log.Printf("[status] failed to report (%s): %v", reason, err)
}

func (a *Agent) WorkQueueLoop(ctx context.Context) {
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion agent/internal/agent/workqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (a *Agent) ProcessWorkQueue() {
case "stop":
processErr = a.ProcessStop(item)
case "deploy":
log.Printf("[work-queue] deploy handled via expected state reconciliation, marking complete")
a.RequestReconcile("deploy work item " + Truncate(item.ID, 8))
case "force_cleanup":
processErr = a.ProcessForceCleanup(item)
case "cleanup_volumes":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,31 @@
"use client";

import { useState } from "react";
import { toast } from "sonner";
import {
AlertTriangleIcon,
ChevronDownIcon,
PlayIcon,
RefreshCwIcon,
RotateCcwIcon,
PlayIcon,
StopCircleIcon,
Trash2Icon,
AlertTriangleIcon,
} from "lucide-react";
import { useState } from "react";
import { toast } from "sonner";
import { useSWRConfig } from "swr";
import {
deleteDeployments,
deployService,
restartService,
stopService,
} from "@/actions/projects";
import { useService } from "@/components/service/service-layout-client";
import { DeploymentCanvas } from "@/components/service/details/deployment-canvas";
import {
DeploymentProgress,
getBarState,
} from "@/components/service/details/deployment-progress";
import { PendingChangesBanner } from "@/components/service/details/pending-changes-banner";
import { RolloutHistory } from "@/components/service/details/rollout-history";
import { Button } from "@/components/ui/button";
import { ButtonGroup } from "@/components/ui/button-group";
import {
DropdownMenu,
DropdownMenuContent,
DropdownMenuItem,
DropdownMenuSeparator,
DropdownMenuTrigger,
} from "@/components/ui/dropdown-menu";
import { useService } from "@/components/service/service-layout-client";
import {
AlertDialog,
AlertDialogAction,
Expand All @@ -45,12 +37,22 @@ import {
AlertDialogMedia,
AlertDialogTitle,
} from "@/components/ui/alert-dialog";
import { Button } from "@/components/ui/button";
import { ButtonGroup } from "@/components/ui/button-group";
import {
DropdownMenu,
DropdownMenuContent,
DropdownMenuItem,
DropdownMenuSeparator,
DropdownMenuTrigger,
} from "@/components/ui/dropdown-menu";

type ConfirmAction = "redeploy" | "stop" | "delete" | null;

export default function DeploymentsPage() {
const { service, pendingChanges, projectSlug, envName, onUpdate } =
useService();
const { mutate: mutateCache } = useSWRConfig();
const [isLoading, setIsLoading] = useState<string | null>(null);
const [confirmAction, setConfirmAction] = useState<ConfirmAction>(null);

Expand All @@ -68,6 +70,9 @@ export default function DeploymentsPage() {
if (successMessage) {
toast.success(successMessage);
}
if (actionName === "redeploy" || actionName === "start") {
await mutateCache(`/api/services/${service.id}/rollouts`);
}
onUpdate();
} catch (error) {
const errorMessage =
Expand Down
2 changes: 0 additions & 2 deletions web/app/api/inngest/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
buildTriggerWorkflow,
backupTriggerWorkflow,
restoreTriggerWorkflow,
processAgentStatus,
} from "@/lib/inngest/functions";

export const { GET, POST, PUT } = serve({
Expand All @@ -43,6 +42,5 @@ export const { GET, POST, PUT } = serve({
buildTriggerWorkflow,
backupTriggerWorkflow,
restoreTriggerWorkflow,
processAgentStatus,
],
});
21 changes: 3 additions & 18 deletions web/app/api/v1/agent/status/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { NextRequest, NextResponse } from "next/server";
import { db } from "@/db";
import { servers } from "@/db/schema";
import { eq } from "drizzle-orm";
import { verifyAgentRequest } from "@/lib/agent-auth";
import { inngest } from "@/lib/inngest/client";
import type { StatusReport } from "@/lib/agent-status";
import { applyStatusReport, type StatusReport } from "@/lib/agent-status";

export async function POST(request: NextRequest) {
const body = await request.text();
Expand All @@ -29,18 +25,7 @@ export async function POST(request: NextRequest) {

const { serverId } = auth;

await db
.update(servers)
.set({ lastHeartbeat: new Date(), status: "online" })
.where(eq(servers.id, serverId));
await applyStatusReport(serverId, data.statusReport);

await inngest.send({
name: "agent/status-reported",
data: {
serverId,
report: data.statusReport,
},
});

return NextResponse.json({ ok: true }, { status: 202 });
return NextResponse.json({ ok: true });
}
5 changes: 4 additions & 1 deletion web/app/api/v1/agent/work-queue/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { eq, and, inArray } from "drizzle-orm";
import { verifyAgentRequest } from "@/lib/agent-auth";

// NOTE(review): presumably the long-poll ceiling in milliseconds — confirm
// against the handler that uses it.
const MAX_TIMEOUT = 30000;
// Short interval keeps deploy wake latency low for now, but increases empty
// polling load. Replace with Postgres LISTEN/NOTIFY or another wake mechanism
// before scaling this broadly.
// Declared once: a second `const POLL_INTERVAL` in the same scope is a
// redeclaration error.
const POLL_INTERVAL = 500;

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
Expand Down
Loading