import asyncio
import functools
import inspect
from typing import Callable, Any, Dict, List, Optional
class TaskQueue:
def __init__(self, name: str):
self.name = name
self._tasks: List[Any] = []
self._lock = asyncio.Lock()
async def enqueue(self, task: Callable, *args, **kwargs):
async with self._lock:
self._tasks.append((task, args, kwargs))
await self._process()
async def _process(self):
if self._tasks:
task, args, kwargs = self._tasks.pop(0)
try:
await task(*args, **kwargs)
except Exception as e:
print(f"Task {self.name} failed: {e}")
finally:
self._process() # Process next task
class NestedTaskQueue:
def __init__(self, name: str, parent: Optional['NestedTaskQueue'] = None):
self.name = name
self.parent = parent
self._tasks: List[Any] = []
self._lock = asyncio.Lock()
async def enqueue(self, task: Callable, *args, **kwargs):
async with self._lock:
self._tasks.append((task, args, kwargs))
await self._process()
async def _process(self):
if self._tasks:
task, args, kwargs = self._tasks.pop(0)
try:
await task(*args, **kwargs)
except Exception as e:
print(f"Task {self.name} failed: {e}")
finally:
self._process()
def get_child(self, name: str) -> 'NestedTaskQueue':
if self.parent:
return self.parent.get_child(name)
else:
return NestedTaskQueue(name, parent=self)
def nest_task_queues(*args, **kwargs):
"""
Creates a nested structure of task queues with optional flags.
"""
root_queue = TaskQueue("root")
queues = []
for arg in args:
if isinstance(arg, str):
# Create a nested queue based on the string argument
queue = TaskQueue(arg)
queues.append(queue)
elif isinstance(arg, NestedTaskQueue):
queues.append(arg)
else:
raise TypeError("Each argument must be a string or a NestedTaskQueue instance.")
# Connect the queues (optional, but allows for hierarchical task execution)
for i in range(len(queues) - 1):
queues[i].parent = queues[i+1]
return root_queue, queues
async def run_tasks(root_queue: TaskQueue, queues: List[NestedTaskQueue]):
"""
Starts the task queues.
"""
tasks = []
# Enqueue tasks into the root queue
for i in range(5):
async def task_function(task_id):
print(f"Running root task {task_id}")
await asyncio.sleep(1)
print(f"Root task {task_id} finished")
tasks.append(root_queue.enqueue(task_function, i))
# Enqueue tasks into child queues
for i, queue in enumerate(queues):
for j in range(3):
async def child_task(task_id, queue_id):
print(f"Running child task {task_id} in queue {queue_id}")
await asyncio.sleep(0.5)
print(f"Child task {task_id} in queue {queue_id} finished")
queue.enqueue(child_task, j, queue_id)
await asyncio.gather(*tasks)
if __name__ == '__main__':
# Example usage:
root, nested_queues = nest_task_queues(
"queue1",
"queue2",
NestedTaskQueue("queue3"),
"queue4"
)
asyncio.run(run_tasks(root, nested_queues))
Add your comment