11package topology
22
33import (
4- // Import the Topology Service proto package to ensure the dependency is retained.
5- // The client implementation will be added in a subsequent MR.
4+ "context"
5+ "crypto/tls"
6+ "crypto/x509"
7+ "fmt"
8+ "os"
9+ "sync"
10+
11+ grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
612 pb "gitlab.com/gitlab-org/cells/topology-service/clients/go/proto"
13+ "gitlab.com/gitlab-org/labkit/correlation"
14+ grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
15+ "gitlab.com/gitlab-org/labkit/log"
16+ grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
17+ "google.golang.org/grpc"
18+ "google.golang.org/grpc/credentials"
19+ "google.golang.org/grpc/credentials/insecure"
720)
821
922// ClassifyType constants mirror the proto enum values for convenience.
@@ -12,3 +25,191 @@ const (
1225 ClassifyTypeSessionPrefix = pb .ClassifyType_SESSION_PREFIX
1326 ClassifyTypeCellID = pb .ClassifyType_CELL_ID
1427)
28+
29+ // Client provides a gRPC client for the Topology Service.
30+ // It handles connection management with lazy initialization and
31+ // supports TLS/mTLS for secure connections.
32+ type Client struct {
33+ config * Config
34+
35+ mu sync.Mutex
36+ conn * grpc.ClientConn
37+ client pb.ClassifyServiceClient
38+ }
39+
40+ // NewClient creates a new Topology Service client from the given configuration.
41+ // Returns nil if the topology service is disabled in the configuration.
42+ // The client uses lazy initialization - the actual gRPC connection is
43+ // established on the first call to Classify.
44+ // The configuration is copied to avoid mutating the original.
45+ func NewClient (cfg * Config ) * Client {
46+ if ! cfg .Enabled {
47+ return nil
48+ }
49+
50+ // Copy config to avoid mutating the original
51+ configCopy := * cfg
52+
53+ // Apply defaults
54+ if configCopy .Timeout == 0 {
55+ configCopy .Timeout = DefaultTimeout
56+ }
57+ if configCopy .ClassifyType == "" {
58+ configCopy .ClassifyType = "first_cell"
59+ }
60+
61+ return & Client {
62+ config : & configCopy ,
63+ }
64+ }
65+
66+ // Classify queries the Topology Service to determine which cell should handle
67+ // a request for the given value. The value interpretation depends on the
68+ // configured ClassifyType (e.g., project path, session prefix, cell ID).
69+ func (c * Client ) Classify (ctx context.Context , value string ) (* pb.ClassifyResponse , error ) {
70+ client , err := c .getClient (ctx )
71+ if err != nil {
72+ return nil , fmt .Errorf ("failed to get topology client: %w" , err )
73+ }
74+
75+ ctx , cancel := context .WithTimeout (ctx , c .config .Timeout )
76+ defer cancel ()
77+
78+ req := & pb.ClassifyRequest {
79+ Type : parseClassifyType (c .config .ClassifyType ),
80+ Value : value ,
81+ }
82+
83+ return client .Classify (ctx , req )
84+ }
85+
86+ // Close closes the gRPC connection to the Topology Service.
87+ // It is safe to call Close multiple times.
88+ func (c * Client ) Close () error {
89+ c .mu .Lock ()
90+ defer c .mu .Unlock ()
91+
92+ if c .conn == nil {
93+ return nil
94+ }
95+
96+ err := c .conn .Close ()
97+ c .conn = nil
98+ c .client = nil
99+ return err
100+ }
101+
102+ // getClient returns the ClassifyService client, establishing a connection if needed.
103+ // This implements lazy initialization - the connection is only created on first use.
104+ func (c * Client ) getClient (ctx context.Context ) (pb.ClassifyServiceClient , error ) {
105+ c .mu .Lock ()
106+ defer c .mu .Unlock ()
107+
108+ if c .client != nil {
109+ return c .client , nil
110+ }
111+
112+ conn , err := c .dial (ctx )
113+ if err != nil {
114+ return nil , err
115+ }
116+
117+ c .conn = conn
118+ c .client = pb .NewClassifyServiceClient (conn )
119+ return c .client , nil
120+ }
121+
122+ // dial establishes a gRPC connection to the Topology Service.
123+ func (c * Client ) dial (ctx context.Context ) (* grpc.ClientConn , error ) {
124+ serviceName := correlation .ExtractClientNameFromContext (ctx )
125+ if serviceName == "" {
126+ serviceName = "gitlab-shell-unknown"
127+
128+ log .WithContextFields (ctx , log.Fields {"service_name" : serviceName }).Warn ("No gRPC service name specified, defaulting to gitlab-shell-unknown" )
129+ }
130+ serviceName = fmt .Sprintf ("%s-%s" , serviceName , "topology" )
131+
132+ opts := []grpc.DialOption {
133+ grpc .WithChainStreamInterceptor (
134+ grpctracing .StreamClientTracingInterceptor (),
135+ grpc_prometheus .StreamClientInterceptor ,
136+ grpccorrelation .StreamClientCorrelationInterceptor (
137+ grpccorrelation .WithClientName (serviceName ),
138+ ),
139+ ),
140+ grpc .WithChainUnaryInterceptor (
141+ grpctracing .UnaryClientTracingInterceptor (),
142+ grpc_prometheus .UnaryClientInterceptor ,
143+ grpccorrelation .UnaryClientCorrelationInterceptor (
144+ grpccorrelation .WithClientName (serviceName ),
145+ ),
146+ ),
147+ }
148+
149+ creds , err := buildTLSCredentials (c .config )
150+ if err != nil {
151+ return nil , fmt .Errorf ("failed to build TLS credentials: %w" , err )
152+ }
153+
154+ if creds != nil {
155+ opts = append (opts , grpc .WithTransportCredentials (creds ))
156+ } else {
157+ opts = append (opts , grpc .WithTransportCredentials (insecure .NewCredentials ()))
158+ }
159+
160+ return grpc .NewClient (c .config .Address , opts ... )
161+ }
162+
163+ // buildTLSCredentials creates gRPC transport credentials based on the TLS configuration.
164+ // Returns nil if TLS is not enabled.
165+ func buildTLSCredentials (cfg * Config ) (credentials.TransportCredentials , error ) {
166+ if ! cfg .TLS .Enabled {
167+ return nil , nil
168+ }
169+
170+ tlsConfig := & tls.Config {
171+ MinVersion : tls .VersionTLS12 ,
172+ ServerName : cfg .TLS .ServerName ,
173+ InsecureSkipVerify : cfg .TLS .InsecureSkipVerify , //nolint:gosec // Intentionally configurable for development/testing
174+ }
175+
176+ // Load CA certificate if specified
177+ if cfg .TLS .CAFile != "" {
178+ caCert , err := os .ReadFile (cfg .TLS .CAFile )
179+ if err != nil {
180+ return nil , fmt .Errorf ("failed to read CA file: %w" , err )
181+ }
182+
183+ caPool := x509 .NewCertPool ()
184+ if ! caPool .AppendCertsFromPEM (caCert ) {
185+ return nil , fmt .Errorf ("failed to parse CA certificate" )
186+ }
187+ tlsConfig .RootCAs = caPool
188+ }
189+
190+ // Load client certificate and key for mTLS
191+ if cfg .TLS .CertFile != "" && cfg .TLS .KeyFile != "" {
192+ cert , err := tls .LoadX509KeyPair (cfg .TLS .CertFile , cfg .TLS .KeyFile )
193+ if err != nil {
194+ return nil , fmt .Errorf ("failed to load client certificate: %w" , err )
195+ }
196+ tlsConfig .Certificates = []tls.Certificate {cert }
197+ }
198+
199+ return credentials .NewTLS (tlsConfig ), nil
200+ }
201+
202+ // parseClassifyType converts a string classify type to the proto enum value.
203+ // Returns FIRST_CELL as the default for unrecognized values.
204+ func parseClassifyType (classifyType string ) pb.ClassifyType {
205+ switch classifyType {
206+ case "first_cell" :
207+ return pb .ClassifyType_FIRST_CELL
208+ case "session_prefix" :
209+ return pb .ClassifyType_SESSION_PREFIX
210+ case "cell_id" :
211+ return pb .ClassifyType_CELL_ID
212+ default :
213+ return pb .ClassifyType_FIRST_CELL
214+ }
215+ }
0 commit comments