import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
class TaskQueueAggregator {
// Queues to process; aggregateTasks() submits one worker job per queue.
private final List<TaskQueue> taskQueues;
// Pause between attempts of a failing task, in milliseconds (passed to Thread.sleep).
private final int retryIntervalMillis;
// Retry budget per task: the retry loop keeps going while retries <= maxRetries.
private final int maxRetries;
/**
 * Creates an aggregator over the given task queues.
 *
 * @param taskQueues          queues whose tasks will be executed; the list is stored by
 *                            reference, not copied
 * @param retryIntervalMillis pause between attempts of a failing task, in milliseconds
 * @param maxRetries          number of retries allowed per task after its first attempt
 */
public TaskQueueAggregator(List<TaskQueue> taskQueues, int retryIntervalMillis, int maxRetries) {
    this.maxRetries = maxRetries;
    this.retryIntervalMillis = retryIntervalMillis;
    this.taskQueues = taskQueues;
}
/**
 * Processes every task queue on its own worker thread and collects the values of the
 * tasks that succeeded.
 *
 * <p>The call waits up to 60 seconds for the workers to finish. If the wait times out,
 * or the calling thread is interrupted while waiting, the workers that are still running
 * are cancelled with {@code shutdownNow()}. On a timeout the results gathered so far are
 * returned.
 *
 * @return a new mutable map from task id to task value for every task that completed
 *         successfully; empty when there are no queues
 * @throws InterruptedException if the calling thread is interrupted while waiting for
 *                              the workers
 */
public Map<String, Integer> aggregateTasks() throws InterruptedException {
    if (taskQueues.isEmpty()) {
        // Executors.newFixedThreadPool rejects a pool size of zero.
        return new HashMap<>();
    }

    // Several workers write into this map at once, so a plain HashMap is not safe here.
    // A synchronized HashMap keeps the original tolerance for null task ids.
    Map<String, Integer> sharedResults =
            java.util.Collections.synchronizedMap(new HashMap<String, Integer>());

    ExecutorService executor = Executors.newFixedThreadPool(taskQueues.size());
    try {
        for (TaskQueue taskQueue : taskQueues) {
            executor.submit(() -> processQueue(taskQueue, sharedResults));
        }
        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS); // Adjust timeout as needed.
    } finally {
        // Reached on timeout, interruption, or a failed submit: do not leave workers behind.
        if (!executor.isTerminated()) {
            executor.shutdownNow();
        }
    }

    // Return a private snapshot so late writes from a cancelled worker cannot reach the
    // caller. Iterating a synchronized map requires holding its lock.
    synchronized (sharedResults) {
        return new HashMap<>(sharedResults);
    }
}
/**
 * Runs the tasks of one queue in order on the calling thread, retrying failures, and
 * records the value of each task that succeeds.
 *
 * <p>Every task gets one initial attempt plus up to {@code maxRetries} retries, with a
 * pause of {@code retryIntervalMillis} milliseconds between attempts. An attempt counts
 * as failed when {@code execute()} throws (the error is reported on {@code System.err})
 * or returns a non-positive value. A task that never succeeds leaves no entry in the map.
 *
 * <p>If the thread is interrupted while pausing between attempts, the interrupt flag is
 * restored and the remaining tasks of the queue are abandoned.
 *
 * @param taskQueue         queue whose tasks are executed sequentially
 * @param aggregatedResults receives {@code taskId -> value} for each successful task; it
 *                          must be safe for concurrent writes when this method is invoked
 *                          from several threads with the same map
 */
private void processQueue(TaskQueue taskQueue, Map<String, Integer> aggregatedResults) {
    for (Task task : taskQueue.getTasks()) {
        int retries = 0;
        while (retries <= maxRetries) {
            boolean success;
            try {
                // execute() reports its outcome as an int, not a boolean.
                // NOTE(review): only the value 1 is documented as success; any positive
                // value is accepted here - confirm the intended convention.
                success = task.execute() > 0;
            } catch (RuntimeException e) {
                success = false;
                System.err.println("Task " + task.getTaskId() + " failed. Retry: " + (retries + 1) + ", Error: " + e.getMessage());
            }

            if (success) {
                aggregatedResults.put(task.getTaskId(), task.getValue());
                break; // Task completed successfully, exit retry loop.
            }

            retries++;
            if (retries <= maxRetries) {
                try {
                    Thread.sleep(retryIntervalMillis); // Wait before retrying
                } catch (InterruptedException e) {
                    // An interrupt is a cancellation request, not a task failure:
                    // restore the flag for the caller and stop processing this queue.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
/**
 * Static nested class holding the list of tasks that make up one queue.
 */
static class TaskQueue {
    private final List<Task> tasks;

    /**
     * @param tasks the tasks of this queue; the list is stored by reference, not copied
     */
    public TaskQueue(List<Task> tasks) {
        this.tasks = tasks;
    }

    /**
     * @return the same list instance that was passed to the constructor
     */
    public List<Task> getTasks() {
        return this.tasks;
    }
}
/**
 * Static nested class pairing an identifier with the {@link Runnable} that does the work.
 */
static class Task {
    private final String taskId;
    private final Runnable taskFunction;

    /**
     * @param taskId       identifier reported by {@link #getTaskId()}
     * @param taskFunction work to run on each call to {@link #execute()}
     */
    public Task(String taskId, Runnable taskFunction) {
        this.taskId = taskId;
        this.taskFunction = taskFunction;
    }

    /**
     * @return the identifier given at construction
     */
    public String getTaskId() {
        return this.taskId;
    }

    /**
     * Runs the wrapped function once.
     *
     * @return 1 when the function returns normally; anything the function throws
     *         propagates unchanged so the caller's retry logic can react to it
     */
    public int execute() {
        this.taskFunction.run();
        return 1; // Indicate success
    }

    /**
     * @return 0, the default value; subclasses may override to supply a real value
     */
    public int getValue() {
        return 0;
    }
}
public static void main(String[] args) throws InterruptedException {
// Example Usage
List<Task> task1 = new ArrayList<>();
task1.add(new Task("task1", () -> { System.out.println("Task 1 executing");}));
List<Task> task2 = new ArrayList<>();
task2.add(new Task("task2", () -> { System.out.println("Task 2 executing");}));
List<Task> task3 = new ArrayList<>();
task3.add(new Task("task3", () -> { throw new RuntimeException("Simulated error");}));
List<TaskQueue> taskQueues = new ArrayList<>();
taskQueues.add(new TaskQueue(task1));
taskQueues.add(new TaskQueue(task2));
taskQueues.add(new TaskQueue(task3));
TaskQueueAggregator aggregator = new TaskQueueAggregator(taskQueues, 500, 3); //Retry every 500ms, max 3 retries
Add your comment