import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LogBatcher {
private final int batchSize;
private final List<LogEntry> logEntries = new ArrayList<>();
private final Queue<List<LogEntry>> batchQueue = new LinkedList<>();
private final int numThreads;
private final ExecutorService executorService;
public LogBatcher(int batchSize, int numThreads) {
this.batchSize = batchSize;
this.numThreads = numThreads;
this.executorService = Executors.newFixedThreadPool(numThreads);
}
public void addLogEntry(LogEntry entry) {
logEntries.add(entry);
if (logEntries.size() >= batchSize) {
List<LogEntry> batch = new ArrayList<>(logEntries); // Create a copy to avoid modification issues
batchQueue.offer(batch);
logEntries.clear(); // Clear the logEntries list
}
}
public void startBatching() {
for (int i = 0; i < numThreads; i++) {
executorService.submit(processBatch);
}
}
private void processBatch() {
while (true) {
List<LogEntry> batch = batchQueue.poll(); // Get a batch from the queue
if (batch == null) {
break; // No more batches to process
}
try {
processBatchInternally(batch);
} catch (Exception e) {
// Handle errors during batch processing (fallback logic)
System.err.println("Error processing batch: " + e.getMessage());
//Consider logging the error or implementing more sophisticated retry logic
}
}
executorService.shutdown(); // Shutdown the thread after processing all batches
}
private void processBatchInternally(List<LogEntry> batch) {
try {
// Simulate some operation that processes the log entries
// Replace this with your actual log processing logic
System.out.println("Processing batch of size: " + batch.size());
for (LogEntry entry : batch) {
System.out.println(" - " + entry.getMessage());
}
// Simulate some delay
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
System.err.println("Batch processing interrupted: " + e.getMessage());
//Fallback Logic: Log the batch to a file if processing fails.
System.out.println("Logging batch to file due to interruption.");
}
}
public void shutdown() {
executorService.shutdownNow();
}
public static void main(String[] args) throws InterruptedException {
int batchSize = 5;
int numThreads = 3;
LogBatcher batcher = new LogBatcher(batchSize, numThreads);
// Simulate log entries being added
for (int i = 0; i < 20; i++) {
batcher.addLogEntry(new LogEntry("Log message " + i));
Thread.sleep(50);
}
batcher.startBatching();
//Wait for the threads to complete. Important to avoid premature exit.
batcher.executorService.awaitTermination(5, TimeUnit.SECONDS);
batcher.shutdown();
}
// Simple LogEntry class for demonstration
public static class LogEntry {
private final String message;
public LogEntry(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
}
Add your comment