import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TaskQueueAnomalyDetector {
private final Queue<String> taskQueue = new LinkedList<>(); //Simulate a task queue
private final BlockingQueue<String> realTaskQueue = new LinkedBlockingQueue<>(); //Actual task queue
private final int anomalyThreshold = 5; // Define the anomaly threshold
private final AtomicInteger anomalyCount = new AtomicInteger(0); // Track anomaly count
private final List<String> anomalyLog = new ArrayList<>(); // Log anomalies
public TaskQueueAnomalyDetector() {
// Initialize the real task queue
realTaskQueue.add("Task1");
realTaskQueue.add("Task2");
realTaskQueue.add("Task3");
realTaskQueue.add("Task4");
realTaskQueue.add("Task5");
realTaskQueue.add("Task6");
realTaskQueue.add("Task7");
}
public void enqueueTask(String task) {
taskQueue.offer(task);
realTaskQueue.add(task);
}
public List<String> getTaskQueue(){
return taskQueue;
}
public void checkQueueForAnomalies() {
try {
// Simulate processing tasks
for (int i = 0; i < 10; i++) {
String task = realTaskQueue.poll();
if (task != null) {
processTask(task);
}
}
} catch (InterruptedException e) {
System.err.println("Queue processing interrupted: " + e.getMessage());
//Handle the interruption gracefully
}
//Anomaly detection logic
if (taskQueue.size() > anomalyThreshold) {
anomalyCount.incrementAndGet();
anomalyLog.add("High queue size: " + taskQueue.size());
System.err.println("Anomaly detected: High queue size - " + taskQueue.size());
} else if (taskQueue.isEmpty() && !realTaskQueue.isEmpty()){
anomalyCount.incrementAndGet();
anomalyLog.add("Empty queue, but real queue is not");
System.err.println("Anomaly detected: Empty task queue but real queue is not empty.");
}
}
private void processTask(String task) throws InterruptedException {
// Simulate task processing with potential failures
if (task.equals("Task5")) { //Simulate a task that may fail
throw new RuntimeException("Simulated task failure for Task5");
}
System.out.println("Processing task: " + task);
Thread.sleep(100); // Simulate processing time
}
public int getAnomalyCount() {
return anomalyCount.get();
}
public List<String> getAnomalyLog() {
return new ArrayList<>(anomalyLog); // Return a copy to prevent external modification
}
public static void main(String[] args) throws InterruptedException {
TaskQueueAnomalyDetector detector = new TaskQueueAnomalyDetector();
// Simulate adding tasks
for (int i = 0; i < 10; i++) {
detector.enqueueTask("Task" + (i + 1));
}
// Check for anomalies periodically
for (int i = 0; i < 20; i++) {
detector.checkQueueForAnomalies();
Thread.sleep(500); // Check every 500ms
}
System.out.println("Total anomalies: " + detector.getAnomalyCount());
System.out.println("Anomaly Log: " + detector.getAnomalyLog());
}
}
Add your comment