Skip to content

Commit 0ed6264

Browse files
committed
config: simplify grpc options
The endpoint group service config is genuinely useful for all gRPC connections. The difference is in the resolver: for groups it resolves on its own and load-balances between the backends; for non-groups we use the passthrough resolver, which then load-balances across a single backend, so the behavior should be the same as today. In turn we get retries on UNAVAILABLE, which is still useful. This also unifies the dial option construction, which makes it easier to reason about. I also took the liberty to clean up some related stuff. Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 519da47 commit 0ed6264

5 files changed

Lines changed: 99 additions & 81 deletions

File tree

cmd/thanos/config.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type grpcClientConfig struct {
7070
cert, key, caCert string
7171
serverName string
7272
compression string
73+
serviceConfig string
7374
}
7475

7576
func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig {
@@ -81,19 +82,34 @@ func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientC
8182
cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName)
8283
compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ")
8384
cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone)
85+
cmd.Flag("grpc-service-config", "gRPC service configuration in JSON format. See https://github.com/grpc/grpc/blob/master/doc/service_config.md").Default("").StringVar(&gc.serviceConfig)
86+
87+
return gc
88+
}
89+
90+
func (gc *grpcClientConfig) registerReceiverFlag(cmd extkingpin.FlagClause) *grpcClientConfig {
91+
cmd.Flag("remote-write.client-tls-secure", "Use TLS when talking to the other receivers.").Default("false").BoolVar(&gc.secure)
92+
cmd.Flag("remote-write.client-tls-skip-verify", "Disable TLS certificate verification when talking to the other receivers i.e self signed, signed by fake CA.").Default("false").BoolVar(&gc.skipVerify)
93+
cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server.").Default("").StringVar(&gc.cert)
94+
cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate.").Default("").StringVar(&gc.key)
95+
cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers.").Default("").StringVar(&gc.caCert)
96+
cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned TLS certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName)
97+
compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ")
98+
cmd.Flag("receive.grpc-compression", "Compression algorithm to use for gRPC requests to other receivers. Must be one of: "+compressionOptions).Default(snappy.Name).EnumVar(&gc.compression, snappy.Name, compressionNone)
99+
cmd.Flag("receive.grpc-service-config", "gRPC service configuration file or content in JSON format. See https://github.com/grpc/grpc/blob/master/doc/service_config.md").PlaceHolder("<content>").Default("").StringVar(&gc.serviceConfig)
84100

85101
return gc
86102
}
87103

88104
func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) {
89-
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName)
90-
if err != nil {
91-
return nil, errors.Wrapf(err, "building gRPC client")
105+
opts := []extgrpc.StoreClientGRPCOption{
106+
extgrpc.WithCompression(gc.compression),
107+
extgrpc.WithServiceConfig(gc.serviceConfig),
92108
}
93-
if gc.compression != compressionNone {
94-
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression)))
109+
if gc.secure {
110+
opts = append(opts, extgrpc.WithTLS(gc.cert, gc.key, gc.caCert, gc.serverName, gc.skipVerify))
95111
}
96-
return dialOpts, nil
112+
return extgrpc.StoreClientGRPCOpts(logger, reg, tracer, opts...)
97113
}
98114

99115
type httpConfig struct {

cmd/thanos/endpointset.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/thanos-io/thanos/pkg/discovery/cache"
2727
"github.com/thanos-io/thanos/pkg/discovery/dns"
2828
"github.com/thanos-io/thanos/pkg/errors"
29-
"github.com/thanos-io/thanos/pkg/extgrpc"
3029
"github.com/thanos-io/thanos/pkg/extkingpin"
3130
"github.com/thanos-io/thanos/pkg/extprom"
3231
"github.com/thanos-io/thanos/pkg/logutil"
@@ -116,9 +115,6 @@ func validateEndpointConfig(cfg EndpointConfig) error {
116115
if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict {
117116
return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address)
118117
}
119-
if !ecfg.Group && len(ecfg.ServiceConfig) != 0 {
120-
return errors.Newf("%s service_config is only valid for endpoint groups.", ecfg.Address)
121-
}
122118
}
123119
return nil
124120
}
@@ -354,15 +350,19 @@ func setupEndpointSet(
354350
// groups and non dynamic endpoints
355351
for _, ecfg := range endpointConfig.Endpoints {
356352
strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address
353+
opts := dialOpts
354+
if ecfg.ServiceConfig != "" {
355+
opts = append(append(make([]grpc.DialOption, 0, len(dialOpts)+1), dialOpts...), grpc.WithDefaultServiceConfig(ecfg.ServiceConfig))
356+
}
357357
if group {
358-
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts(ecfg.ServiceConfig)...)...))
358+
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, opts...))
359359
} else if !dns.IsDynamicNode(addr) {
360-
specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...))
360+
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("passthrough:///%s", addr), strict, opts...))
361361
}
362362
}
363363
// dynamic endpoints
364364
for _, addr := range dnsEndpointProvider.Addresses() {
365-
specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...))
365+
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("passthrough:///%s", addr), false, dialOpts...))
366366
}
367367
return removeDuplicateEndpointSpecs(specs)
368368
}, unhealthyTimeout, endpointTimeout, queryTimeout, queryConnMetricLabels...)

