Skip to content

Commit 74c64f4

Browse files
authored
fix(gitter): better traffic handling and patch ID caching (#5211)
Makes 2 changes to gitter traffic handling and Patch ID calculation: 1. If context cancels in the middle of loadRepository(), patch ID calculated so far will be saved so we don't need to redo them next time 2. Move semaphore acquisition into singleflight This means affected commit graph walking will not have concurrency limit, but I think (hope?) graph walking is fast enough once a repo is loaded.
1 parent b497bdb commit 74c64f4

3 files changed

Lines changed: 51 additions & 46 deletions

File tree

go/cmd/gitter/gitter.go

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,20 @@ func runCmd(ctx context.Context, dir string, env []string, name string, args ...
204204
return nil
205205
}
206206

207+
// runWithSemaphore runs function after waiting at semaphore for concurrency control
208+
func runWithSemaphore(ctx context.Context, f func() (any, error)) (any, error) {
209+
select {
210+
case semaphore <- struct{}{}:
211+
defer func() { <-semaphore }()
212+
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))
213+
214+
return f()
215+
case <-ctx.Done():
216+
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
217+
return nil, ctx.Err()
218+
}
219+
}
220+
207221
func isLocalRequest(r *http.Request) bool {
208222
host, _, err := net.SplitHostPort(r.RemoteAddr)
209223
if err != nil {
@@ -305,7 +319,9 @@ func marshalResponse(r *http.Request, m proto.Message) ([]byte, error) {
305319

306320
func doFetch(ctx context.Context, w http.ResponseWriter, repoURL string, forceUpdate bool) error {
307321
_, err, _ := gFetch.Do(repoURL, func() (any, error) {
308-
return nil, FetchRepo(ctx, repoURL, forceUpdate)
322+
return runWithSemaphore(ctx, func() (any, error) {
323+
return nil, FetchRepo(ctx, repoURL, forceUpdate)
324+
})
309325
})
310326
if err != nil {
311327
logger.ErrorContext(ctx, "Error fetching blob", slog.Any("error", err))
@@ -341,11 +357,13 @@ func getFreshRepo(ctx context.Context, w http.ResponseWriter, repoURL string, fo
341357
}
342358

343359
repoAny, err, _ := gLoad.Do(repoPath, func() (any, error) {
344-
repoLock := GetRepoLock(repoURL)
345-
repoLock.RLock()
346-
defer repoLock.RUnlock()
360+
return runWithSemaphore(ctx, func() (any, error) {
361+
repoLock := GetRepoLock(repoURL)
362+
repoLock.RLock()
363+
defer repoLock.RUnlock()
347364

348-
return LoadRepository(ctx, repoPath)
365+
return LoadRepository(ctx, repoPath)
366+
})
349367
})
350368
if err != nil {
351369
logger.ErrorContext(ctx, "Failed to load repository", slog.Any("error", err))
@@ -568,25 +586,16 @@ func gitHandler(w http.ResponseWriter, r *http.Request) {
568586
ctx := context.WithValue(r.Context(), urlKey, repoURL)
569587
logger.InfoContext(ctx, "Received request: /git", slog.Bool("forceUpdate", forceUpdate), slog.String("remoteAddr", r.RemoteAddr))
570588

571-
select {
572-
case semaphore <- struct{}{}:
573-
defer func() { <-semaphore }()
574-
case <-ctx.Done():
575-
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
576-
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)
577-
578-
return
579-
}
580-
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))
581-
582589
// Fetch repo first
583590
if err := doFetch(ctx, w, repoURL, forceUpdate); err != nil {
584591
return
585592
}
586593

587594
// Archive repo
588595
fileDataAny, err, _ := gArchive.Do(repoURL, func() (any, error) {
589-
return ArchiveRepo(ctx, repoURL)
596+
return runWithSemaphore(ctx, func() (any, error) {
597+
return ArchiveRepo(ctx, repoURL)
598+
})
590599
})
591600
if err != nil {
592601
logger.ErrorContext(ctx, "Error archiving blob", slog.Any("error", err))
@@ -626,17 +635,6 @@ func cacheHandler(w http.ResponseWriter, r *http.Request) {
626635
ctx := context.WithValue(r.Context(), urlKey, repoURL)
627636
logger.InfoContext(ctx, "Received request: /cache")
628637

629-
select {
630-
case semaphore <- struct{}{}:
631-
defer func() { <-semaphore }()
632-
case <-ctx.Done():
633-
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
634-
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)
635-
636-
return
637-
}
638-
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))
639-
640638
if _, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate()); err != nil {
641639
return
642640
}
@@ -687,17 +685,6 @@ func affectedCommitsHandler(w http.ResponseWriter, r *http.Request) {
687685
slog.Bool("considerAllBranches", considerAllBranches),
688686
)
689687

690-
select {
691-
case semaphore <- struct{}{}:
692-
defer func() { <-semaphore }()
693-
case <-ctx.Done():
694-
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
695-
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)
696-
697-
return
698-
}
699-
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))
700-
701688
repo, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate())
702689
if err != nil {
703690
return

go/cmd/gitter/persistence.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,16 @@ func loadLastFetchMap() {
8686
logger.Info("Loaded lastFetch map", slog.Int("entry_count", len(lastFetch)))
8787
}
8888

89-
func saveRepositoryCache(cachePath string, repo *Repository) error {
89+
func saveRepositoryCache(cachePath string, repo *Repository) (int, error) {
9090
logger.Info("Saving repository cache", slog.String("path", cachePath))
9191

9292
cache := &pb.RepositoryCache{}
93+
emptyPatchID := SHA1{}
9394
for _, commit := range repo.commits {
95+
// Only save commits that have a patch ID
96+
if commit.PatchID == emptyPatchID {
97+
continue
98+
}
9499
cache.Commits = append(cache.Commits, &pb.CommitDetail{
95100
Hash: commit.Hash[:],
96101
PatchId: commit.PatchID[:],
@@ -99,10 +104,14 @@ func saveRepositoryCache(cachePath string, repo *Repository) error {
99104

100105
data, err := proto.Marshal(cache)
101106
if err != nil {
102-
return err
107+
return 0, err
108+
}
109+
110+
if err := os.WriteFile(cachePath, data, 0600); err != nil {
111+
return 0, err
103112
}
104113

105-
return os.WriteFile(cachePath, data, 0600)
114+
return len(cache.GetCommits()), nil
106115
}
107116

108117
func loadRepositoryCache(cachePath string) (*pb.RepositoryCache, error) {

go/cmd/gitter/repository.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,26 @@ func LoadRepository(ctx context.Context, repoPath string) (*Repository, error) {
9090
return nil, fmt.Errorf("failed to build commit graph: %w", err)
9191
}
9292

93+
var patchIDErr error
9394
if len(newCommits) > 0 {
94-
if err := repo.calculatePatchIDs(ctx, newCommits); err != nil {
95-
return nil, fmt.Errorf("failed to calculate patch id for commits: %w", err)
96-
}
95+
patchIDErr = repo.calculatePatchIDs(ctx, newCommits)
96+
}
97+
98+
// If error is anything other than context cancel, exit early without saving
99+
if patchIDErr != nil && !errors.Is(ctx.Err(), context.Canceled) {
100+
return nil, fmt.Errorf("failed to calculate patch id for commits: %w", patchIDErr)
97101
}
98102

99103
// Save cache
100-
if err := saveRepositoryCache(cachePath, repo); err != nil {
104+
patchIDCount, err := saveRepositoryCache(cachePath, repo)
105+
if err != nil {
101106
logger.ErrorContext(ctx, "Failed to save repository cache", slog.Any("err", err))
102107
}
103108

109+
if patchIDErr != nil {
110+
return nil, fmt.Errorf("failed to fully calculate patch id for commits: %w, saved %d", patchIDErr, patchIDCount)
111+
}
112+
104113
return repo, nil
105114
}
106115

0 commit comments

Comments
 (0)