Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/clickhouse/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (
)

// Polling defaults for the ClickHouse check-and-finalize command.
const (
	// defaultRestoreTimeout is the 30-minute restore deadline.
	// NOTE(review): the merged call to restore.WaitForAPIRestore no longer
	// passes a timeout, so this constant looks unused after the change —
	// confirm and drop it together with the old call site.
	defaultRestoreTimeout = 30 * time.Minute

	// defaultPollInterval is the interval passed to restore.WaitForAPIRestore
	// between restore status checks.
	defaultPollInterval = 10 * time.Second
)

// Check-and-finalize command flags
Expand Down Expand Up @@ -117,7 +116,7 @@ func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operati
}
}

if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, defaultRestoreTimeout, appCtx.Logger); err != nil {
if err := restore.WaitForAPIRestore(checkStatusFn, defaultPollInterval, appCtx.Logger); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/elasticsearch/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func waitAndFinalize(appCtx *app.Context, repository, snapshotName string) error
return appCtx.ESClient.GetRestoreStatus(repository, snapshotName)
}

if err := restore.WaitForAPIRestore(checkStatusFn, 0, 0, appCtx.Logger); err != nil {
if err := restore.WaitForAPIRestore(checkStatusFn, 0, appCtx.Logger); err != nil {
return err
}

Expand Down
32 changes: 11 additions & 21 deletions internal/orchestration/restore/apirestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
)

// Fallback values applied by WaitForAPIRestore when the caller passes zero.
const (
// defaultAPIRestoreTimeout (30 minutes) is the fallback used when the
// timeout argument is zero.
// NOTE(review): the timeout parameter appears to be removed by this change
// (callers now pass only the interval) — confirm whether this is still needed.
defaultAPIRestoreTimeout = 30 * time.Minute
// defaultAPIStatusCheckInterval (10 seconds) is the polling interval used
// when the interval argument is zero.
defaultAPIStatusCheckInterval = 10 * time.Second
)

Expand All @@ -18,39 +17,30 @@ const (
func WaitForAPIRestore(
checkStatusFn func() (string, bool, error),
interval time.Duration,
timeout time.Duration,
log *logger.Logger,
) error {
if interval == 0 {
interval = defaultAPIStatusCheckInterval
}
if timeout == 0 {
timeout = defaultAPIRestoreTimeout
}

timeoutChan := time.After(timeout)
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-timeoutChan:
return fmt.Errorf("timeout waiting for restore to complete")
case <-ticker.C:
statusMsg, isComplete, err := checkStatusFn()
if err != nil {
return fmt.Errorf("failed to check restore status: %w", err)
}
<-ticker.C
statusMsg, isComplete, err := checkStatusFn()
if err != nil {
return fmt.Errorf("failed to check restore status: %w", err)
}

log.Debugf("Restore status: %s (complete: %v)", statusMsg, isComplete)
log.Debugf("Restore status: %s (complete: %v)", statusMsg, isComplete)

if isComplete {
if statusMsg == "SUCCESS" || statusMsg == "PARTIAL" {
log.Debugf("Restore completed successfully")
return nil
}
return fmt.Errorf("restore failed with status: %s", statusMsg)
if isComplete {
if statusMsg == "SUCCESS" || statusMsg == "PARTIAL" {
log.Debugf("Restore completed successfully")
return nil
}
return fmt.Errorf("restore failed with status: %s", statusMsg)
}
}
}
Expand Down
44 changes: 19 additions & 25 deletions internal/orchestration/restore/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,38 @@ import (
)

// Timing values used by WaitForJobCompletion.
const (
// defaultJobCompletionTimeout (30 minutes) bounds the wait in the
// timeout-based version of WaitForJobCompletion.
// NOTE(review): the timeout branch appears to be removed by this change —
// confirm whether this constant is still needed.
defaultJobCompletionTimeout = 30 * time.Minute
// defaultJobStatusCheckInterval (10 seconds) is the ticker period between
// job status checks.
defaultJobStatusCheckInterval = 10 * time.Second
)

// WaitForJobCompletion waits for a Kubernetes job to complete
func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
timeout := time.After(defaultJobCompletionTimeout)
ticker := time.NewTicker(defaultJobStatusCheckInterval)
defer ticker.Stop()

for {
select {
case <-timeout:
return fmt.Errorf("timeout waiting for job to complete")
case <-ticker.C:
job, err := k8sClient.GetJob(namespace, jobName)
if err != nil {
return fmt.Errorf("failed to get job status: %w", err)
}
<-ticker.C
job, err := k8sClient.GetJob(namespace, jobName)
if err != nil {
return fmt.Errorf("failed to get job status: %w", err)
}

if job.Status.Succeeded > 0 {
return nil
}
if job.Status.Succeeded > 0 {
return nil
}

if job.Status.Failed > 0 {
// Get and print logs from failed job
log.Println()
log.Errorf("Job failed. Fetching logs...")
log.Println()
if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil {
log.Warningf("Failed to fetch job logs: %v", err)
}
return fmt.Errorf("job failed")
if job.Status.Failed > 0 {
// Get and print logs from failed job
log.Println()
log.Errorf("Job failed. Fetching logs...")
log.Println()
if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil {
log.Warningf("Failed to fetch job logs: %v", err)
}

log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d",
job.Status.Active, job.Status.Succeeded, job.Status.Failed)
return fmt.Errorf("job failed")
}

log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d",
job.Status.Active, job.Status.Succeeded, job.Status.Failed)
}
}

Expand Down