1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.Concurrent久期队列;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.ScheduledExecutorService;
  7. import java.util.concurrent.TimeUnit;
  8. public class DataStreamer {
  9. private final ConcurrentHashMap<String, List<String>> dataCache = new ConcurrentHashMap<>(); // Stores data for each key
  10. private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // For periodic processing
  11. private final long timeoutMillis; // Timeout duration in milliseconds
  12. private final Runnable cleanupTask; // Task to clean up old data
  13. public DataStreamer(long timeoutMillis, Runnable cleanupTask) {
  14. this.timeoutMillis = timeoutMillis;
  15. this.cleanupTask = cleanupTask;
  16. startCleanupTask();
  17. }
  18. private void startCleanupTask() {
  19. scheduler.scheduleAtFixedRate(cleanupTask, timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
  20. }
  21. public void addData(String key, String data) {
  22. dataCache.computeIfAbsent(key, k -> new ArrayList<>()).add(data); // Add data to the list for the key
  23. }
  24. public List<String> getData(String key) {
  25. return dataCache.getOrDefault(key, new ArrayList<>()); // Return data for the key, or an empty list if not found
  26. }
  27. public void stop() {
  28. scheduler.shutdown(); // Stop the scheduler
  29. }
  30. }

Add your comment