import asyncio
import aiohttp
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class Task:
    def __init__(self, url, task_id):
        self.url = url
        self.task_id = task_id
        self.status = "pending"
        self.result = None
        self.error = None

class TaskQueue:
    def __init__(self):
        # Use asyncio.Queue (not queue.Queue) so get()/join() never block the event loop.
        self.task_queue = asyncio.Queue()
        self.tasks = []  # Keep every task so results can be reported after processing
        self.next_task_id = 1

    def add_task(self, url):
        task_id = self.next_task_id
        task = Task(url, task_id)
        self.task_queue.put_nowait(task)
        self.tasks.append(task)
        self.next_task_id += 1

    async def get_task(self):
        try:
            # Wait up to 5 seconds for a task before giving up
            return await asyncio.wait_for(self.task_queue.get(), timeout=5)
        except asyncio.TimeoutError:
            return None

    def task_done(self):
        self.task_queue.task_done()

    async def join(self):
        await self.task_queue.join()

class HTMLDownloader:
    def __init__(self, session=None):
        # Optionally reuse a shared aiohttp.ClientSession; otherwise a
        # short-lived session is opened per request.
        self.session = session

    async def download_html(self, task):
        try:
            start_time = time.time()
            if self.session:
                html = await self._fetch(self.session, task.url)
            else:
                async with aiohttp.ClientSession() as session:
                    html = await self._fetch(session, task.url)
            duration = time.time() - start_time
            task.result = {"html": html, "duration": duration}
            task.status = "completed"
            logging.info(f"Task {task.task_id} completed for {task.url} in {duration:.2f} seconds.")
        except aiohttp.ClientError as e:
            task.error = str(e)
            task.status = "failed"
            logging.error(f"Task {task.task_id} failed for {task.url}: {e}")
        except Exception as e:
            task.error = str(e)
            task.status = "failed"
            logging.error(f"Task {task.task_id} failed for {task.url}: {e}")

    @staticmethod
    async def _fetch(session, url):
        async with session.get(url) as response:
            response.raise_for_status()  # Raise ClientResponseError for 4xx/5xx responses
            return await response.text()

async def worker(downloader, task_queue):
    # Pull tasks from the queue until it is drained, downloading each one
    # and acknowledging it so task_queue.join() can complete.
    while True:
        task = await task_queue.get_task()
        if task is None:
            break
        try:
            await downloader.download_html(task)
        finally:
            task_queue.task_done()

async def main(urls):
    task_queue = TaskQueue()
    for url in urls:
        task_queue.add_task(url)

    workers = []
    for _ in range(min(10, len(urls))):  # Limit concurrent workers to 10 or the number of URLs
        downloader = HTMLDownloader()
        workers.append(asyncio.create_task(worker(downloader, task_queue)))

    await task_queue.join()  # Wait for all tasks to be processed
    for w in workers:
        w.cancel()  # Workers are idle once the queue is drained
    await asyncio.gather(*workers, return_exceptions=True)

    for task in task_queue.tasks:  # Report the outcome of every task
        if task.status == "completed":
            print(f"Task {task.task_id}: {task.result}")
        elif task.status == "failed":
            print(f"Task {task.task_id}: {task.error}")

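# Optional variant (not part of the original listing): because HTMLDownloader
# accepts a session, one shared aiohttp.ClientSession can be injected so all
# workers reuse a single connection pool. This is only a minimal sketch built
# on the classes and worker() above; the name main_with_shared_session is
# illustrative.
async def main_with_shared_session(urls):
    task_queue = TaskQueue()
    for url in urls:
        task_queue.add_task(url)
    async with aiohttp.ClientSession() as session:
        downloader = HTMLDownloader(session=session)
        workers = [asyncio.create_task(worker(downloader, task_queue))
                   for _ in range(min(10, len(urls)))]
        await task_queue.join()
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    for task in task_queue.tasks:
        if task.status == "completed":
            print(f"Task {task.task_id}: {task.result}")
        elif task.status == "failed":
            print(f"Task {task.task_id}: {task.error}")
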
if __name__ == "__main__":
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.google.com",
        "https://httpstat.us/200",
        "https://httpstat.us/404",  # Example of a failed URL
    ]
    asyncio.run(main(urls))
