1. import time
  2. import threading
  3. from queue import Queue
  4. class TaskQueue:
  5. def __init__(self, max_workers=5):
  6. self.queue = Queue()
  7. self.workers = []
  8. self.max_workers = max_workers
  9. self.results = {} # Store results for pretty printing
  10. self.lock = threading.Lock()
  11. def add_task(self, task, timeout=10):
  12. """Adds a task to the queue."""
  13. self.queue.put((task, timeout)) # Store task and timeout
  14. def _worker(self):
  15. """Worker thread function."""
  16. while True:
  17. task, timeout = self.queue.get()
  18. try:
  19. result = task() # Execute the task
  20. with self.lock:
  21. self.results[task] = result # store result
  22. except Exception as e:
  23. with self.lock:
  24. self.results[task] = f"Error: {e}" # Store error
  25. finally:
  26. self.queue.task_done()
  27. def start(self):
  28. """Starts the worker threads."""
  29. for _ in range(self.max_workers):
  30. t = threading.Thread(target=self._worker, daemon=True)
  31. self.workers.append(t)
  32. t.start()
  33. def wait_completion(self):
  34. """Waits for all tasks to complete."""
  35. self.queue.join()
  36. def get_results(self):
  37. """Returns the results dictionary."""
  38. return self.results
  39. def pretty_print_results(results):
  40. """Prints results in a human-readable format."""
  41. for task, result in results.items():
  42. print(f"Task: {task}")
  43. print(f"Result: {result}")
  44. print("-" * 20)
  45. if __name__ == '__main__':
  46. # Example Usage
  47. def my_task(n):
  48. """A sample task that sleeps for a short time."""
  49. time.sleep(n)
  50. return f"Task completed after {n} seconds"
  51. if __name__ == '__main__':
  52. task_queue = TaskQueue(max_workers=2)
  53. task_queue.start()
  54. # Add tasks with timeouts
  55. task_queue.add_task(lambda: my_task(2), timeout=3)
  56. task_queue.add_task(lambda: my_task(1), timeout=5)
  57. task_queue.add_task(lambda: my_task(4), timeout=2)
  58. task_queue.add_task(lambda: raise ValueError("Simulated error"), timeout=4)
  59. task_queue.wait_completion()
  60. results = task_queue.get_results()
  61. pretty_print_results(results)

Add your comment