import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Concurrent久期队列;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataStreamer {
private final ConcurrentHashMap<String, List<String>> dataCache = new ConcurrentHashMap<>(); // Stores data for each key
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // For periodic processing
private final long timeoutMillis; // Timeout duration in milliseconds
private final Runnable cleanupTask; // Task to clean up old data
public DataStreamer(long timeoutMillis, Runnable cleanupTask) {
this.timeoutMillis = timeoutMillis;
this.cleanupTask = cleanupTask;
startCleanupTask();
}
private void startCleanupTask() {
scheduler.scheduleAtFixedRate(cleanupTask, timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
}
public void addData(String key, String data) {
dataCache.computeIfAbsent(key, k -> new ArrayList<>()).add(data); // Add data to the list for the key
}
public List<String> getData(String key) {
return dataCache.getOrDefault(key, new ArrayList<>()); // Return data for the key, or an empty list if not found
}
public void stop() {
scheduler.shutdown(); // Stop the scheduler
}
}
Add your comment