3 changes: 3 additions & 0 deletions deploy/manifest.yaml
@@ -17,6 +17,9 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
181 changes: 147 additions & 34 deletions internal/controller/controller.go
@@ -6,13 +6,16 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"time"

"github.com/github/deployment-tracker/pkg/deploymentrecord"
"github.com/github/deployment-tracker/pkg/image"
"github.com/github/deployment-tracker/pkg/metrics"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@@ -21,10 +24,18 @@ import (
"k8s.io/client-go/util/workqueue"
)

const (
// EventCreated indicates that a pod has been created.
EventCreated = "CREATED"
// EventDeleted indicates that a pod has been deleted.
EventDeleted = "DELETED"
)

// PodEvent represents a pod event to be processed.
type PodEvent struct {
Key string
EventType string
Key string
EventType string
DeletedPod *corev1.Pod // Only populated for delete events
}

// Controller is the Kubernetes controller for tracking deployments.
@@ -34,6 +45,10 @@ type Controller struct {
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
apiClient *deploymentrecord.Client
cfg *Config
// Best-effort cache to avoid redundant posts.
// Post requests are idempotent, so if this cache is lost due to
// restarts or other events, nothing will break.
observedDeployments sync.Map
}

// New creates a new deployment tracker controller.
@@ -92,8 +107,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
return
}

// Only process pods that are running
if pod.Status.Phase == corev1.PodRunning {
// Only process pods that are running and belong
// to a deployment
if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" {
key, err := cache.MetaNamespaceKeyFunc(obj)

// For our purposes, there are in practice
@@ -102,7 +118,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
if err == nil {
queue.Add(PodEvent{
Key: key,
EventType: "CREATED",
EventType: EventCreated,
})
}
}
@@ -123,8 +139,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
return
}

// Skip if pod is being deleted
if newPod.DeletionTimestamp != nil {
// Skip if pod is being deleted or doesn't belong
// to a deployment
if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" {
return
}

@@ -143,38 +160,43 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
if err == nil {
queue.Add(PodEvent{
Key: key,
EventType: "CREATED",
EventType: EventCreated,
})
}
}
},
DeleteFunc: func(obj any) {
_, ok := obj.(*corev1.Pod)
pod, ok := obj.(*corev1.Pod)
if !ok {
// Handle deleted final state unknown
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
_, ok = tombstone.Obj.(*corev1.Pod)
pod, ok = tombstone.Obj.(*corev1.Pod)
if !ok {
return
}
}
key, err := cache.MetaNamespaceKeyFunc(obj)

// Only process pods that belong to a deployment
if getDeploymentName(pod) == "" {
return
}

key, err := cache.MetaNamespaceKeyFunc(obj)
// For our purposes, there are in practice
// no error events we care about, so we don't
// bother handling them.
if err == nil {
queue.Add(PodEvent{
Key: key,
EventType: "DELETED",
Key: key,
EventType: EventDeleted,
DeletedPod: pod,
})
}
},
})

if err != nil {
return nil, fmt.Errorf("failed to add event handlers: %w", err)
}
@@ -255,30 +277,63 @@ func (c *Controller) processNextItem(ctx context.Context) bool {

// processEvent processes a single pod event.
func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
// Get the pod from the informer's cache
obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key)
if err != nil {
slog.Error("Failed to get pod from cache",
"key", event.Key,
"error", err,
)
return nil
}
if !exists {
// Pod no longer exists in cache, skip processing
return nil
}
var pod *corev1.Pod

if event.EventType == EventDeleted {
// For delete events, use the pod captured at deletion time
pod = event.DeletedPod
if pod == nil {
slog.Error("Delete event missing pod data",
"key", event.Key,
)
return nil
}

pod, ok := obj.(*corev1.Pod)
if !ok {
slog.Error("Invalid object type in cache",
"key", event.Key,
)
return nil
// Check if the parent deployment still exists.
// If it does, this is just a scale-down event, so skip it.
//
// If a deployment changes image versions, this will not
// fire delete/decommissioned events to the remote API.
// This is intended: the server keeps track of the
// (cluster-unique) deployment name and just updates the
// referenced image digest to the newly observed one (via
// the create event).
deploymentName := getDeploymentName(pod)
if deploymentName != "" && c.deploymentExists(ctx, pod.Namespace, deploymentName) {
Copilot AI Jan 29, 2026

Making an API call to check deployment existence for every pod deletion could be expensive at scale. Consider using a deployment informer with a local cache instead of direct API calls, especially since the controller already uses informers for pods.

Member Author

Yeah, this is a good idea. I will do this as a follow up PR.

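A minimal sketch of what that follow-up could look like (not part of this PR): answer deploymentExists from an informer-backed lister instead of calling the API server on every pod delete. The cachedChecker type and its field are illustrative assumptions; the lister would come from the same shared informer factory that already drives the pod informer, e.g. factory.Apps().V1().Deployments().Lister().

// Sketch under the assumptions above; not the PR's implementation.
package controller

import (
	"log/slog"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	appslisters "k8s.io/client-go/listers/apps/v1"
)

type cachedChecker struct {
	// Hypothetical field, populated in New from the shared informer factory.
	deploymentLister appslisters.DeploymentLister
}

// deploymentExists reads from the informer's local cache instead of the API server.
func (c *cachedChecker) deploymentExists(namespace, name string) bool {
	_, err := c.deploymentLister.Deployments(namespace).Get(name)
	if k8serrors.IsNotFound(err) {
		return false
	}
	if err != nil {
		// Mirror the current behaviour: assume the deployment exists on
		// unexpected errors to avoid false decommissions.
		slog.Warn("Failed to check deployment in cache, assuming it exists",
			"namespace", namespace,
			"deployment", name,
			"error", err,
		)
	}
	return true
}

Because the lister reads from an in-memory cache kept current by the watch, the per-deletion cost drops to a local lookup.
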
slog.Debug("Deployment still exists, skipping pod delete (scale down)",
"namespace", pod.Namespace,
"deployment", deploymentName,
"pod", pod.Name,
)
return nil
}
} else {
// For create events, get the pod from the informer's cache
obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key)
if err != nil {
slog.Error("Failed to get pod from cache",
"key", event.Key,
"error", err,
)
return nil
}
if !exists {
// Pod no longer exists in cache, skip processing
return nil
}

var ok bool
pod, ok = obj.(*corev1.Pod)
if !ok {
slog.Error("Invalid object type in cache",
"key", event.Key,
)
return nil
}
}

status := deploymentrecord.StatusDeployed
if event.EventType == "DELETED" {
if event.EventType == EventDeleted {
status = deploymentrecord.StatusDecommissioned
}

@@ -301,6 +356,25 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
return lastErr
}

// deploymentExists checks if a deployment exists in the cluster.
func (c *Controller) deploymentExists(ctx context.Context, namespace, name string) bool {
_, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return false
}
// On error, assume it exists to be safe
// (avoid false decommissions)
slog.Warn("Failed to check if deployment exists, assuming it does",
"namespace", namespace,
"deployment", name,
"error", err,
)
return true
}
return true
}

// recordContainer records a single container's deployment info.
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string) error {
dn := getARDeploymentName(pod, container, c.cfg.Template)
@@ -317,6 +391,31 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
return nil
}

cacheKey := getCacheKey(dn, digest)

// Check if we've already recorded this deployment
switch status {
case deploymentrecord.StatusDeployed:
if _, exists := c.observedDeployments.Load(cacheKey); exists {
slog.Debug("Deployment already observed, skipping post",
"deployment_name", dn,
"digest", digest,
)
return nil
}
case deploymentrecord.StatusDecommissioned:
// For deletes, check if we've seen it; if not, there's no need to decommission.
if _, exists := c.observedDeployments.Load(cacheKey); !exists {
slog.Debug("Deployment not in cache, skipping decommission",
"deployment_name", dn,
"digest", digest,
)
return nil
}
default:
return fmt.Errorf("invalid status: %s", status)
}

// Extract image name and tag
imageName, version := image.ExtractName(container.Image)

@@ -366,9 +465,23 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
"digest", record.Digest,
)

// Update cache after successful post
switch status {
case deploymentrecord.StatusDeployed:
c.observedDeployments.Store(cacheKey, true)
case deploymentrecord.StatusDecommissioned:
c.observedDeployments.Delete(cacheKey)
default:
return fmt.Errorf("invalid status: %s", status)
}

return nil
}

func getCacheKey(dn, digest string) string {
return dn + "||" + digest
}

// getARDeploymentName converts the pod's metadata into the correct format
// for the deployment name for the artifact registry (this is not the same
// as the K8s deployment's name!