1. import java.util.*;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. public class TaskQueueAnomalyDetector {
  5. private final Queue<String> taskQueue = new LinkedList<>(); //Simulate a task queue
  6. private final BlockingQueue<String> realTaskQueue = new LinkedBlockingQueue<>(); //Actual task queue
  7. private final int anomalyThreshold = 5; // Define the anomaly threshold
  8. private final AtomicInteger anomalyCount = new AtomicInteger(0); // Track anomaly count
  9. private final List<String> anomalyLog = new ArrayList<>(); // Log anomalies
  10. public TaskQueueAnomalyDetector() {
  11. // Initialize the real task queue
  12. realTaskQueue.add("Task1");
  13. realTaskQueue.add("Task2");
  14. realTaskQueue.add("Task3");
  15. realTaskQueue.add("Task4");
  16. realTaskQueue.add("Task5");
  17. realTaskQueue.add("Task6");
  18. realTaskQueue.add("Task7");
  19. }
  20. public void enqueueTask(String task) {
  21. taskQueue.offer(task);
  22. realTaskQueue.add(task);
  23. }
  24. public List<String> getTaskQueue(){
  25. return taskQueue;
  26. }
  27. public void checkQueueForAnomalies() {
  28. try {
  29. // Simulate processing tasks
  30. for (int i = 0; i < 10; i++) {
  31. String task = realTaskQueue.poll();
  32. if (task != null) {
  33. processTask(task);
  34. }
  35. }
  36. } catch (InterruptedException e) {
  37. System.err.println("Queue processing interrupted: " + e.getMessage());
  38. //Handle the interruption gracefully
  39. }
  40. //Anomaly detection logic
  41. if (taskQueue.size() > anomalyThreshold) {
  42. anomalyCount.incrementAndGet();
  43. anomalyLog.add("High queue size: " + taskQueue.size());
  44. System.err.println("Anomaly detected: High queue size - " + taskQueue.size());
  45. } else if (taskQueue.isEmpty() && !realTaskQueue.isEmpty()){
  46. anomalyCount.incrementAndGet();
  47. anomalyLog.add("Empty queue, but real queue is not");
  48. System.err.println("Anomaly detected: Empty task queue but real queue is not empty.");
  49. }
  50. }
  51. private void processTask(String task) throws InterruptedException {
  52. // Simulate task processing with potential failures
  53. if (task.equals("Task5")) { //Simulate a task that may fail
  54. throw new RuntimeException("Simulated task failure for Task5");
  55. }
  56. System.out.println("Processing task: " + task);
  57. Thread.sleep(100); // Simulate processing time
  58. }
  59. public int getAnomalyCount() {
  60. return anomalyCount.get();
  61. }
  62. public List<String> getAnomalyLog() {
  63. return new ArrayList<>(anomalyLog); // Return a copy to prevent external modification
  64. }
  65. public static void main(String[] args) throws InterruptedException {
  66. TaskQueueAnomalyDetector detector = new TaskQueueAnomalyDetector();
  67. // Simulate adding tasks
  68. for (int i = 0; i < 10; i++) {
  69. detector.enqueueTask("Task" + (i + 1));
  70. }
  71. // Check for anomalies periodically
  72. for (int i = 0; i < 20; i++) {
  73. detector.checkQueueForAnomalies();
  74. Thread.sleep(500); // Check every 500ms
  75. }
  76. System.out.println("Total anomalies: " + detector.getAnomalyCount());
  77. System.out.println("Anomaly Log: " + detector.getAnomalyLog());
  78. }
  79. }

Add your comment