1. import time
  2. import threading
  3. import logging
  4. import queue
  5. class TaskScheduler:
  6. def __init__(self, max_workers=5):
  7. self.task_queue = queue.Queue() # Queue for tasks
  8. self.workers = [] # List to hold worker threads
  9. self.max_workers = max_workers
  10. self.logging = logging.getLogger(__name__)
  11. self.logging.setLevel(logging.ERROR) #Set log level to ERROR to prevent excessive output
  12. self.handler = logging.StreamHandler()
  13. self.formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  14. self.handler.setFormatter(self.formatter)
  15. self.logging.addHandler(self.handler)
  16. def add_task(self, task, *args, **kwargs):
  17. """Adds a task to the queue."""
  18. self.task_queue.put((task, args, kwargs))
  19. def worker(self):
  20. """Worker thread function to execute tasks."""
  21. while True:
  22. try:
  23. task, args, kwargs = self.task_queue.get(timeout=1) #Get a task from the queue with a timeout
  24. try:
  25. task(*args, **kwargs) # Execute the task
  26. except Exception as e:
  27. self.logging.error(f"Task failed: {e}", exc_info=True) # Log the error
  28. finally:
  29. self.task_queue.task_done()
  30. except queue.Empty:
  31. # Timeout reached, check if workers should exit
  32. break
  33. def start(self):
  34. """Starts the worker threads."""
  35. for _ in range(self.max_workers):
  36. worker = threading.Thread(target=self.worker)
  37. worker.daemon = True # Allow main thread to exit even if workers are running
  38. self.workers.append(worker)
  39. worker.start()
  40. def wait_completion(self):
  41. """Waits for all tasks in the queue to complete."""
  42. self.task_queue.join() #Block until all items in the queue have been gotten and processed.
  43. def stop(self):
  44. """Stops the worker threads."""
  45. for worker in self.workers:
  46. worker.join() #Wait for the threads to finish
  47. self.task_queue.join() #Ensure all remaining tasks are processed
  48. self.workers = []
  49. if __name__ == '__main__':
  50. logging.basicConfig(level=logging.INFO) #Set default logging level
  51. scheduler = TaskScheduler(max_workers=3)
  52. scheduler.start()
  53. def my_task(task_id):
  54. """A short-lived task."""
  55. print(f"Task {task_id} started at {time.time()}")
  56. time.sleep(2) # Simulate some work
  57. print(f"Task {task_id} finished at {time.time()}")
  58. # Add tasks to the scheduler
  59. for i in range(5):
  60. scheduler.add_task(my_task, i)
  61. scheduler.wait_completion()
  62. scheduler.stop()
  63. print("All tasks completed.")

Add your comment