From ba747f31fe81ed550b58bbed3526f9b85978d961 Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Thu, 29 Jan 2026 13:51:07 +0100 Subject: [PATCH 1/6] Extract the pod if it's being deleted to avoid a future race condition during processing when the event is pulled off the work queue. --- internal/controller/controller.go | 67 +++++++++++++++++++------------ 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index e73dbe5..889c5c8 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -23,8 +23,9 @@ import ( // PodEvent represents a pod event to be processed. type PodEvent struct { - Key string - EventType string + Key string + EventType string + DeletedPod *corev1.Pod // Only populated for delete events } // Controller is the Kubernetes controller for tracking deployments. @@ -149,14 +150,14 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro } }, DeleteFunc: func(obj any) { - _, ok := obj.(*corev1.Pod) + pod, ok := obj.(*corev1.Pod) if !ok { // Handle deleted final state unknown tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { return } - _, ok = tombstone.Obj.(*corev1.Pod) + pod, ok = tombstone.Obj.(*corev1.Pod) if !ok { return } @@ -168,8 +169,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro // bother with handling it. if err == nil { queue.Add(PodEvent{ - Key: key, - EventType: "DELETED", + Key: key, + EventType: "DELETED", + DeletedPod: pod, }) } }, @@ -255,26 +257,41 @@ func (c *Controller) processNextItem(ctx context.Context) bool { // processEvent processes a single pod event. 
func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { - // Get the pod from the informer's cache - obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) - if err != nil { - slog.Error("Failed to get pod from cache", - "key", event.Key, - "error", err, - ) - return nil - } - if !exists { - // Pod no longer exists in cache, skip processing - return nil - } + var pod *corev1.Pod - pod, ok := obj.(*corev1.Pod) - if !ok { - slog.Error("Invalid object type in cache", - "key", event.Key, - ) - return nil + if event.EventType == "DELETED" { + // For delete events, use the pod captured at deletion time + // since it's already been removed from the cache + pod = event.DeletedPod + if pod == nil { + slog.Error("Delete event missing pod data", + "key", event.Key, + ) + return nil + } + } else { + // For other events, get the pod from the informer's cache + obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) + if err != nil { + slog.Error("Failed to get pod from cache", + "key", event.Key, + "error", err, + ) + return nil + } + if !exists { + // Pod no longer exists in cache, skip processing + return nil + } + + var ok bool + pod, ok = obj.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in cache", + "key", event.Key, + ) + return nil + } } status := deploymentrecord.StatusDeployed From 68fa274e4e6a4b2dab59efae83123dc605ee3bf9 Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Thu, 29 Jan 2026 14:33:24 +0100 Subject: [PATCH 2/6] When processing events, look back to the owning deployment to understand if the pod's replica count is being changed to avoid excessive requests --- deploy/manifest.yaml | 3 ++ internal/controller/controller.go | 54 ++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/deploy/manifest.yaml b/deploy/manifest.yaml index 8bab059..cd86ce9 100644 --- a/deploy/manifest.yaml +++ b/deploy/manifest.yaml @@ -17,6 +17,9 @@ rules: - apiGroups: [""] resources: 
["pods"] verbs: ["get", "list", "watch"] + - apiGroups: ["apps"] + resources: ["deployments"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 889c5c8..59db752 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -13,6 +13,8 @@ import ( "github.com/github/deployment-tracker/pkg/metrics" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -93,8 +95,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro return } - // Only process pods that are running - if pod.Status.Phase == corev1.PodRunning { + // Only process pods that are running and belong + // to a deployment + if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" { key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice @@ -124,8 +127,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro return } - // Skip if pod is being deleted - if newPod.DeletionTimestamp != nil { + // Skip if pod is being deleted or doesn't belong + // to a deployment + if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" { return } @@ -162,8 +166,13 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro return } } - key, err := cache.MetaNamespaceKeyFunc(obj) + // Only process pods that belong to a deployment + if getDeploymentName(pod) == "" { + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice // no error event we care about, so don't // bother with handling it. 
@@ -176,7 +185,6 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro } }, }) - if err != nil { return nil, fmt.Errorf("failed to add event handlers: %w", err) } @@ -261,7 +269,6 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { if event.EventType == "DELETED" { // For delete events, use the pod captured at deletion time - // since it's already been removed from the cache pod = event.DeletedPod if pod == nil { slog.Error("Delete event missing pod data", @@ -269,8 +276,20 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { ) return nil } + + // Check if the parent deployment still exists + // If it does, this is just a scale-down event, skip it + deploymentName := getDeploymentName(pod) + if deploymentName != "" && c.deploymentExists(ctx, pod.Namespace, deploymentName) { + slog.Debug("Deployment still exists, skipping pod delete (scale down)", + "namespace", pod.Namespace, + "deployment", deploymentName, + "pod", pod.Name, + ) + return nil + } } else { - // For other events, get the pod from the informer's cache + // For create events, get the pod from the informer's cache obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) if err != nil { slog.Error("Failed to get pod from cache", @@ -318,6 +337,25 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return lastErr } +// deploymentExists checks if a deployment exists in the cluster. 
+func (c *Controller) deploymentExists(ctx context.Context, namespace, name string) bool { + _, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + // On error, assume it exists to be safe + // (avoid false decommissions) + slog.Warn("Failed to check if deployment exists, assuming it does", + "namespace", namespace, + "deployment", name, + "error", err, + ) + return true + } + return true +} + // recordContainer records a single container's deployment info. func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string) error { dn := getARDeploymentName(pod, container, c.cfg.Template) From b69a337519a0a640e60ec37e75c75eb84e750a23 Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Thu, 29 Jan 2026 15:05:02 +0100 Subject: [PATCH 3/6] added best effort cache to remove redundant requests to the api --- internal/controller/controller.go | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 59db752..a58b02d 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "strings" + "sync" "time" "github.com/github/deployment-tracker/pkg/deploymentrecord" @@ -37,6 +38,10 @@ type Controller struct { workqueue workqueue.TypedRateLimitingInterface[PodEvent] apiClient *deploymentrecord.Client cfg *Config + // best effort cache to avoid redundant posts + // post requests are idempotent, so if this cache fails due to + // restarts or other events, nothing will break. + observedDeployments sync.Map } // New creates a new deployment tracker controller. 
@@ -372,6 +377,31 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta return nil } + cacheKey := getCacheKey(dn, digest) + + // Check if we've already recorded this deployment + switch status { + case deploymentrecord.StatusDeployed: + if _, exists := c.observedDeployments.Load(cacheKey); exists { + slog.Debug("Deployment already observed, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + case deploymentrecord.StatusDecommissioned: + // For delete, check if we've seen it - if not, no need to decommission + if _, exists := c.observedDeployments.Load(cacheKey); !exists { + slog.Debug("Deployment not in cache, skipping decommission", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + default: + return fmt.Errorf("invalid status: %s", status) + } + // Extract image name and tag imageName, version := image.ExtractName(container.Image) @@ -421,9 +451,23 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta "digest", record.Digest, ) + // Update cache after successful post + switch status { + case deploymentrecord.StatusDeployed: + c.observedDeployments.Store(cacheKey, true) + case deploymentrecord.StatusDecommissioned: + c.observedDeployments.Delete(cacheKey) + default: + return fmt.Errorf("invalid status: %s", status) + } + return nil } +func getCacheKey(dn, digest string) string { + return dn + "_" + digest +} + // getARDeploymentName converts the pod's metadata into the correct format // for the deployment name for the artifact registry (this is not the same // as the K8s deployment's name! 
From dd67b97c5d167edc75cf1d80a00cc56aabca20df Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Thu, 29 Jan 2026 16:45:57 +0100 Subject: [PATCH 4/6] use a slightly better separator --- internal/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index a58b02d..491d399 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -465,7 +465,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta } func getCacheKey(dn, digest string) string { - return dn + "_" + digest + return dn + "||" + digest } // getARDeploymentName converts the pod's metadata into the correct format From 50ea3163783c5028d7f0020874f45533ffded675 Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Fri, 30 Jan 2026 10:09:00 +0100 Subject: [PATCH 5/6] clarified delete events with update to the deployment spec --- internal/controller/controller.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 491d399..15590c6 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -24,6 +24,11 @@ import ( "k8s.io/client-go/util/workqueue" ) +const ( + EventCreated = "CREATED" + EventDeleted = "DELETED" +) + // PodEvent represents a pod event to be processed. 
type PodEvent struct { Key string @@ -111,7 +116,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro if err == nil { queue.Add(PodEvent{ Key: key, - EventType: "CREATED", + EventType: EventCreated, }) } } @@ -153,7 +158,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro if err == nil { queue.Add(PodEvent{ Key: key, - EventType: "CREATED", + EventType: EventCreated, }) } } @@ -184,7 +189,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro if err == nil { queue.Add(PodEvent{ Key: key, - EventType: "DELETED", + EventType: EventDeleted, DeletedPod: pod, }) } @@ -272,7 +277,7 @@ func (c *Controller) processNextItem(ctx context.Context) bool { func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { var pod *corev1.Pod - if event.EventType == "DELETED" { + if event.EventType == EventDeleted { // For delete events, use the pod captured at deletion time pod = event.DeletedPod if pod == nil { @@ -283,7 +288,14 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { } // Check if the parent deployment still exists - // If it does, this is just a scale-down event, skip it + // If it does, this is just a scale-down event, skip it. + // + // If a deployment changes image versions, this will not + // fire delete/decommissioned events to the remote API. + // This is as intended, as the server will keep track of + // the (cluster unique) deployment name, and just update + // the referenced image digest to the newly observed (via + // the create event). 
deploymentName := getDeploymentName(pod) if deploymentName != "" && c.deploymentExists(ctx, pod.Namespace, deploymentName) { slog.Debug("Deployment still exists, skipping pod delete (scale down)", @@ -319,7 +331,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { } status := deploymentrecord.StatusDeployed - if event.EventType == "DELETED" { + if event.EventType == EventDeleted { status = deploymentrecord.StatusDecommissioned } From bce557689d963103ec43e58e3d2a75c7d8e4c502 Mon Sep 17 00:00:00 2001 From: Fredrik Skogman Date: Fri, 30 Jan 2026 10:12:35 +0100 Subject: [PATCH 6/6] added comments --- internal/controller/controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 15590c6..79d956e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -25,7 +25,9 @@ import ( ) const ( + // EventCreated indicates that a pod has been created. EventCreated = "CREATED" + // EventDeleted indicates that a pod has been deleted. EventDeleted = "DELETED" )