Skip to content

Commit e03830b

Browse files
Vasilii IakliushinJohn Cai
authored andcommitted
Merge branch 'jc/add-gitaly-retry-policy' into 'main'
Add Gitaly retry policy Closes #831 See merge request https://gitlab.com/gitlab-org/gitlab-shell/-/merge_requests/1349 Merged-by: Vasilii Iakliushin <viakliushin@gitlab.com> Approved-by: Fred Reinink <freinink@gitlab.com> Approved-by: Vasilii Iakliushin <viakliushin@gitlab.com> Reviewed-by: Vasilii Iakliushin <viakliushin@gitlab.com> Reviewed-by: Fred Reinink <freinink@gitlab.com> Co-authored-by: John Cai <jcai@gitlab.com>
2 parents d34a1d1 + e1ffe4c commit e03830b

7 files changed

Lines changed: 165 additions & 44 deletions

File tree

internal/command/uploadpack/gitalycall_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,52 @@ func TestUploadPack(t *testing.T) {
8080
})
8181
}
8282
}
83+
84+
func TestUploadPackWithRetryConfig(t *testing.T) {
85+
gitalyAddress, testServer := testserver.StartGitalyServer(t, "tcp")
86+
87+
retryConfig := map[string]interface{}{
88+
"maxAttempts": 4,
89+
"initialBackoff": "0.1s",
90+
"maxBackoff": "1s",
91+
"backoffMultiplier": 2,
92+
"retryableStatusCodes": []string{"UNAVAILABLE"},
93+
}
94+
requests := requesthandlers.BuildAllowedWithGitalyHandlersAndRetryConfig(t, gitalyAddress, retryConfig)
95+
url := testserver.StartHTTPServer(t, requests)
96+
97+
output := &bytes.Buffer{}
98+
input := &bytes.Buffer{}
99+
100+
repo := "group/repo"
101+
env := sshenv.Env{
102+
IsSSHConnection: true,
103+
OriginalCommand: "git-upload-pack " + repo,
104+
RemoteAddr: "127.0.0.1",
105+
}
106+
107+
args := &commandargs.Shell{
108+
GitlabKeyID: "1",
109+
CommandType: commandargs.UploadPack,
110+
SSHArgs: []string{"git-upload-pack", repo},
111+
Env: env,
112+
}
113+
114+
ctx := correlation.ContextWithCorrelation(context.Background(), "retry-test")
115+
ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests")
116+
117+
cfg := &config.Config{GitlabURL: url}
118+
cfg.GitalyClient.InitSidechannelRegistry(ctx)
119+
120+
cmd := &Command{
121+
Config: cfg,
122+
Args: args,
123+
ReadWriter: &readwriter.ReadWriter{ErrOut: output, Out: output, In: input},
124+
}
125+
126+
_, err := cmd.Execute(ctx)
127+
require.NoError(t, err)
128+
129+
require.Equal(t, "SSHUploadPackWithSidechannel: "+repo, output.String())
130+
require.Equal(t, "retry-test", testServer.ReceivedMD["x-gitlab-correlation-id"][0])
131+
}

internal/gitaly/gitaly.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,24 @@ import (
1919
"gitlab.com/gitlab-org/gitlab-shell/v14/internal/metrics"
2020
)
2121

