import asyncio
import aiohttp
import queue
import time
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
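
# A small asyncio + aiohttp worker pool: every task is enqueued up front,
# then num_workers coroutines drain the queue concurrently, logging each
# request's status code and duration.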


class HTTPTask:
    """One unit of work: an HTTP GET against `url`, identified by `task_id`."""

    def __init__(self, url, task_id):
        self.url = url
        self.task_id = task_id


class TaskQueue:
    """Thin wrapper around the thread-safe queue.Queue."""

    def __init__(self, max_size=100):
        self.queue = queue.Queue(maxsize=max_size)

    def enqueue(self, task):
        self.queue.put(task)

    def dequeue(self):
        try:
            # Non-blocking get: a blocking get(timeout=...) here would stall
            # the event loop; all tasks are pre-enqueued, so empty means done.
            return self.queue.get_nowait()
        except queue.Empty:
            return None

    def task_done(self):
        self.queue.task_done()

    def size(self):
        return self.queue.qsize()


class HTTPWorker:
    """Coroutine that drains the shared queue using one shared session."""

    def __init__(self, session, task_queue):
        self.session = session
        self.task_queue = task_queue

    async def worker(self):
        while True:
            task = self.task_queue.dequeue()
            if task is None:
                break  # Queue is empty; no more work will arrive
            try:
                start_time = time.monotonic()  # Monotonic clock for durations
                async with self.session.get(task.url) as response:
                    response.raise_for_status()  # ClientResponseError on 4xx/5xx
                    duration = time.monotonic() - start_time
                    logging.info(
                        f"Task {task.task_id} completed. URL: {task.url}, "
                        f"Status: {response.status}, Duration: {duration:.2f}s"
                    )
                    # Process response data here if needed
            except aiohttp.ClientError as e:
                logging.error(f"Error processing task {task.task_id} for URL {task.url}: {e}")
            finally:
                self.task_queue.task_done()  # Mark the dequeued task as done


async def main(urls, num_workers=5):
    task_queue = TaskQueue()
    # enqueue() blocks when the queue is full, so pre-loading like this only
    # suits batches no larger than the queue's max_size (100 by default).
    for i, url in enumerate(urls):
        task_queue.enqueue(HTTPTask(url, i))
    async with aiohttp.ClientSession() as session:
        workers = [HTTPWorker(session, task_queue) for _ in range(num_workers)]
        await asyncio.gather(*[w.worker() for w in workers])  # Run workers concurrently
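

# Alternative sketch, not invoked below: asyncio.Queue lets workers await new
# work without blocking the event loop, which is the idiomatic choice when
# producers keep adding tasks while workers run. The names worker_async and
# main_async are illustrative additions, not part of the code above.
async def worker_async(session, q):
    while True:
        task = await q.get()  # Suspends only this coroutine, not the loop
        if task is None:      # One None sentinel per worker signals shutdown
            break
        try:
            async with session.get(task.url) as response:
                logging.info(f"Task {task.task_id}: {response.status}")
        except aiohttp.ClientError as e:
            logging.error(f"Task {task.task_id} failed: {e}")


async def main_async(urls, num_workers=5):
    q = asyncio.Queue()
    for i, url in enumerate(urls):
        q.put_nowait(HTTPTask(url, i))
    for _ in range(num_workers):
        q.put_nowait(None)  # Shutdown sentinels, one per worker
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(worker_async(session, q) for _ in range(num_workers)))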


if __name__ == "__main__":
    urls = [
        "https://www.example.com",
        "https://www.google.com",
        "https://www.python.org",
        "https://httpstat.us/200",
        "https://httpstat.us/404",  # Expected to log an error via raise_for_status()
        "https://httpstat.us/500",  # Expected to log an error via raise_for_status()
    ]
    asyncio.run(main(urls))
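
# Concurrency is capped by num_workers (5 by default); aiohttp's default
# TCPConnector additionally limits a session to 100 simultaneous connections.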