CollectorTask.java
package com.capitalone.dashboard.collector;
import com.capitalone.dashboard.model.Collector;
import com.capitalone.dashboard.repository.BaseCollectorRepository;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* Base class for Collector task implementation which provides subclasses with
* the following:
* <p>
* <ol>
* <li>Creates a Collector instance the first time the collector runs.</li>
* <li>Uses TaskScheduler to schedule the job based on the provided cron when the process starts.</li>
* <li>Saves the last execution time on the collector when the collection run finishes.</li>
* <li>Sets the collector online/offline when the collector process starts/stops</li>
* </ol>
*
* @param <T> Class that extends Collector
*/
@Component
public abstract class CollectorTask<T extends Collector> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTask.class);
private final TaskScheduler taskScheduler;
private final String collectorName;
@Autowired
protected CollectorTask(TaskScheduler taskScheduler, String collectorName) {
this.taskScheduler = taskScheduler;
this.collectorName = collectorName;
}
@Override
public final void run() {
LOGGER.info("Running Collector: {}", collectorName);
T collector = getCollectorRepository().findByName(collectorName);
if (collector == null) {
// Register new collector
collector = getCollectorRepository().save(getCollector());
} else {
// In case the collector options changed via collectors properties setup.
// We want to keep the existing collectors ID same as it ties to collector items.
T newCollector = getCollector();
newCollector.setId(collector.getId());
newCollector.setEnabled(collector.isEnabled());
newCollector.setCollectorType(collector.getCollectorType());
newCollector.setLastExecuted(collector.getLastExecuted());
newCollector.setName(collector.getName());
collector = getCollectorRepository().save(newCollector);
}
if (collector.isEnabled()) {
// Do collection run
collect(collector);
// Update lastUpdate timestamp in Collector
collector.setLastExecuted(System.currentTimeMillis());
getCollectorRepository().save(collector);
}
}
@PostConstruct
public void onStartup() {
taskScheduler.schedule(this, new CronTrigger(getCron()));
setOnline(true);
}
@PreDestroy
public void onShutdown() {
setOnline(false);
}
public abstract T getCollector();
public abstract BaseCollectorRepository<T> getCollectorRepository();
public abstract String getCron();
public abstract void collect(T collector);
private void setOnline(boolean online) {
T collector = getCollectorRepository().findByName(collectorName);
if (collector != null) {
collector.setOnline(online);
getCollectorRepository().save(collector);
}
}
protected boolean throttleRequests (long startTime, int requestCount,
long waitTime, int requestRateLimit,
long requestRateLimitTimeWindow) {
boolean result = false;
// Record Current Time
long currentTime = System.currentTimeMillis();
// Time Elapsed
long timeElapsed = currentTime - startTime;
if (requestCount >= requestRateLimit) {
result = true;
if (timeElapsed <= requestRateLimitTimeWindow) {
long timeToWait = (timeElapsed < requestRateLimitTimeWindow)? ((requestRateLimitTimeWindow - timeElapsed) + waitTime) : waitTime;
LOGGER.debug("Rates limit exceeded: timeElapsed = " +timeElapsed+ "; Rate Count = "+requestCount+ "; waiting for " + timeToWait + " milliseconds");
sleep (timeToWait);
}
}
return result;
}
protected void sleep (long timeToWait) {
try {
Thread.sleep(timeToWait);
} catch (InterruptedException ie) {
LOGGER.error("Thread Interrupted ", ie);
}
}
protected void log(String marker, long start) {
log(marker, start, null);
}
protected void log(String text, long start, Integer count) {
long end = System.currentTimeMillis();
String elapsed = ((end - start) / 1000) + "s";
String token2 = "";
String token3;
if (count == null) {
token3 = Strings.padStart(" " + elapsed, 35 - text.length(), ' ');
} else {
token2 = Strings.padStart(" " + count.toString(), 25 - text.length(), ' ');
token3 = Strings.padStart(" " + elapsed, 10, ' ');
}
LOGGER.info(text + token2 + token3);
}
protected void log(String message) {
LOGGER.info(message);
}
protected void logBanner(String instanceUrl) {
LOGGER.info("-----------------------------------");
LOGGER.info(instanceUrl);
LOGGER.info("-----------------------------------");
}
}