diff --git a/deploy/manifest.yaml b/deploy/manifest.yaml index 8bab059..cd86ce9 100644 --- a/deploy/manifest.yaml +++ b/deploy/manifest.yaml @@ -17,6 +17,9 @@ rules: - apiGroups: [""] resources: ["pods"] verbs: ["get", "list", "watch"] + - apiGroups: ["apps"] + resources: ["deployments"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/internal/controller/controller.go b/internal/controller/controller.go index e73dbe5..79d956e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "strings" + "sync" "time" "github.com/github/deployment-tracker/pkg/deploymentrecord" @@ -13,6 +14,8 @@ import ( "github.com/github/deployment-tracker/pkg/metrics" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -21,10 +24,18 @@ import ( "k8s.io/client-go/util/workqueue" ) +const ( + // EventCreated indicates that a pod has been created. + EventCreated = "CREATED" + // EventDeleted indicates that a pod has been deleted. + EventDeleted = "DELETED" +) + // PodEvent represents a pod event to be processed. type PodEvent struct { - Key string - EventType string + Key string + EventType string + DeletedPod *corev1.Pod // Only populated for delete events } // Controller is the Kubernetes controller for tracking deployments. @@ -34,6 +45,10 @@ type Controller struct { workqueue workqueue.TypedRateLimitingInterface[PodEvent] apiClient *deploymentrecord.Client cfg *Config + // best effort cache to avoid redundant posts + // post requests are idempotent, so if this cache fails due to + // restarts or other events, nothing will break. + observedDeployments sync.Map } // New creates a new deployment tracker controller. @@ -92,8 +107,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro return } - // Only process pods that are running - if pod.Status.Phase == corev1.PodRunning { + // Only process pods that are running and belong + // to a deployment + if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" { key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice @@ -102,7 +118,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro if err == nil { queue.Add(PodEvent{ Key: key, - EventType: "CREATED", + EventType: EventCreated, }) } } @@ -123,8 +139,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro return } - // Skip if pod is being deleted - if newPod.DeletionTimestamp != nil { + // Skip if pod is being deleted or doesn't belong + // to a deployment + if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" { return } @@ -143,38 +160,43 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro if err == nil { queue.Add(PodEvent{ Key: key, - EventType: "CREATED", + EventType: EventCreated, }) } } }, DeleteFunc: func(obj any) { - _, ok := obj.(*corev1.Pod) + pod, ok := obj.(*corev1.Pod) if !ok { // Handle deleted final state unknown tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { return } - _, ok = tombstone.Obj.(*corev1.Pod) + pod, ok = tombstone.Obj.(*corev1.Pod) if !ok { return } } - key, err := cache.MetaNamespaceKeyFunc(obj) + // Only process pods that belong to a deployment + if getDeploymentName(pod) == "" { + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice // no error event we care about, so don't // bother with handling it. if err == nil { queue.Add(PodEvent{ - Key: key, - EventType: "DELETED", + Key: key, + EventType: EventDeleted, + DeletedPod: pod, }) } }, }) - if err != nil { return nil, fmt.Errorf("failed to add event handlers: %w", err) } @@ -255,30 +277,63 @@ func (c *Controller) processNextItem(ctx context.Context) bool { // processEvent processes a single pod event. func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { - // Get the pod from the informer's cache - obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) - if err != nil { - slog.Error("Failed to get pod from cache", - "key", event.Key, - "error", err, - ) - return nil - } - if !exists { - // Pod no longer exists in cache, skip processing - return nil - } + var pod *corev1.Pod + + if event.EventType == EventDeleted { + // For delete events, use the pod captured at deletion time + pod = event.DeletedPod + if pod == nil { + slog.Error("Delete event missing pod data", + "key", event.Key, + ) + return nil + } - pod, ok := obj.(*corev1.Pod) - if !ok { - slog.Error("Invalid object type in cache", - "key", event.Key, - ) - return nil + // Check if the parent deployment still exists + // If it does, this is just a scale-down event, skip it. + // + // If a deployment changes image versions, this will not + // fire delete/decommissioned events to the remote API. + // This is as intended, as the server will keep track of + // the (cluster unique) deployment name, and just update + // the referenced image digest to the newly observed (via + // the create event). + deploymentName := getDeploymentName(pod) + if deploymentName != "" && c.deploymentExists(ctx, pod.Namespace, deploymentName) { + slog.Debug("Deployment still exists, skipping pod delete (scale down)", + "namespace", pod.Namespace, + "deployment", deploymentName, + "pod", pod.Name, + ) + return nil + } + } else { + // For create events, get the pod from the informer's cache + obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) + if err != nil { + slog.Error("Failed to get pod from cache", + "key", event.Key, + "error", err, + ) + return nil + } + if !exists { + // Pod no longer exists in cache, skip processing + return nil + } + + var ok bool + pod, ok = obj.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in cache", + "key", event.Key, + ) + return nil + } } status := deploymentrecord.StatusDeployed - if event.EventType == "DELETED" { + if event.EventType == EventDeleted { status = deploymentrecord.StatusDecommissioned } @@ -301,6 +356,25 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return lastErr } +// deploymentExists checks if a deployment exists in the cluster. +func (c *Controller) deploymentExists(ctx context.Context, namespace, name string) bool { + _, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + // On error, assume it exists to be safe + // (avoid false decommissions) + slog.Warn("Failed to check if deployment exists, assuming it does", + "namespace", namespace, + "deployment", name, + "error", err, + ) + return true + } + return true +} + // recordContainer records a single container's deployment info. func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string) error { dn := getARDeploymentName(pod, container, c.cfg.Template) @@ -317,6 +391,31 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta return nil } + cacheKey := getCacheKey(dn, digest) + + // Check if we've already recorded this deployment + switch status { + case deploymentrecord.StatusDeployed: + if _, exists := c.observedDeployments.Load(cacheKey); exists { + slog.Debug("Deployment already observed, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + case deploymentrecord.StatusDecommissioned: + // For delete, check if we've seen it - if not, no need to decommission + if _, exists := c.observedDeployments.Load(cacheKey); !exists { + slog.Debug("Deployment not in cache, skipping decommission", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + default: + return fmt.Errorf("invalid status: %s", status) + } + // Extract image name and tag imageName, version := image.ExtractName(container.Image) @@ -366,9 +465,23 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta "digest", record.Digest, ) + // Update cache after successful post + switch status { + case deploymentrecord.StatusDeployed: + c.observedDeployments.Store(cacheKey, true) + case deploymentrecord.StatusDecommissioned: + c.observedDeployments.Delete(cacheKey) + default: + return fmt.Errorf("invalid status: %s", status) + } + return nil } +func getCacheKey(dn, digest string) string { + return dn + "||" + digest +} + // getARDeploymentName converts the pod's metadata into the correct format // for the deployment name for the artifact registry (this is not the same // as the K8s deployment's name!