import asyncio
from bs4 import BeautifulSoup
import requests
class TaskQueue:
    """FIFO queue of scraping tasks that are processed concurrently.

    A task is a dict with the keys ``'url'``, ``'selector'`` (a CSS selector)
    and ``'action'`` (``'extract_text'`` or ``'click'``). Once a task has been
    processed, its outcome is written back to ``task['result']``:

    * ``'extract_text'``: the stripped text of the first matching element,
      or ``None`` when no element matches.
    * ``'click'``: ``True`` when a matching element exists, else ``False``.
    * any failure (missing key, unknown action, request error): ``None``.
    """

    # Seconds to wait on the HTTP request; without a timeout a stalled server
    # would block its worker thread (and therefore ``run``) indefinitely.
    REQUEST_TIMEOUT = 10

    # Actions ``_run_task`` knows how to perform.
    SUPPORTED_ACTIONS = ('extract_text', 'click')

    def __init__(self):
        self.queue = []
        self.running = False

    def enqueue(self, task):
        """Add a task to the end of the queue."""
        self.queue.append(task)

    def _fetch_soup(self, url):
        """Download *url* and return it parsed as a BeautifulSoup document.

        This call blocks, so it is meant to run in a worker thread.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or a 4xx/5xx response.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        return BeautifulSoup(response.content, 'html.parser')

    async def _run_task(self, task):
        """Execute a single task and store its outcome in ``task['result']``.

        Errors are reported with ``print`` and recorded as a ``None`` result
        rather than raised, so one failing task cannot abort its siblings.
        """
        # Validate the task before doing any I/O.
        try:
            url = task['url']
            selector = task['selector']
            action = task['action']  # e.g., 'extract_text', 'click'
        except KeyError as e:
            print(f"Error processing task: {e}")
            task['result'] = None
            return

        # Reject unknown actions up front instead of after a pointless fetch.
        if action not in self.SUPPORTED_ACTIONS:
            print(f"Unknown action: {action}")
            task['result'] = None
            return

        try:
            # requests is blocking; run it in the default executor so other
            # tasks keep making progress while this one waits on the network.
            loop = asyncio.get_running_loop()
            soup = await loop.run_in_executor(None, self._fetch_soup, url)

            element = soup.select_one(selector)
            if not element:
                print(f"Element not found with selector: {selector}")
                task['result'] = None if action == 'extract_text' else False
            elif action == 'extract_text':
                result = element.get_text(strip=True)
                print(f"Extracted text from {selector}: {result}")
                task['result'] = result  # store result for later use if needed
            else:
                print(f"Simulating click on {selector}")
                # In a full automation system, you'd use a library like Selenium here
                task['result'] = True  # Indicate successful click
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            task['result'] = None
        except Exception as e:
            # Task boundary: report the failure and keep the queue running.
            print(f"Error processing task: {e}")
            task['result'] = None

    async def run(self):
        """Process every queued task concurrently and wait for all of them.

        The queue is drained in FIFO batches. Tasks enqueued while a batch is
        in flight are picked up by the next iteration. ``running`` is True
        for the duration of the call and reset to False afterwards, even if
        the call is cancelled.
        """
        self.running = True
        try:
            while self.running and self.queue:
                # Snapshot and empty the queue in place so the list object
                # that callers hold a reference to stays the same.
                batch = list(self.queue)
                self.queue.clear()
                # Await the batch: without this the tasks would be cancelled
                # unfinished as soon as the event loop shuts down.
                await asyncio.gather(*(self._run_task(task) for task in batch))
        finally:
            self.running = False
        print("All tasks completed.")
async def main():
    """Queue three sample scraping jobs on a TaskQueue and run it."""
    # Sample jobs: two against example.com, one against google.com.
    demo_jobs = (
        {'url': 'https://www.example.com', 'selector': 'h1', 'action': 'extract_text'},
        {'url': 'https://www.example.com', 'selector': '#my-button', 'action': 'click'},
        {'url': 'https://www.google.com', 'selector': 'input[name="q"]', 'action': 'extract_text'},
    )

    scheduler = TaskQueue()
    for job in demo_jobs:
        scheduler.enqueue(job)

    # Hand control to the queue until it returns.
    await scheduler.run()
# Script entry point: only start the event loop when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
# Add your comment