22-
// Command represents a gRPC service command with its address and token.
23-
type Command struct {
22+
// CacheKey contains the fields used for connection caching.
23+
type CacheKey struct {
2424
ServiceName string
2525
Address string
2626
Token string
2727
}
2828

29+
// Command represents a gRPC service command with its address and token.
30+
type Command struct {
31+
CacheKey
32+
33+
RetryPolicy *gitalyclient.RetryPolicy
34+
}
35+
2936
type connectionsCache struct {
3037
sync.RWMutex
3138

32-
connections map[Command]*grpc.ClientConn
39+
connections map[CacheKey]*grpc.ClientConn
3340
}
3441

3542
// Client manages connections to Gitaly services and handles sidechannel communication.
@@ -46,8 +53,10 @@ func (c *Client) InitSidechannelRegistry(ctx context.Context) {
4653

4754
// GetConnection returns a gRPC connection for the given command, using a cached connection if available.
4855
func (c *Client) GetConnection(ctx context.Context, cmd Command) (*grpc.ClientConn, error) {
56+
key := CacheKey{ServiceName: cmd.ServiceName, Address: cmd.Address, Token: cmd.Token}
57+
4958
c.cache.RLock()
50-
existingConn := c.cache.connections[cmd]
59+
existingConn := c.cache.connections[key]
5160
c.cache.RUnlock()
5261

5362
if existingConn != nil {
@@ -57,7 +66,7 @@ func (c *Client) GetConnection(ctx context.Context, cmd Command) (*grpc.ClientCo
5766
c.cache.Lock()
5867
defer c.cache.Unlock()
5968

60-
if cachedConn := c.cache.connections[cmd]; cachedConn != nil {
69+
if cachedConn := c.cache.connections[key]; cachedConn != nil {
6170
return cachedConn, nil
6271
}
6372

@@ -67,10 +76,10 @@ func (c *Client) GetConnection(ctx context.Context, cmd Command) (*grpc.ClientCo
6776
}
6877

6978
if c.cache.connections == nil {
70-
c.cache.connections = make(map[Command]*grpc.ClientConn)
79+
c.cache.connections = make(map[CacheKey]*grpc.ClientConn)
7180
}
7281

73-
c.cache.connections[cmd] = newConn
82+
c.cache.connections[key] = newConn
7483

7584
return newConn, nil
7685
}
@@ -134,5 +143,9 @@ func (c *Client) newConnection(ctx context.Context, cmd Command) (conn *grpc.Cli
134143
gitalyclient.WithGrpcOptions(grpcOpts),
135144
}
136145

146+
if cmd.RetryPolicy != nil {
147+
connOpts = append(connOpts, gitalyclient.WithRetryPolicy(cmd.RetryPolicy))
148+
}
149+
137150
return gitalyclient.DialSidechannel(ctx, cmd.Address, c.SidechannelRegistry, connOpts...)
138151
}

internal/gitaly/gitaly_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ func TestPrometheusMetrics(t *testing.T) {
1515

1616
c := newClient()
1717

18-
cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}
18+
cmd := Command{CacheKey: CacheKey{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}}
1919
c.newConnection(context.Background(), cmd)
2020
c.newConnection(context.Background(), cmd)
2121

2222
require.Equal(t, 1, testutil.CollectAndCount(metrics.GitalyConnectionsTotal))
2323
require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1)
2424
require.InDelta(t, 0, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("fail")), 0.1)
2525

26-
cmd = Command{Address: ""}
26+
cmd = Command{CacheKey: CacheKey{Address: ""}}
2727
c.newConnection(context.Background(), cmd)
2828

2929
require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1)
@@ -35,7 +35,7 @@ func TestCachedConnections(t *testing.T) {
3535

3636
require.Empty(t, c.cache.connections)
3737

38-
cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}
38+
cmd := Command{CacheKey: CacheKey{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}}
3939

4040
conn, err := c.GetConnection(context.Background(), cmd)
4141
require.NoError(t, err)
@@ -46,7 +46,7 @@ func TestCachedConnections(t *testing.T) {
4646
require.Len(t, c.cache.connections, 1)
4747
require.Equal(t, conn, newConn)
4848

49-
cmd = Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9998"}
49+
cmd = Command{CacheKey: CacheKey{ServiceName: "git-upload-pack", Address: "tcp://localhost:9998"}}
5050
_, err = c.GetConnection(context.Background(), cmd)
5151
require.NoError(t, err)
5252
require.Len(t, c.cache.connections, 2)

internal/gitlabnet/accessverifier/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package accessverifier
33

44
import (
55
"context"
6+
"encoding/json"
67
"fmt"
78
"net/http"
89

@@ -82,7 +83,8 @@ type Response struct {
8283
Who string
8384
StatusCode int
8485
// NeedAudit indicates whether git event should be audited to rails.
85-
NeedAudit bool `json:"need_audit"`
86+
NeedAudit bool `json:"need_audit"`
87+
RetryConfig json.RawMessage `json:"retry_config,omitempty"`
8688
}
8789

8890
// NewClient creates a new instance of Client

internal/handler/exec.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package handler
33

44
import (
55
"context"
6+
"encoding/json"
67
"fmt"
78
"strconv"
89
"strings"
@@ -11,13 +12,14 @@ import (
1112
grpccodes "google.golang.org/grpc/codes"
1213
"google.golang.org/grpc/metadata"
1314
grpcstatus "google.golang.org/grpc/status"
15+
"google.golang.org/protobuf/encoding/protojson"
1416

17+
gitalyclient "gitlab.com/gitlab-org/gitaly/v18/client"
18+
pb "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb"
1519
"gitlab.com/gitlab-org/gitlab-shell/v14/internal/config"
1620
"gitlab.com/gitlab-org/gitlab-shell/v14/internal/gitaly"
1721
"gitlab.com/gitlab-org/gitlab-shell/v14/internal/gitlabnet/accessverifier"
1822
"gitlab.com/gitlab-org/gitlab-shell/v14/internal/sshenv"
19-
20-
pb "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb"
2123
"gitlab.com/gitlab-org/labkit/log"
2224
)
2325

@@ -36,14 +38,31 @@ type GitalyCommand struct {
3638
// NewGitalyCommand creates a new GitalyCommand instance
3739
func NewGitalyCommand(cfg *config.Config, serviceName string, response *accessverifier.Response) *GitalyCommand {
3840
gc := gitaly.Command{
39-
ServiceName: serviceName,
40-
Address: response.Gitaly.Address,
41-
Token: response.Gitaly.Token,
41+
CacheKey: gitaly.CacheKey{
42+
ServiceName: serviceName,
43+
Address: response.Gitaly.Address,
44+
Token: response.Gitaly.Token,
45+
},
46+
RetryPolicy: parseRetryConfig(response.RetryConfig),
4247
}
4348

4449
return &GitalyCommand{Config: cfg, Response: response, Command: gc}
4550
}
4651

52+
func parseRetryConfig(rawConfig json.RawMessage) *gitalyclient.RetryPolicy {
53+
if len(rawConfig) == 0 {
54+
return nil
55+
}
56+
57+
var policy gitalyclient.RetryPolicy
58+
if err := protojson.Unmarshal(rawConfig, &policy); err != nil {
59+
log.WithError(err).Error("failed to unmarshal retry policy")
60+
return nil
61+
}
62+
63+
return &policy
64+
}
65+
4766
// processGitalyError handles errors that come back from Gitaly that may be a
4867
// LimitError. A LimitError is returned by Gitaly when it is at its limit in
4968
// handling requests. Since this is a known error, we should print a sensible

internal/handler/exec_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package handler
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"testing"
78

@@ -242,3 +243,33 @@ func errWithDetail(t *testing.T, detail proto.Message) error {
242243

243244
return grpcstatus.ErrorProto(proto)
244245
}
246+
247+
func TestNewGitalyCommandWithRetryConfig(t *testing.T) {
248+
retryConfig := json.RawMessage(`{"maxAttempts":4,"initialBackoff":"0.1s","maxBackoff":"1s","backoffMultiplier":2,"retryableStatusCodes":["UNAVAILABLE"]}`)
249+
250+
cmd := NewGitalyCommand(
251+
newConfig(),
252+
string(commandargs.UploadPack),
253+
&accessverifier.Response{
254+
Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"},
255+
RetryConfig: retryConfig,
256+
},
257+
)
258+
259+
require.NotNil(t, cmd.Command.RetryPolicy)
260+
require.Equal(t, uint32(4), cmd.Command.RetryPolicy.MaxAttempts)
261+
require.InEpsilon(t, float32(2), cmd.Command.RetryPolicy.BackoffMultiplier, 0.0001)
262+
require.Equal(t, []string{"UNAVAILABLE"}, cmd.Command.RetryPolicy.RetryableStatusCodes)
263+
}
264+
265+
func TestNewGitalyCommandWithoutRetryConfig(t *testing.T) {
266+
cmd := NewGitalyCommand(
267+
newConfig(),
268+
string(commandargs.UploadPack),
269+
&accessverifier.Response{
270+
Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"},
271+
},
272+
)
273+
274+
require.Nil(t, cmd.Command.RetryPolicy)
275+
}

internal/testhelper/requesthandlers/requesthandlers.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,40 +33,47 @@ func BuildDisallowedByAPIHandlers(t *testing.T) []testserver.TestRequestHandler
3333

3434
// BuildAllowedWithGitalyHandlers returns test request handlers for allowed API calls with Gitaly.
3535
func BuildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string) []testserver.TestRequestHandler {
36-
requests := []testserver.TestRequestHandler{
36+
return BuildAllowedWithGitalyHandlersAndRetryConfig(t, gitalyAddress, nil)
37+
}
38+
39+
// BuildAllowedWithGitalyHandlersAndRetryConfig returns test request handlers for allowed API calls with Gitaly and retry config.
40+
func BuildAllowedWithGitalyHandlersAndRetryConfig(t *testing.T, gitalyAddress string, retryConfig map[string]interface{}) []testserver.TestRequestHandler {
41+
body := map[string]interface{}{
42+
"status": true,
43+
"gl_id": "1",
44+
"gl_key_type": "key",
45+
"gl_key_id": 123,
46+
"gl_username": "alex-doe",
47+
"gitaly": map[string]interface{}{
48+
"repository": map[string]interface{}{
49+
"storage_name": "storage_name",
50+
"relative_path": "relative_path",
51+
"git_object_directory": "path/to/git_object_directory",
52+
"git_alternate_object_directories": []string{"path/to/git_alternate_object_directory"},
53+
"gl_repository": "group/repo",
54+
"gl_project_path": "group/project-path",
55+
},
56+
"address": gitalyAddress,
57+
"token": "token",
58+
"features": map[string]string{
59+
"gitaly-feature-cache_invalidator": "true",
60+
"gitaly-feature-inforef_uploadpack_cache": "false",
61+
},
62+
},
63+
}
64+
65+
if retryConfig != nil {
66+
body["retry_config"] = retryConfig
67+
}
68+
69+
return []testserver.TestRequestHandler{
3770
{
3871
Path: "/api/v4/internal/allowed",
3972
Handler: func(w http.ResponseWriter, _ *http.Request) {
40-
body := map[string]interface{}{
41-
"status": true,
42-
"gl_id": "1",
43-
"gl_key_type": "key",
44-
"gl_key_id": 123,
45-
"gl_username": "alex-doe",
46-
"gitaly": map[string]interface{}{
47-
"repository": map[string]interface{}{
48-
"storage_name": "storage_name",
49-
"relative_path": "relative_path",
50-
"git_object_directory": "path/to/git_object_directory",
51-
"git_alternate_object_directories": []string{"path/to/git_alternate_object_directory"},
52-
"gl_repository": "group/repo",
53-
"gl_project_path": "group/project-path",
54-
},
55-
"address": gitalyAddress,
56-
"token": "token",
57-
"features": map[string]string{
58-
"gitaly-feature-cache_invalidator": "true",
59-
"gitaly-feature-inforef_uploadpack_cache": "false",
60-
"some-other-ff": "true",
61-
},
62-
},
63-
}
6473
assert.NoError(t, json.NewEncoder(w).Encode(body))
6574
},
6675
},
6776
}
68-
69-
return requests
7077
}
7178

7279
// BuildAllowedWithCustomActionsHandlers returns test request handlers for allowed API calls with custom actions.

0 commit comments

Comments
 (0)