diff --git a/agent_client.go b/agent_client.go index d74bdaae..16f3ff7b 100644 --- a/agent_client.go +++ b/agent_client.go @@ -15,9 +15,11 @@ import ( type AgentClient struct { agentClient livekit.CloudAgent authBase + httpClient *http.Client + twirpOpts []twirp.ClientOption } -func NewAgentClient(url string, apiKey string, apiSecret string, opts ...twirp.ClientOption) (*AgentClient, error) { +func NewAgentClient(url string, apiKey string, apiSecret string, opts ...AgentClientOption) (*AgentClient, error) { serverUrl := os.Getenv("LK_AGENTS_URL") if serverUrl == "" { url = signalling.ToHttpURL(url) @@ -25,12 +27,29 @@ func NewAgentClient(url string, apiKey string, apiSecret string, opts ...twirp.C re := regexp.MustCompile(pattern) serverUrl = re.ReplaceAllString(url, "https://agents.") } + c := &AgentClient{ + authBase: authBase{apiKey, apiSecret}, + httpClient: &http.Client{}, + } + for _, opt := range opts { + opt(c) + } + c.agentClient = livekit.NewCloudAgentProtobufClient(serverUrl, c.httpClient, c.twirpOpts...) + return c, nil +} + +type AgentClientOption func(*AgentClient) + +func WithHTTPClient(httpClient *http.Client) AgentClientOption { + return func(c *AgentClient) { + c.httpClient = httpClient + } +} - client := livekit.NewCloudAgentProtobufClient(serverUrl, &http.Client{}, opts...) - return &AgentClient{ - agentClient: client, - authBase: authBase{apiKey, apiSecret}, - }, nil +func WithTwirpClientOptions(opts ...twirp.ClientOption) AgentClientOption { + return func(c *AgentClient) { + c.twirpOpts = opts + } } func (c *AgentClient) CreateAgent(ctx context.Context, req *livekit.CreateAgentRequest) (*livekit.CreateAgentResponse, error) { @@ -41,6 +60,14 @@ func (c *AgentClient) CreateAgent(ctx context.Context, req *livekit.CreateAgentR return c.agentClient.CreateAgent(ctx, req) } +func (c *AgentClient) CreateAgentV2(ctx context.Context, req *livekit.CreateAgentV2Request) (*livekit.CreateAgentV2Response, error) { + ctx, err := c.withAuth(ctx, withAgentGrant{Admin: true}) + if err != nil { + return nil, err + } + return c.agentClient.CreateAgentV2(ctx, req) +} + func (c *AgentClient) ListAgents(ctx context.Context, req *livekit.ListAgentsRequest) (*livekit.ListAgentsResponse, error) { ctx, err := c.withAuth(ctx, withAgentGrant{Admin: true}) if err != nil { @@ -113,6 +140,14 @@ func (c *AgentClient) DeployAgent(ctx context.Context, req *livekit.DeployAgentR return c.agentClient.DeployAgent(ctx, req) } +func (c *AgentClient) DeployAgentV2(ctx context.Context, req *livekit.DeployAgentV2Request) (*livekit.DeployAgentV2Response, error) { + ctx, err := c.withAuth(ctx, withAgentGrant{Admin: true}) + if err != nil { + return nil, err + } + return c.agentClient.DeployAgentV2(ctx, req) +} + func (c *AgentClient) GetClientSettings(ctx context.Context, req *livekit.ClientSettingsRequest) (*livekit.ClientSettingsResponse, error) { ctx, err := c.withAuth(ctx, withAgentGrant{Admin: true}) if err != nil { diff --git a/go.mod b/go.mod index 503b942b..06633258 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20260401192012-ea94ab340a57 github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22 - github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221 + github.com/livekit/protocol v1.45.6 github.com/magefile/mage v1.17.0 github.com/moby/buildkit v0.26.2 github.com/moby/patternmatcher v0.6.1 diff --git a/go.sum b/go.sum index 63c8c4ec..980297c8 100644 --- a/go.sum +++ b/go.sum @@ -159,8 +159,8 @@ github.com/livekit/media-sdk v0.0.0-20260401192012-ea94ab340a57 h1:Y0dZHH9gY70h+ github.com/livekit/media-sdk v0.0.0-20260401192012-ea94ab340a57/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8= github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22 h1:dzCBxOGLLWVtQhL7OYK2EGN+5Q+23Mq/jfz4vQisirA= github.com/livekit/mediatransportutil v0.0.0-20251128105421-19c7a7b81c22/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221 h1:loe7h+z1kOu/ojprFTYSZBbJVly7gdZgQ/ewElGeLPo= -github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221/go.mod h1:e6QdWDkfot+M2nRh0eitJUS0ZLuwvKCsfiz2pWWSG3s= +github.com/livekit/protocol v1.45.6 h1:E+wKxs8ckKNYYTNyHm5nR1ShGLJ5DmA+WCEb5AJG11A= +github.com/livekit/protocol v1.45.6/go.mod h1:e6QdWDkfot+M2nRh0eitJUS0ZLuwvKCsfiz2pWWSG3s= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94= diff --git a/pkg/cloudagents/build.go b/pkg/cloudagents/build.go index 84360820..91d3ce7a 100644 --- a/pkg/cloudagents/build.go +++ b/pkg/cloudagents/build.go @@ -30,9 +30,12 @@ import ( "golang.org/x/sync/errgroup" ) -func (c *Client) build(ctx context.Context, id string, writer io.Writer) error { +func (c *Client) build(ctx context.Context, id string, environment string, writer io.Writer) error { params := url.Values{} params.Add("agent_id", id) + if environment != "" { + params.Add("environment", environment) + } fullUrl := fmt.Sprintf("%s/build?%s", c.agentsURL, params.Encode()) req, err := c.newRequestWithContext(ctx, "POST", fullUrl, nil) if err != nil { diff --git a/pkg/cloudagents/client.go b/pkg/cloudagents/client.go index 47cd3e7d..4135960e 100644 --- a/pkg/cloudagents/client.go +++ b/pkg/cloudagents/client.go @@ -55,12 +55,14 @@ func New(opts ...ClientOption) (*Client, error) { if client.projectURL == "" { return nil, fmt.Errorf("project credentials are required") } - agentClient, err := lksdk.NewAgentClient(client.projectURL, client.apiKey, client.apiSecret, twirp.WithClientHooks(&twirp.ClientHooks{ - RequestPrepared: func(ctx context.Context, req *http.Request) (context.Context, error) { - client.setLivekitHeaders(req) - return ctx, nil - }, - })) + agentClient, err := lksdk.NewAgentClient(client.projectURL, client.apiKey, client.apiSecret, + lksdk.WithTwirpClientOptions( + twirp.WithClientHooks(&twirp.ClientHooks{ + RequestPrepared: func(ctx context.Context, req *http.Request) (context.Context, error) { + client.setLivekitHeaders(req) + return ctx, nil + }, + }))) if err != nil { return nil, err } @@ -93,6 +95,7 @@ func (c *Client) CreateAgent( resp.PresignedUrl, resp.PresignedPostRequest, source, + "", // production (create always targets production) excludeFiles, buildLogStreamWriter, ); err != nil { @@ -101,6 +104,21 @@ func (c *Client) CreateAgent( return resp, nil } +func (c *Client) CreateAgentV2( + ctx context.Context, + secrets []*lkproto.AgentSecret, + regions []string, +) (*lkproto.CreateAgentV2Response, error) { + resp, err := c.AgentClient.CreateAgentV2(ctx, &lkproto.CreateAgentV2Request{ + Secrets: secrets, + Regions: regions, + }) + if err != nil { + return nil, err + } + return resp, nil +} + // DeployAgent deploys new agent by building from source. func (c *Client) DeployAgent( ctx context.Context, @@ -120,7 +138,30 @@ func (c *Client) DeployAgent( if !resp.Success { return fmt.Errorf("failed to deploy agent: %s", resp.Message) } - return c.uploadAndBuild(ctx, agentID, resp.PresignedUrl, resp.PresignedPostRequest, source, excludeFiles, buildLogStreamWriter) + return c.uploadAndBuild(ctx, agentID, resp.PresignedUrl, resp.PresignedPostRequest, source, "", excludeFiles, buildLogStreamWriter) +} + +func (c *Client) DeployAgentV2( + ctx context.Context, + agentID string, + source fs.FS, + secrets []*lkproto.AgentSecret, + environment string, + excludeFiles []string, + buildLogStreamWriter io.Writer, +) error { + resp, err := c.AgentClient.DeployAgentV2(ctx, &lkproto.DeployAgentV2Request{ + AgentId: agentID, + Secrets: secrets, + Environment: environment, + }) + if err != nil { + return err + } + if !resp.Success { + return fmt.Errorf("failed to deploy agent: %s", resp.Message) + } + return c.uploadAndBuild(ctx, agentID, "", resp.PresignedReq, source, environment, excludeFiles, buildLogStreamWriter) } // RegisterAgent creates an agent record without uploading source or triggering a build. @@ -158,6 +199,7 @@ func (c *Client) uploadAndBuild( presignedUrl string, presignedPostRequest *lkproto.PresignedPostRequest, source fs.FS, + environment string, excludeFiles []string, buildLogStreamWriter io.Writer, ) error { @@ -169,7 +211,7 @@ func (c *Client) uploadAndBuild( ); err != nil { return err } - if err := c.build(ctx, agentID, buildLogStreamWriter); err != nil { + if err := c.build(ctx, agentID, environment, buildLogStreamWriter); err != nil { return err } return nil diff --git a/pkg/cloudagents/logs.go b/pkg/cloudagents/logs.go index 70455379..d05ec714 100644 --- a/pkg/cloudagents/logs.go +++ b/pkg/cloudagents/logs.go @@ -30,7 +30,7 @@ type APIError struct { } // StreamLogs streams the logs for the given agent. -func (c *Client) StreamLogs(ctx context.Context, logType, agentID string, writer io.Writer, serverRegion string) error { +func (c *Client) StreamLogs(ctx context.Context, logType, agentID, environment string, writer io.Writer, serverRegion string) error { logger := c.logger.WithName("StreamLogs") if logType == "" { logType = "deploy" @@ -38,6 +38,9 @@ func (c *Client) StreamLogs(ctx context.Context, logType, agentID string, writer params := url.Values{} params.Add("agent_id", agentID) params.Add("log_type", logType) + if environment != "" { + params.Add("env", environment) + } fullUrl := fmt.Sprintf("%s/logs?%s", c.getAgentsURL(serverRegion), params.Encode()) req, err := c.newRequestWithContext(ctx, "GET", fullUrl, nil) if err != nil { diff --git a/pkg/cloudagents/logs_test.go b/pkg/cloudagents/logs_test.go index 286f1f08..f3b3adcb 100644 --- a/pkg/cloudagents/logs_test.go +++ b/pkg/cloudagents/logs_test.go @@ -43,7 +43,7 @@ func TestStreamLogs_WriterClosesEarly(t *testing.T) { pw.Close() }() - err := client.StreamLogs(context.Background(), "deploy", "test-agent", pw, "us-west") + err := client.StreamLogs(context.Background(), "deploy", "test-agent", "", pw, "us-west") if err == nil { t.Fatal("expected error when writer closes, got nil") @@ -81,7 +81,7 @@ func TestStreamLogs_ContextCanceledDuringWrite(t *testing.T) { cancel() }() - err := client.StreamLogs(ctx, "deploy", "test-agent", &buf, "us-west") + err := client.StreamLogs(ctx, "deploy", "test-agent", "", &buf, "us-west") if !errors.Is(err, context.Canceled) { t.Errorf("expected context.Canceled error, got: %v", err) @@ -119,7 +119,7 @@ func TestStreamLogs_WriterReturnsError(t *testing.T) { } writer := &failingWriter{failAfter: 2} - err := client.StreamLogs(context.Background(), "deploy", "test-agent", writer, "us-west") + err := client.StreamLogs(context.Background(), "deploy", "test-agent", "", writer, "us-west") if err == nil { t.Fatal("expected error from failing writer, got nil") @@ -147,7 +147,7 @@ func TestStreamLogs_NonOKResponse(t *testing.T) { projectURL: server.URL, } - err := client.StreamLogs(context.Background(), "deploy", "test-agent", &bytes.Buffer{}, "us-west") + err := client.StreamLogs(context.Background(), "deploy", "test-agent", "", &bytes.Buffer{}, "us-west") if err == nil { t.Fatal("expected error when server responds with non-200 status") } @@ -181,7 +181,7 @@ func TestStreamLogs_ServerClosesConnection(t *testing.T) { projectURL: server.URL, } - err := client.StreamLogs(context.Background(), "deploy", "test-agent", &bytes.Buffer{}, "us-west") + err := client.StreamLogs(context.Background(), "deploy", "test-agent", "", &bytes.Buffer{}, "us-west") if err == nil { t.Fatal("expected error when server closes connection mid-stream") } diff --git a/pkg/cloudagents/tar.go b/pkg/cloudagents/tar.go index e73d1857..91552f38 100644 --- a/pkg/cloudagents/tar.go +++ b/pkg/cloudagents/tar.go @@ -95,7 +95,6 @@ func CreateSourceTarball( } if !checkFilesToInclude(matcher, path) { - logger.Debugw("excluding file from tarball", "path", path) return nil }