import time
import threading
import logging
import queue
class TaskScheduler:
    """Run queued callables on a small pool of daemon worker threads.

    Typical use: ``start()`` the pool, ``add_task()`` any number of
    callables, ``wait_completion()`` until the queue has drained, then
    ``stop()`` to shut the workers down.

    Workers stay alive while the queue is idle; they exit only after
    ``stop()`` has been requested and the queue is empty.
    """

    # Seconds a worker blocks on the queue before re-checking the stop flag.
    _POLL_INTERVAL = 0.1

    def __init__(self, max_workers=5):
        """Create an idle scheduler.

        Args:
            max_workers: Upper bound on the number of live worker threads
                that ``start()`` will maintain.
        """
        self.task_queue = queue.Queue()  # pending (task, args, kwargs) items
        self.workers = []  # worker threads spawned by start()
        self.max_workers = max_workers
        self._stop_event = threading.Event()  # set by stop() to end workers

        self.logging = logging.getLogger(__name__)
        # Only task failures are reported, so keep this logger at ERROR.
        self.logging.setLevel(logging.ERROR)
        self.formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        if self.logging.handlers:
            # The module logger is shared by every instance; attaching another
            # handler here would print each error once per instance created.
            self.handler = self.logging.handlers[0]
        else:
            self.handler = logging.StreamHandler()
            self.handler.setFormatter(self.formatter)
            self.logging.addHandler(self.handler)

    def add_task(self, task, *args, **kwargs):
        """Queue ``task`` to be called later as ``task(*args, **kwargs)``."""
        self.task_queue.put((task, args, kwargs))

    def worker(self):
        """Thread body: execute queued tasks until a stop is requested.

        An exception raised by a task (or by a malformed queue item) is
        logged and does not terminate the worker. The loop returns only once
        ``stop()`` has been called and the queue is empty.
        """
        while True:
            try:
                item = self.task_queue.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                if self._stop_event.is_set():
                    break
                # Merely idle: keep waiting so tasks added later still run.
                continue
            try:
                # Unpack inside the try so a bad item is logged and still
                # marked done instead of killing the thread and hanging join().
                task, args, kwargs = item
                task(*args, **kwargs)
            except Exception as e:
                self.logging.error("Task failed: %s", e, exc_info=True)
            finally:
                self.task_queue.task_done()

    def start(self):
        """Start worker threads until ``max_workers`` of them are alive.

        Safe to call again (for example after ``stop()``): dead threads are
        dropped from ``workers`` and only the missing ones are spawned.
        """
        self._stop_event.clear()
        self.workers = [w for w in self.workers if w.is_alive()]
        for _ in range(self.max_workers - len(self.workers)):
            # Daemon threads let the interpreter exit even if stop() is
            # never called.
            thread = threading.Thread(target=self.worker, daemon=True)
            self.workers.append(thread)
            thread.start()

    def wait_completion(self):
        """Block until every queued task has been taken and processed.

        Note: this waits on the queue itself, so it only returns if workers
        are running to consume the tasks.
        """
        self.task_queue.join()

    def stop(self):
        """Stop the worker threads after they have drained the queue.

        Signals the workers, then waits for each to finish. Returns
        immediately when no workers were started; tasks still queued in that
        case are left for a later ``start()``.
        """
        self._stop_event.set()
        for thread in self.workers:
            thread.join()
        self.workers = []
if __name__ == '__main__':
    def my_task(task_id):
        """Demo job: report start, pause for two seconds, report finish."""
        print(f"Task {task_id} started at {time.time()}")
        time.sleep(2)  # stand-in for real work
        print(f"Task {task_id} finished at {time.time()}")

    # Root logger at INFO for the demo run.
    logging.basicConfig(level=logging.INFO)

    # Three workers share the five jobs queued below.
    scheduler = TaskScheduler(max_workers=3)
    scheduler.start()

    task_ids = range(5)
    for task_id in task_ids:
        scheduler.add_task(my_task, task_id)

    # Wait for the queue to drain, then shut the pool down.
    scheduler.wait_completion()
    scheduler.stop()
    print("All tasks completed.")
# (Stray web-page text "Add your comment" -- not part of the program.)