1. import asyncio
  2. import aiofiles
  3. import threading
  4. import queue
  5. import time
  6. class FileBuffer:
  7. def __init__(self, queue_size=10):
  8. self.queue = queue.Queue(maxsize=queue_size)
  9. self.lock = threading.Lock()
  10. self.stop_event = threading.Event()
  11. def enqueue(self, filepath):
  12. """Adds a filepath to the queue."""
  13. with self.lock:
  14. self.queue.put(filepath)
  15. def dequeue(self):
  16. """Retrieves a filepath from the queue. Blocks if empty."""
  17. return self.queue.get()
  18. def is_empty(self):
  19. """Checks if the queue is empty."""
  20. return self.queue.empty()
  21. def stop(self):
  22. """Signals the buffer to stop processing."""
  23. self.stop_event.set()
  24. def get_all(self):
  25. """Returns a list of all files in the buffer."""
  26. with self.lock:
  27. return list(self.queue.queue)
  28. async def process_file(filepath):
  29. """Simulates a short-lived synchronous task processing a file."""
  30. print(f"Processing file: {filepath}")
  31. time.sleep(2) # Simulate processing time
  32. print(f"Finished processing: {filepath}")
  33. async def main(file_buffer):
  34. """Main function to demonstrate file buffering."""
  35. while not file_buffer.is_empty() and not file_buffer.stop_event.is_set():
  36. filepath = file_buffer.dequeue()
  37. asyncio.create_task(process_file(filepath)) # Run processing in a separate task
  38. await asyncio.sleep(0.1) # Allow other tasks to run
  39. if __name__ == "__main__":
  40. buffer = FileBuffer(queue_size=3)
  41. file_paths = ["file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt"]
  42. # Enqueue files
  43. for file_path in file_paths:
  44. buffer.enqueue(file_path)
  45. # Start the processing tasks
  46. asyncio.run(main(buffer))
  47. print("All files processed.")

Add your comment