cmd/thanos/receive.go

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,12 @@ import (
3030
"github.com/thanos-io/objstore"
3131
"github.com/thanos-io/objstore/client"
3232
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
33-
"google.golang.org/grpc"
3433
"gopkg.in/yaml.v2"
3534

3635
"github.com/thanos-io/thanos/pkg/block/metadata"
3736
"github.com/thanos-io/thanos/pkg/component"
3837
"github.com/thanos-io/thanos/pkg/compressutil"
3938
"github.com/thanos-io/thanos/pkg/exemplars"
40-
"github.com/thanos-io/thanos/pkg/extgrpc"
41-
"github.com/thanos-io/thanos/pkg/extgrpc/snappy"
4239
"github.com/thanos-io/thanos/pkg/extkingpin"
4340
"github.com/thanos-io/thanos/pkg/extprom"
4441
"github.com/thanos-io/thanos/pkg/info"
@@ -158,27 +155,10 @@ func runReceive(
158155
return err
159156
}
160157

161-
dialOpts, err := extgrpc.StoreClientGRPCOpts(
162-
logger,
163-
reg,
164-
tracer,
165-
conf.rwClientSecure,
166-
conf.rwClientSkipVerify,
167-
conf.rwClientCert,
168-
conf.rwClientKey,
169-
conf.rwClientServerCA,
170-
conf.rwClientServerName,
171-
)
158+
dialOpts, err := conf.rwClientConfig.dialOptions(logger, reg, tracer)
172159
if err != nil {
173160
return err
174161
}
175-
if conf.compression != compressionNone {
176-
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression)))
177-
}
178-
179-
if conf.grpcServiceConfig != "" {
180-
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(conf.grpcServiceConfig))
181-
}
182162

183163
var bkt objstore.Bucket
184164
confContentYaml, err := conf.objStoreConfig.Content()
@@ -844,12 +824,7 @@ type receiveConfig struct {
844824
rwServerCert string
845825
rwServerKey string
846826
rwServerClientCA string
847-
rwClientCert string
848-
rwClientKey string
849-
rwClientSecure bool
850-
rwClientServerCA string
851-
rwClientServerName string
852-
rwClientSkipVerify bool
827+
rwClientConfig grpcClientConfig
853828
rwServerTlsMinVersion string
854829

855830
dataDir string
@@ -873,9 +848,7 @@ type receiveConfig struct {
873848
forwardTimeout *model.Duration
874849
maxBackoff *model.Duration
875850
maxArtificialDelay *model.Duration
876-
compression string
877851
replicationProtocol string
878-
grpcServiceConfig string
879852

880853
tsdbMinBlockDuration *model.Duration
881854
tsdbMaxBlockDuration *model.Duration
@@ -937,17 +910,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
937910

938911
cmd.Flag("remote-write.server-tls-min-version", "TLS version for the gRPC server, leave blank to default to TLS 1.3, allow values: [\"1.0\", \"1.1\", \"1.2\", \"1.3\"]").Default("1.3").StringVar(&rc.rwServerTlsMinVersion)
939912

940-
cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server.").Default("").StringVar(&rc.rwClientCert)
941-
942-
cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate.").Default("").StringVar(&rc.rwClientKey)
943-
944-
cmd.Flag("remote-write.client-tls-secure", "Use TLS when talking to the other receivers.").Default("false").BoolVar(&rc.rwClientSecure)
945-
946-
cmd.Flag("remote-write.client-tls-skip-verify", "Disable TLS certificate verification when talking to the other receivers i.e self signed, signed by fake CA.").Default("false").BoolVar(&rc.rwClientSkipVerify)
947-
948-
cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers.").Default("").StringVar(&rc.rwClientServerCA)
949-
950-
cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned TLS certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&rc.rwClientServerName)
913+
rc.rwClientConfig.registerReceiverFlag(cmd)
951914

952915
cmd.Flag("tsdb.path", "Data directory of TSDB.").
953916
Default("./data").StringVar(&rc.dataDir)
@@ -987,8 +950,6 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
987950
cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).StringVar(&rc.replicaHeader)
988951

989952
cmd.Flag("receive.forward.async-workers", "Number of concurrent workers processing forwarding of remote-write requests.").Default("5").UintVar(&rc.asyncForwardWorkerCount)
990-
compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ")
991-
cmd.Flag("receive.grpc-compression", "Compression algorithm to use for gRPC requests to other receivers. Must be one of: "+compressionOptions).Default(snappy.Name).EnumVar(&rc.compression, snappy.Name, compressionNone)
992953

993954
cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)
994955

@@ -999,8 +960,6 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
999960

1000961
cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr)
1001962

1002-
cmd.Flag("receive.grpc-service-config", "gRPC service configuration file or content in JSON format. See https://github.com/grpc/grpc/blob/master/doc/service_config.md").PlaceHolder("<content>").Default("").StringVar(&rc.grpcServiceConfig)
1003-
1004963
rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())
1005964

1006965
rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())

