|
6 | 6 | "fmt" |
7 | 7 | "log/slog" |
8 | 8 | "strings" |
| 9 | + "sync" |
9 | 10 | "time" |
10 | 11 |
|
11 | 12 | "github.com/github/deployment-tracker/pkg/deploymentrecord" |
@@ -37,6 +38,10 @@ type Controller struct { |
37 | 38 | workqueue workqueue.TypedRateLimitingInterface[PodEvent] |
38 | 39 | apiClient *deploymentrecord.Client |
39 | 40 | cfg *Config |
| 41 | + // best effort cache to avoid redundant posts |
| 42 | + // post requests are idempotent, so if this cache fails due to |
| 43 | + // restarts or other events, nothing will break. |
| 44 | + observedDeployments sync.Map |
40 | 45 | } |
41 | 46 |
|
42 | 47 | // New creates a new deployment tracker controller. |
@@ -372,6 +377,31 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta |
372 | 377 | return nil |
373 | 378 | } |
374 | 379 |
|
| 380 | + cacheKey := getCacheKey(dn, digest) |
| 381 | + |
| 382 | + // Check if we've already recorded this deployment |
| 383 | + switch status { |
| 384 | + case deploymentrecord.StatusDeployed: |
| 385 | + if _, exists := c.observedDeployments.Load(cacheKey); exists { |
| 386 | + slog.Debug("Deployment already observed, skipping post", |
| 387 | + "deployment_name", dn, |
| 388 | + "digest", digest, |
| 389 | + ) |
| 390 | + return nil |
| 391 | + } |
| 392 | + case deploymentrecord.StatusDecommissioned: |
| 393 | + // For delete, check if we've seen it - if not, no need to decommission |
| 394 | + if _, exists := c.observedDeployments.Load(cacheKey); !exists { |
| 395 | + slog.Debug("Deployment not in cache, skipping decommission", |
| 396 | + "deployment_name", dn, |
| 397 | + "digest", digest, |
| 398 | + ) |
| 399 | + return nil |
| 400 | + } |
| 401 | + default: |
| 402 | + return fmt.Errorf("invalid status: %s", status) |
| 403 | + } |
| 404 | + |
375 | 405 | // Extract image name and tag |
376 | 406 | imageName, version := image.ExtractName(container.Image) |
377 | 407 |
|
@@ -421,9 +451,23 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta |
421 | 451 | "digest", record.Digest, |
422 | 452 | ) |
423 | 453 |
|
| 454 | + // Update cache after successful post |
| 455 | + switch status { |
| 456 | + case deploymentrecord.StatusDeployed: |
| 457 | + c.observedDeployments.Store(cacheKey, true) |
| 458 | + case deploymentrecord.StatusDecommissioned: |
| 459 | + c.observedDeployments.Delete(cacheKey) |
| 460 | + default: |
| 461 | + return fmt.Errorf("invalid status: %s", status) |
| 462 | + } |
| 463 | + |
424 | 464 | return nil |
425 | 465 | } |
426 | 466 |
|
| 467 | +func getCacheKey(dn, digest string) string { |
| 468 | + return dn + "_" + digest |
| 469 | +} |
| 470 | + |
427 | 471 | // getARDeploymentName converts the pod's metadata into the correct format |
428 | 472 | // for the deployment name for the artifact registry (this is not the same |
429 | 473 | // as the K8s deployment's name! |
|
0 commit comments