1. import queue
  2. import logging
  3. import time
  4. import traceback
  5. class TaskQueue:
  6. def __init__(self, max_workers=5):
  7. self.task_queue = queue.Queue()
  8. self.workers = []
  9. self.max_workers = max_workers
  10. def start(self):
  11. """Starts the worker threads."""
  12. for _ in range(self.max_workers):
  13. worker = threading.Thread(target=self._worker)
  14. self.workers.append(worker)
  15. worker.daemon = True # Allow the main thread to exit even if workers are running
  16. worker.start()
  17. def submit(self, task, *args, **kwargs):
  18. """Submits a task to the queue."""
  19. self.task_queue.put((task, args, kwargs))
  20. def _worker(self):
  21. """Worker thread function."""
  22. while True:
  23. try:
  24. task, args, kwargs = self.task_queue.get(timeout=1) # Get task with timeout
  25. try:
  26. task(*args, **kwargs) # Execute the task
  27. except Exception as e:
  28. logging.error(f"Task failed: {e}")
  29. traceback.print_exc()
  30. finally:
  31. self.task_queue.task_done() # Mark task as completed
  32. except queue.Empty:
  33. # Timeout occurred, check if we should stop
  34. if all(not w.is_alive() for w in self.workers):
  35. break
  36. def wait_completion(self):
  37. """Waits for all tasks in the queue to be completed."""
  38. self.task_queue.join()
  39. import threading
  40. logging.basicConfig(level=logging.ERROR) #Configure basic logging
  41. if __name__ == '__main__':
  42. # Example usage
  43. task_queue = TaskQueue(max_workers=3)
  44. task_queue.start()
  45. def my_task(task_id):
  46. """A short-lived task."""
  47. print(f"Task {task_id} started")
  48. time.sleep(2)
  49. print(f"Task {task_id} completed")
  50. for i in range(10):
  51. task_queue.submit(my_task, i)
  52. task_queue.wait_completion()
  53. print("All tasks completed.")

Add your comment