import pika
import json
import time
import logging
import configparser
class QueueRetry:
def __init__(self, config_file):
self.config = configparser.ConfigParser()
self.config.read(config_file)
self.connection_factory = self.config['rabbitmq']['connection_factory']
self.exchange = self.config['rabbitmq']['exchange']
self.queue = self.config['rabbitmq']['queue']
self.retry_count = self.config['rabbitmq']['retry_count']
self.retry_delay = self.config['rabbitmq']['retry_delay']
self.logging_level = logging.getLogger().setLevel(self.config['logging']['level'])
self.channel = None
def connect(self):
try:
connection = pika.BlockingConnection(self.connection_factory)
self.channel = connection.channel()
self.channel.queue_exists(self.queue) # Check if the queue exists
self.channel.declare_exchange(self.exchange, exchange_type='direct')
self.channel.queue_bind(self.queue, self.exchange, routing_key='my_routing_key') # Replace 'my_routing_key' with your routing key
self.channel.prefetch(1) # Set prefetch count
self.logging_level.info("Connected to RabbitMQ")
return connection
except pika.exceptions.ConnectionRefusedError:
self.logging_level.error("RabbitMQ connection refused.")
return None
except Exception as e:
self.logging_level.error(f"Error connecting to RabbitMQ: {e}")
return None
def publish(self, message):
connection = self.connect()
if not connection:
return False
try:
channel = connection.channel()
channel.basic_publish(exchange=self.exchange,
routing_key='my_routing_key', # Replace 'my_routing_key' with your routing key
body=json.dumps(message))
channel.close()
connection.close()
self.logging_level.info(f"Published message: {message}")
return True
except Exception as e:
self.logging_level.error(f"Error publishing message: {e}")
return False
def consume(self, callback):
connection = self.connect()
if not connection:
return
try:
channel = connection.channel()
channel.basic_consume(self.queue, callback=callback, auto_ack=False)
self.logging_level.info(f"Consuming from queue: {self.queue}")
channel.start_consuming()
except Exception as e:
self.logging_level.error(f"Error consuming message: {e}")
finally:
if channel:
channel.close()
if connection:
connection.close()
def retry_operation(self, operation, max_retries=self.retry_count):
retries = 0
while retries < max_retries:
try:
return operation() # Execute the operation
except Exception as e:
retries += 1
if retries < max_retries:
self.logging_level.warning(f"Operation failed. Retrying in {self.retry_delay} seconds. Attempt {retries}/{max_retries}. Error: {e}")
time.sleep(self.retry_delay)
else:
self.logging_level.error(f"Operation failed after {max_retries} retries. Error: {e}")
raise # Re-raise the exception after all retries have failed
if __name__ == '__main__':
# Example Usage
config_file = 'config.ini' # Replace with your config file
retry_manager = QueueRetry(config_file)
def my_operation():
# Simulate an operation that might fail
print("Attempting operation...")
#raise Exception("Simulated error")
return "Operation successful"
try:
result = retry_manager.retry_operation(my_operation)
print(f"Operation result: {result}")
except Exception as e:
print(f"Operation failed permanently
Add your comment