import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
  8. public class LogBatcher {
  9. private final int batchSize;
  10. private final List<LogEntry> logEntries = new ArrayList<>();
  11. private final Queue<List<LogEntry>> batchQueue = new LinkedList<>();
  12. private final int numThreads;
  13. private final ExecutorService executorService;
  14. public LogBatcher(int batchSize, int numThreads) {
  15. this.batchSize = batchSize;
  16. this.numThreads = numThreads;
  17. this.executorService = Executors.newFixedThreadPool(numThreads);
  18. }
  19. public void addLogEntry(LogEntry entry) {
  20. logEntries.add(entry);
  21. if (logEntries.size() >= batchSize) {
  22. List<LogEntry> batch = new ArrayList<>(logEntries); // Create a copy to avoid modification issues
  23. batchQueue.offer(batch);
  24. logEntries.clear(); // Clear the logEntries list
  25. }
  26. }
  27. public void startBatching() {
  28. for (int i = 0; i < numThreads; i++) {
  29. executorService.submit(processBatch);
  30. }
  31. }
  32. private void processBatch() {
  33. while (true) {
  34. List<LogEntry> batch = batchQueue.poll(); // Get a batch from the queue
  35. if (batch == null) {
  36. break; // No more batches to process
  37. }
  38. try {
  39. processBatchInternally(batch);
  40. } catch (Exception e) {
  41. // Handle errors during batch processing (fallback logic)
  42. System.err.println("Error processing batch: " + e.getMessage());
  43. //Consider logging the error or implementing more sophisticated retry logic
  44. }
  45. }
  46. executorService.shutdown(); // Shutdown the thread after processing all batches
  47. }
  48. private void processBatchInternally(List<LogEntry> batch) {
  49. try {
  50. // Simulate some operation that processes the log entries
  51. // Replace this with your actual log processing logic
  52. System.out.println("Processing batch of size: " + batch.size());
  53. for (LogEntry entry : batch) {
  54. System.out.println(" - " + entry.getMessage());
  55. }
  56. // Simulate some delay
  57. TimeUnit.MILLISECONDS.sleep(100);
  58. } catch (InterruptedException e) {
  59. Thread.currentThread().interrupt(); // Restore interrupted status
  60. System.err.println("Batch processing interrupted: " + e.getMessage());
  61. //Fallback Logic: Log the batch to a file if processing fails.
  62. System.out.println("Logging batch to file due to interruption.");
  63. }
  64. }
  65. public void shutdown() {
  66. executorService.shutdownNow();
  67. }
  68. public static void main(String[] args) throws InterruptedException {
  69. int batchSize = 5;
  70. int numThreads = 3;
  71. LogBatcher batcher = new LogBatcher(batchSize, numThreads);
  72. // Simulate log entries being added
  73. for (int i = 0; i < 20; i++) {
  74. batcher.addLogEntry(new LogEntry("Log message " + i));
  75. Thread.sleep(50);
  76. }
  77. batcher.startBatching();
  78. //Wait for the threads to complete. Important to avoid premature exit.
  79. batcher.executorService.awaitTermination(5, TimeUnit.SECONDS);
  80. batcher.shutdown();
  81. }
  82. // Simple LogEntry class for demonstration
  83. public static class LogEntry {
  84. private final String message;
  85. public LogEntry(String message) {
  86. this.message = message;
  87. }
  88. public String getMessage() {
  89. return message;
  90. }
  91. }
  92. }

Add your comment