cmd/thanos/rule.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -417,17 +417,7 @@ func runRule(
417417
}
418418

419419
if len(grpcEndpoints) > 0 {
420-
dialOpts, err := extgrpc.StoreClientGRPCOpts(
421-
logger,
422-
reg,
423-
tracer,
424-
false,
425-
false,
426-
"",
427-
"",
428-
"",
429-
"",
430-
)
420+
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer)
431421
if err != nil {
432422
return err
433423
}

pkg/extgrpc/client.go

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,33 +78,81 @@ func (c *nonPoolingCodec) Marshal(v any) (mem.BufferSlice, error) {
7878
return mem.BufferSlice{mem.NewBuffer(&bufExact, pool)}, nil
7979
}
8080

81-
// EndpointGroupGRPCOpts creates gRPC dial options for connecting to endpoint groups.
81+
// DefaultServiceConfig is the default gRPC service config applied to all client connections.
82+
// It enables round-robin load balancing and retries on transient UNAVAILABLE errors.
8283
// For details on retry capabilities, see https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
83-
func EndpointGroupGRPCOpts(serviceConfig string) []grpc.DialOption {
84-
if serviceConfig == "" {
85-
serviceConfig = `
86-
{
84+
const DefaultServiceConfig = `{
8785
"loadBalancingPolicy":"round_robin",
8886
"retryPolicy": {
8987
"maxAttempts": 3,
9088
"initialBackoff": "0.1s",
9189
"backoffMultiplier": 2,
9290
"retryableStatusCodes": [
93-
"UNAVAILABLE"
91+
"UNAVAILABLE"
9492
]
9593
}
9694
}`
95+
96+
// StoreClientGRPCOption is a functional option for StoreClientGRPCOpts.
97+
type StoreClientGRPCOption func(*storeClientGRPCOpts)
98+
99+
type storeClientGRPCOpts struct {
100+
serviceConfig string
101+
secure bool
102+
skipVerify bool
103+
cert string
104+
key string
105+
caCert string
106+
serverName string
107+
compression string
108+
}
109+
110+
// WithTLS enables TLS for the gRPC client connection.
111+
// Presence of this option implies secure=true.
112+
// Empty cert/key/caCert/serverName values are valid and result in TLS with system CA pool.
113+
func WithTLS(cert, key, caCert, serverName string, skipVerify bool) StoreClientGRPCOption {
114+
return func(o *storeClientGRPCOpts) {
115+
o.secure = true
116+
o.cert = cert
117+
o.key = key
118+
o.caCert = caCert
119+
o.serverName = serverName
120+
o.skipVerify = skipVerify
97121
}
122+
}
98123

99-
return []grpc.DialOption{
100-
grpc.WithDefaultServiceConfig(serviceConfig),
101-
grpc.WithDisableServiceConfig(),
102-
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second}),
124+
// WithServiceConfig overrides the default gRPC service config. No-op if empty.
125+
func WithServiceConfig(serviceConfig string) StoreClientGRPCOption {
126+
return func(o *storeClientGRPCOpts) {
127+
if serviceConfig != "" {
128+
o.serviceConfig = serviceConfig
129+
}
130+
}
131+
}
132+
133+
// WithCompression enables gRPC compression with the given algorithm. No-op if empty or "none".
134+
func WithCompression(compression string) StoreClientGRPCOption {
135+
return func(o *storeClientGRPCOpts) {
136+
if compression != "" && compression != "none" {
137+
o.compression = compression
138+
}
103139
}
104140
}
105141

106142
// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
107-
func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
143+
// By default, DefaultServiceConfig is applied and connections are insecure.
144+
// Use WithTLS, WithServiceConfig, and WithCompression to customize.
145+
func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, options ...StoreClientGRPCOption) ([]grpc.DialOption, error) {
146+
o := &storeClientGRPCOpts{}
147+
for _, opt := range options {
148+
opt(o)
149+
}
150+
151+
serviceConfig := o.serviceConfig
152+
if serviceConfig == "" {
153+
serviceConfig = DefaultServiceConfig
154+
}
155+
108156
grpcMets := grpc_prometheus.NewClientMetrics(
109157
grpc_prometheus.WithClientHandlingTimeHistogram(grpc_prometheus.WithHistogramOpts(
110158
&prometheus.HistogramOpts{
@@ -131,18 +179,23 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op
131179
tracing.StreamClientInterceptor(tracer),
132180
),
133181
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second}),
182+
grpc.WithDefaultServiceConfig(serviceConfig),
183+
grpc.WithDisableServiceConfig(),
184+
}
185+
if o.compression != "" {
186+
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(o.compression)))
134187
}
135188
if reg != nil {
136189
reg.MustRegister(grpcMets)
137190
}
138191

139-
if !secure {
192+
if !o.secure {
140193
return append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())), nil
141194
}
142195

143196
level.Info(logger).Log("msg", "enabling client to server TLS")
144197

145-
tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName, skipVerify)
198+
tlsCfg, err := tls.NewClientConfig(logger, o.cert, o.key, o.caCert, o.serverName, o.skipVerify)
146199
if err != nil {
147200
return nil, err
148201
}

0 commit comments

Comments
 (0)