import asyncio
import functools
import inspect
from collections import deque
from typing import Callable, Any, Dict, List, Optional
  5. class TaskQueue:
  6. def __init__(self, name: str):
  7. self.name = name
  8. self._tasks: List[Any] = []
  9. self._lock = asyncio.Lock()
  10. async def enqueue(self, task: Callable, *args, **kwargs):
  11. async with self._lock:
  12. self._tasks.append((task, args, kwargs))
  13. await self._process()
  14. async def _process(self):
  15. if self._tasks:
  16. task, args, kwargs = self._tasks.pop(0)
  17. try:
  18. await task(*args, **kwargs)
  19. except Exception as e:
  20. print(f"Task {self.name} failed: {e}")
  21. finally:
  22. self._process() # Process next task
  23. class NestedTaskQueue:
  24. def __init__(self, name: str, parent: Optional['NestedTaskQueue'] = None):
  25. self.name = name
  26. self.parent = parent
  27. self._tasks: List[Any] = []
  28. self._lock = asyncio.Lock()
  29. async def enqueue(self, task: Callable, *args, **kwargs):
  30. async with self._lock:
  31. self._tasks.append((task, args, kwargs))
  32. await self._process()
  33. async def _process(self):
  34. if self._tasks:
  35. task, args, kwargs = self._tasks.pop(0)
  36. try:
  37. await task(*args, **kwargs)
  38. except Exception as e:
  39. print(f"Task {self.name} failed: {e}")
  40. finally:
  41. self._process()
  42. def get_child(self, name: str) -> 'NestedTaskQueue':
  43. if self.parent:
  44. return self.parent.get_child(name)
  45. else:
  46. return NestedTaskQueue(name, parent=self)
  47. def nest_task_queues(*args, **kwargs):
  48. """
  49. Creates a nested structure of task queues with optional flags.
  50. """
  51. root_queue = TaskQueue("root")
  52. queues = []
  53. for arg in args:
  54. if isinstance(arg, str):
  55. # Create a nested queue based on the string argument
  56. queue = TaskQueue(arg)
  57. queues.append(queue)
  58. elif isinstance(arg, NestedTaskQueue):
  59. queues.append(arg)
  60. else:
  61. raise TypeError("Each argument must be a string or a NestedTaskQueue instance.")
  62. # Connect the queues (optional, but allows for hierarchical task execution)
  63. for i in range(len(queues) - 1):
  64. queues[i].parent = queues[i+1]
  65. return root_queue, queues
  66. async def run_tasks(root_queue: TaskQueue, queues: List[NestedTaskQueue]):
  67. """
  68. Starts the task queues.
  69. """
  70. tasks = []
  71. # Enqueue tasks into the root queue
  72. for i in range(5):
  73. async def task_function(task_id):
  74. print(f"Running root task {task_id}")
  75. await asyncio.sleep(1)
  76. print(f"Root task {task_id} finished")
  77. tasks.append(root_queue.enqueue(task_function, i))
  78. # Enqueue tasks into child queues
  79. for i, queue in enumerate(queues):
  80. for j in range(3):
  81. async def child_task(task_id, queue_id):
  82. print(f"Running child task {task_id} in queue {queue_id}")
  83. await asyncio.sleep(0.5)
  84. print(f"Child task {task_id} in queue {queue_id} finished")
  85. queue.enqueue(child_task, j, queue_id)
  86. await asyncio.gather(*tasks)
  87. if __name__ == '__main__':
  88. # Example usage:
  89. root, nested_queues = nest_task_queues(
  90. "queue1",
  91. "queue2",
  92. NestedTaskQueue("queue3"),
  93. "queue4"
  94. )
  95. asyncio.run(run_tasks(root, nested_queues))

Add your comment