import time
import threading
from queue import Queue
class TaskQueue:
    """Run queued callables on a fixed pool of daemon worker threads.

    Each task is a zero-argument callable. Its return value, or an
    ``"Error: ..."`` string if it raised or exceeded its timeout, is stored
    in ``results`` keyed by the callable itself. Tasks must therefore be
    hashable, and queuing the same callable object twice keeps only the
    most recently stored outcome.
    """

    def __init__(self, max_workers=5):
        """Create an idle queue; call ``start()`` to launch the workers.

        Args:
            max_workers: Number of worker threads ``start()`` keeps running.
        """
        self.queue = Queue()
        self.workers = []
        self.max_workers = max_workers
        self.results = {}  # task callable -> return value or "Error: ..." text
        self.lock = threading.Lock()  # guards self.results

    def add_task(self, task, timeout=10):
        """Queue ``task`` for execution by a worker.

        Args:
            task: Zero-argument callable to run.
            timeout: Seconds a worker waits for the task before recording a
                timeout error; ``None`` waits indefinitely.
        """
        self.queue.put((task, timeout))

    @staticmethod
    def _run_with_timeout(task, timeout):
        """Run ``task`` on a helper thread and wait at most ``timeout`` seconds.

        Returns:
            Whatever ``task()`` returned.

        Raises:
            TimeoutError: If the task is still running after ``timeout``
                seconds. Python threads cannot be killed, so the helper
                (a daemon thread) keeps running in the background and its
                eventual outcome is discarded.
            BaseException: Whatever the task itself raised.
        """
        outcome = {}

        def runner():
            try:
                outcome["value"] = task()
            except BaseException as exc:  # re-raised in the waiting worker
                outcome["error"] = exc

        helper = threading.Thread(target=runner, daemon=True)
        helper.start()
        helper.join(timeout)
        if helper.is_alive():
            raise TimeoutError(f"Task timed out after {timeout} seconds")
        if "error" in outcome:
            raise outcome["error"]
        return outcome["value"]

    def _worker(self):
        """Worker loop: take tasks off the queue forever and record outcomes."""
        while True:
            task, timeout = self.queue.get()
            try:
                try:
                    outcome = self._run_with_timeout(task, timeout)
                except Exception as exc:
                    # Failures (including timeouts) are recorded as text so
                    # one bad task never takes the worker down.
                    outcome = f"Error: {exc}"
                with self.lock:
                    self.results[task] = outcome
            finally:
                # Always balance get() so wait_completion() cannot hang.
                self.queue.task_done()

    def start(self):
        """Start worker threads until ``max_workers`` have been launched.

        Workers are daemon threads. Calling ``start()`` again does not grow
        the pool beyond ``max_workers``.
        """
        for _ in range(self.max_workers - len(self.workers)):
            worker = threading.Thread(target=self._worker, daemon=True)
            self.workers.append(worker)
            worker.start()

    def wait_completion(self):
        """Block until every queued task has been processed."""
        self.queue.join()

    def get_results(self):
        """Return a snapshot dict mapping each task to its recorded outcome.

        The copy is taken under the lock so callers can iterate it safely
        even while workers are still storing results.
        """
        with self.lock:
            return dict(self.results)
def pretty_print_results(results):
    """Write every task and its outcome to stdout, followed by a dashed rule.

    Args:
        results: Mapping of task to result; entries are printed in the
            mapping's iteration order.
    """
    separator = "-" * 20
    for job, outcome in results.items():
        # One print call per entry; sep="\n" yields the same three lines.
        print(f"Task: {job}", f"Result: {outcome}", separator, sep="\n")
if __name__ == '__main__':
    # Example usage: run a few sleeping tasks and one failing task on two
    # workers, then print what each one produced.

    def my_task(n):
        """A sample task that sleeps for ``n`` seconds and reports it."""
        time.sleep(n)
        return f"Task completed after {n} seconds"

    def failing_task():
        """A sample task that always fails.

        ``raise`` is a statement, so it cannot appear in a lambda body;
        a named function is required here.
        """
        raise ValueError("Simulated error")

    task_queue = TaskQueue(max_workers=2)
    task_queue.start()

    # Each task is queued together with a timeout value.
    task_queue.add_task(lambda: my_task(2), timeout=3)
    task_queue.add_task(lambda: my_task(1), timeout=5)
    task_queue.add_task(lambda: my_task(4), timeout=2)
    task_queue.add_task(failing_task, timeout=4)

    task_queue.wait_completion()
    results = task_queue.get_results()
    pretty_print_results(results)