import asyncio
import aiohttp
import queue
import time
import logging
# Configure the root logger once at import time: INFO and above, with each
# record rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Task:
    """A single URL download job together with its outcome.

    Attributes:
        url: Address to fetch.
        task_id: Identifier assigned by whoever created the task.
        status: ``"pending"`` on creation; the downloader in this file later
            sets it to ``"completed"`` or ``"failed"``.
        result: ``None`` until a download succeeds, then a dict with the
            keys ``"html"`` and ``"duration"``.
        error: ``None`` unless the download failed, then the error text.
    """

    def __init__(self, url: str, task_id: int) -> None:
        self.url = url
        self.task_id = task_id
        self.status = "pending"
        self.result = None
        self.error = None

    def __repr__(self) -> str:
        # Deliberately omits ``result``: it can hold a whole HTML document
        # and would make log lines unreadable.
        return (
            f"{type(self).__name__}(task_id={self.task_id!r}, "
            f"url={self.url!r}, status={self.status!r})"
        )
class TaskQueue:
    """FIFO queue of download tasks with sequentially assigned IDs.

    The queue is backed by ``queue.Queue``, whose ``get`` blocks the calling
    thread. Code running on an asyncio event loop should therefore call
    ``get_task(timeout=0)`` so that an empty queue does not stall the loop.

    Attributes:
        task_queue: The underlying ``queue.Queue`` of pending tasks.
        next_task_id: ID that the next added task will receive (starts at 1).
        tasks: Every task ever added, in insertion order. Kept so results
            stay reachable after the queue itself has been drained.
    """

    def __init__(self):
        self.task_queue = queue.Queue()
        self.next_task_id = 1
        self.tasks = []

    def add_task(self, url):
        """Create a task for *url*, enqueue it, and return it."""
        task = Task(url, self.next_task_id)
        self.next_task_id += 1
        self.tasks.append(task)
        self.task_queue.put(task)
        return task

    def get_task(self, timeout=5):
        """Return the next pending task, or ``None`` if none arrives in time.

        Args:
            timeout: Seconds to wait for a task. ``0`` (or any non-positive
                number) returns immediately; ``None`` waits indefinitely.
                Defaults to 5 to match the previous fixed behaviour.
        """
        try:
            if timeout is not None and timeout <= 0:
                # queue.Queue rejects negative timeouts; treat them as
                # "do not wait".
                return self.task_queue.get_nowait()
            return self.task_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def task_done(self):
        """Mark one previously fetched task as fully processed."""
        self.task_queue.task_done()
class HTMLDownloader:
    """Download the HTML for a task and record the outcome on the task.

    Args:
        session: Optional HTTP client session to reuse for every request.
            It must offer ``get(url)`` returning an async context manager
            whose response has ``raise_for_status()`` and an awaitable
            ``text()``. When omitted, a fresh ``aiohttp.ClientSession`` is
            opened and closed for each download.
    """

    def __init__(self, session=None):
        # Falsy values are normalised to None, as before.
        self.session = session or None

    async def download_html(self, task):
        """Fetch ``task.url`` and store the result or the error on *task*.

        On success ``task.result`` becomes ``{"html": ..., "duration": ...}``
        and ``task.status`` becomes ``"completed"``. On any failure
        ``task.error`` holds the error text and ``task.status`` becomes
        ``"failed"``. Failures are logged, never raised.

        Queue bookkeeping is intentionally not done here: this class has no
        queue, and the previous ``self.task_done()`` call raised
        ``AttributeError`` after every download.
        """
        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments.
        started = time.perf_counter()
        try:
            if self.session is not None:
                result = await self._fetch_result(self.session, task.url, started)
            else:
                # Only open a throwaway session when none was injected.
                async with aiohttp.ClientSession() as session:
                    result = await self._fetch_result(session, task.url, started)
        except Exception as exc:
            # Boundary handler: HTTP errors, connection errors and anything
            # else unexpected are all recorded on the task the same way.
            task.error = str(exc)
            task.status = "failed"
            logging.error("Task %s failed for %s: %s", task.task_id, task.url, exc)
        else:
            task.result = result
            task.status = "completed"
            logging.info(
                "Task %s completed for %s in %.2f seconds.",
                task.task_id,
                task.url,
                result["duration"],
            )

    @staticmethod
    async def _fetch_result(client, url, started):
        """GET *url* with *client* and return the html/duration result dict."""
        async with client.get(url) as response:
            response.raise_for_status()  # Turn 4xx/5xx responses into errors.
            html = await response.text()
            return {"html": html, "duration": time.perf_counter() - started}
async def _download_worker(downloader, task_queue):
    """Pull tasks off *task_queue* and download each until it is empty."""
    while True:
        try:
            # Non-blocking get: a blocking queue.Queue.get() would freeze
            # the event loop and stall every other in-flight download.
            task = task_queue.task_queue.get_nowait()
        except queue.Empty:
            return
        try:
            await downloader.download_html(task)
        except Exception as exc:
            # One misbehaving task must not kill the worker and strand the
            # tasks still waiting in the queue.
            logging.exception("Task %s raised unexpectedly", task.task_id)
            if task.status == "pending":
                task.status = "failed"
                task.error = str(exc)
        finally:
            task_queue.task_done()


async def main(urls, max_concurrency=10):
    """Download every URL concurrently and print each task's outcome.

    Args:
        urls: Iterable of URLs to fetch.
        max_concurrency: Upper bound on simultaneous downloads (default 10).

    Returns:
        The list of tasks, in the order their URLs were given.

    Raises:
        ValueError: If *max_concurrency* is less than 1.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    task_queue = TaskQueue()
    for url in urls:
        task_queue.add_task(url)

    # Snapshot the tasks before the workers drain the queue; afterwards the
    # queue is empty and there would be nothing left to report on.
    tasks = list(task_queue.task_queue.queue)

    worker_count = min(max_concurrency, len(tasks))
    if worker_count:
        # One shared session lets all workers reuse the same connection pool.
        async with aiohttp.ClientSession() as session:
            workers = [
                asyncio.create_task(
                    _download_worker(HTMLDownloader(session), task_queue)
                )
                for _ in range(worker_count)
            ]
            # Workers exit once the queue is empty, so gathering them waits
            # for every task to be processed.
            await asyncio.gather(*workers)

    for task in tasks:
        if task.status == "completed":
            print(f"Task {task.task_id}: {task.result}")
        elif task.status == "failed":
            print(f"Task {task.task_id}: {task.error}")
    return tasks
if __name__ == "__main__":
    # Demo run: a handful of public pages plus one URL that answers 404,
    # which the downloader reports as a failed task.
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.google.com",
        "https://httpstat.us/200",
        "https://httpstat.us/404",  # Example of a failed URL
    ]
    asyncio.run(main(urls))
# (Stray "Add your comment" web-page text was here; it is not part of the script.)