import queue
import logging
import time
import traceback
class TaskQueue:
    """Run submitted callables on a fixed number of daemon worker threads.

    Tasks are stored in a ``queue.Queue`` as ``(task, args, kwargs)`` tuples
    and executed by the threads created in :meth:`start`. An exception raised
    by a task is logged and does not stop the worker that ran it.

    Typical use::

        tq = TaskQueue(max_workers=3)
        tq.start()
        tq.submit(print, "hello")
        tq.wait_completion()
        tq.shutdown()
    """

    # Unique marker placed on the queue by shutdown() to tell one worker to exit.
    _STOP = object()

    def __init__(self, max_workers=5):
        """Create an idle queue; no threads run until :meth:`start` is called.

        Args:
            max_workers: Number of worker threads each call to :meth:`start`
                creates.
        """
        self.task_queue = queue.Queue()
        self.workers = []
        self.max_workers = max_workers

    def start(self):
        """Start ``max_workers`` worker threads.

        Each call creates ``max_workers`` additional threads and appends them
        to ``self.workers``.
        """
        for _ in range(self.max_workers):
            # Daemon threads let the interpreter exit even if shutdown() is
            # never called.
            worker = threading.Thread(target=self._worker, daemon=True)
            self.workers.append(worker)
            worker.start()

    def submit(self, task, *args, **kwargs):
        """Queue ``task`` to be called later as ``task(*args, **kwargs)``.

        Args:
            task: The callable to execute on a worker thread.
            *args: Positional arguments forwarded to ``task``.
            **kwargs: Keyword arguments forwarded to ``task``.
        """
        self.task_queue.put((task, args, kwargs))

    def _worker(self):
        """Worker loop: execute queued tasks until a stop marker is received."""
        while True:
            # Block until work arrives; shutdown() wakes the thread by queueing
            # a stop marker, so no polling timeout is needed.
            item = self.task_queue.get()
            try:
                if item is self._STOP:
                    return
                task, args, kwargs = item
                task(*args, **kwargs)
            except Exception as exc:
                # Log message and traceback together, then keep serving tasks.
                logging.exception("Task failed: %s", exc)
            finally:
                # Always balance the get() so wait_completion() cannot hang.
                self.task_queue.task_done()

    def wait_completion(self):
        """Block until every item placed on the queue has been processed."""
        self.task_queue.join()

    def shutdown(self, wait=True):
        """Ask all worker threads to exit once already-queued tasks are done.

        One stop marker per live worker is appended to the queue, so tasks
        submitted before this call are still executed first. Tasks submitted
        afterwards are not executed unless :meth:`start` is called again.

        Args:
            wait: When true (the default), block until the worker threads
                have terminated.
        """
        stopping = [w for w in self.workers if w.is_alive()]
        self.workers = []
        for _ in stopping:
            self.task_queue.put(self._STOP)
        if wait:
            for worker in stopping:
                worker.join()
import threading
logging.basicConfig(level=logging.ERROR)  # Root logger emits ERROR and above only

if __name__ == '__main__':
    # Demo: push ten two-second tasks through a three-worker queue.

    def my_task(task_id):
        """Announce the start, sleep for two seconds, then announce the end."""
        print(f"Task {task_id} started")
        time.sleep(2)
        print(f"Task {task_id} completed")

    pool = TaskQueue(max_workers=3)
    pool.start()

    for task_number in range(10):
        pool.submit(my_task, task_number)

    # Block until every submitted task has been processed.
    pool.wait_completion()
    print("All tasks completed.")
# NOTE: stray non-code text from the original paste ("Add your comment"); kept as a comment so the file parses.