diff --git a/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java b/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java index 9da64889fc36..2db91ab0e35d 100644 --- a/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java +++ b/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import com.cloud.network.Network; import com.cloud.usage.dao.UsageNetworksDao; @@ -192,6 +193,7 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna private final List usageVmDisks = new ArrayList(); private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job")); + private final AtomicBoolean isParsingJobRunning = new AtomicBoolean(false); private final ScheduledExecutorService _heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB")); private final ScheduledExecutorService _sanityExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Sanity")); private Future _scheduledFuture = null; @@ -367,7 +369,12 @@ public void run() { (new ManagedContextRunnable() { @Override protected void runInContext() { - runInContextInternal(); + isParsingJobRunning.set(true); + try { + runInContextInternal(); + } finally { + isParsingJobRunning.set(false); + } } }).run(); } @@ -2267,9 +2274,14 @@ protected void runInContext() { if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) { if (timeToJob > (aggregationDurationMillis / 2)) { - logger.debug("it's been {} ms since last usage job and {} ms until next job, scheduling an immediate job to catch up (aggregation duration is {} minutes)" - , timeSinceLastSuccessJob, timeToJob, _aggregationDuration); - scheduleParse(); + logger.debug("Heartbeat: it's been {} ms since last finished usage job and {} ms until next job (aggregation duration is {} minutes)", + timeSinceLastSuccessJob, timeToJob, _aggregationDuration); + if (isParsingJobRunning.get()) { + logger.debug("Heartbeat: A parsing job is already running"); + } else { + logger.debug("Heartbeat: Scheduling an immediate job to catch up"); + scheduleParse(); + } } }