1. import java.util.concurrent.*;
  2. import java.util.List;
  3. import java.util.ArrayList;
  4. import java.util.Map;
  5. import java.util.HashMap;
  6. class TaskQueueAggregator {
  // Queues whose tasks aggregateTasks() executes; one pool thread is created per queue.
  7. private final List<TaskQueue> taskQueues;
  // Pause between attempts of a failing task, in milliseconds (passed to Thread.sleep).
  8. private final int retryIntervalMillis;
  // Upper bound on the retry counter; a task is attempted at most maxRetries + 1 times.
  9. private final int maxRetries;
  /**
   * Creates an aggregator over the given queues.
   *
   * <p>The list is stored by reference, not copied.
   *
   * @param taskQueues queues whose tasks will be executed; must not be {@code null}
   * @param retryIntervalMillis pause between attempts of a failing task, in milliseconds;
   *     must not be negative
   * @param maxRetries number of extra attempts allowed after the first one fails;
   *     must not be negative ({@code 0} means "try once, never retry")
   * @throws NullPointerException if {@code taskQueues} is {@code null}
   * @throws IllegalArgumentException if {@code retryIntervalMillis} or {@code maxRetries}
   *     is negative
   */
  public TaskQueueAggregator(List<TaskQueue> taskQueues, int retryIntervalMillis, int maxRetries) {
    if (taskQueues == null) {
      throw new NullPointerException("taskQueues must not be null");
    }
    if (retryIntervalMillis < 0) {
      // Thread.sleep rejects negative durations; fail here rather than inside a worker thread.
      throw new IllegalArgumentException(
          "retryIntervalMillis must be >= 0, got " + retryIntervalMillis);
    }
    if (maxRetries < 0) {
      // A negative limit would make the retry loop skip every task without ever running it.
      throw new IllegalArgumentException("maxRetries must be >= 0, got " + maxRetries);
    }
    this.taskQueues = taskQueues;
    this.retryIntervalMillis = retryIntervalMillis;
    this.maxRetries = maxRetries;
  }
  15. public Map<String, Integer> aggregateTasks() throws InterruptedException {
  16. Map<String, Integer> aggregatedResults = new HashMap<>();
  17. ExecutorService executor = Executors.newFixedThreadPool(taskQueues.size());
  18. for (TaskQueue taskQueue : taskQueues) {
  19. executor.submit(() -> processQueue(taskQueue, aggregatedResults));
  20. }
  21. executor.shutdown();
  22. executor.awaitTermination(60, TimeUnit.SECONDS); // Adjust timeout as needed.
  23. return aggregatedResults;
  24. }
  25. private void processQueue(TaskQueue taskQueue, Map<String, Integer> aggregatedResults) {
  26. for (Task task : taskQueue.getTasks()) {
  27. int retries = 0;
  28. boolean success = false;
  29. while (retries <= maxRetries) {
  30. try {
  31. success = task.execute(); // Execute the task
  32. if (success) {
  33. aggregatedResults.put(task.getTaskId(), task.getValue()); // Update aggregated results.
  34. break; // Task completed successfully, exit retry loop.
  35. } else {
  36. retries++;
  37. if (retries <= maxRetries) {
  38. Thread.sleep(retryIntervalMillis); // Wait before retrying
  39. }
  40. }
  41. } catch (Exception e) {
  42. retries++;
  43. if (retries <= maxRetries) {
  44. Thread.sleep(retryIntervalMillis);
  45. }
  46. // Log the exception (optional)
  47. System.err.println("Task " + task.getTaskId() + " failed. Retry: " + retries + ", Error: " + e.getMessage());
  48. }
  49. }
  50. }
  51. }
  52. //Inner class representing a TaskQueue
  53. static class TaskQueue {
  54. private final List<Task> tasks;
  55. public TaskQueue(List<Task> tasks) {
  56. this.tasks = tasks;
  57. }
  58. public List<Task> getTasks() {
  59. return tasks;
  60. }
  61. }
  62. //Inner class representing a Task
  63. static class Task {
  64. private final String taskId;
  65. private final Runnable taskFunction;
  66. public Task(String taskId, Runnable taskFunction) {
  67. this.taskId = taskId;
  68. this.taskFunction = taskFunction;
  69. }
  70. public String getTaskId() {
  71. return taskId;
  72. }
  73. public int execute() {
  74. try {
  75. taskFunction.run();
  76. return 1; // Indicate success
  77. } catch (Exception e) {
  78. throw e; // Re-throw the exception for retry logic.
  79. }
  80. }
  81. public int getValue() {
  82. return 0; // Default value. Override in specific task if needed.
  83. }
  84. }
  85. public static void main(String[] args) throws InterruptedException {
  86. // Example Usage
  87. List<Task> task1 = new ArrayList<>();
  88. task1.add(new Task("task1", () -> { System.out.println("Task 1 executing");}));
  89. List<Task> task2 = new ArrayList<>();
  90. task2.add(new Task("task2", () -> { System.out.println("Task 2 executing");}));
  91. List<Task> task3 = new ArrayList<>();
  92. task3.add(new Task("task3", () -> { throw new RuntimeException("Simulated error");}));
  93. List<TaskQueue> taskQueues = new ArrayList<>();
  94. taskQueues.add(new TaskQueue(task1));
  95. taskQueues.add(new TaskQueue(task2));
  96. taskQueues.add(new TaskQueue(task3));
  97. TaskQueueAggregator aggregator = new TaskQueueAggregator(taskQueues, 500, 3); //Retry every 500ms, max 3 retries

Add your comment