1. import pika
  2. import json
  3. import time
  4. import logging
  5. import configparser
  6. class QueueRetry:
  7. def __init__(self, config_file):
  8. self.config = configparser.ConfigParser()
  9. self.config.read(config_file)
  10. self.connection_factory = self.config['rabbitmq']['connection_factory']
  11. self.exchange = self.config['rabbitmq']['exchange']
  12. self.queue = self.config['rabbitmq']['queue']
  13. self.retry_count = self.config['rabbitmq']['retry_count']
  14. self.retry_delay = self.config['rabbitmq']['retry_delay']
  15. self.logging_level = logging.getLogger().setLevel(self.config['logging']['level'])
  16. self.channel = None
  17. def connect(self):
  18. try:
  19. connection = pika.BlockingConnection(self.connection_factory)
  20. self.channel = connection.channel()
  21. self.channel.queue_exists(self.queue) # Check if the queue exists
  22. self.channel.declare_exchange(self.exchange, exchange_type='direct')
  23. self.channel.queue_bind(self.queue, self.exchange, routing_key='my_routing_key') # Replace 'my_routing_key' with your routing key
  24. self.channel.prefetch(1) # Set prefetch count
  25. self.logging_level.info("Connected to RabbitMQ")
  26. return connection
  27. except pika.exceptions.ConnectionRefusedError:
  28. self.logging_level.error("RabbitMQ connection refused.")
  29. return None
  30. except Exception as e:
  31. self.logging_level.error(f"Error connecting to RabbitMQ: {e}")
  32. return None
  33. def publish(self, message):
  34. connection = self.connect()
  35. if not connection:
  36. return False
  37. try:
  38. channel = connection.channel()
  39. channel.basic_publish(exchange=self.exchange,
  40. routing_key='my_routing_key', # Replace 'my_routing_key' with your routing key
  41. body=json.dumps(message))
  42. channel.close()
  43. connection.close()
  44. self.logging_level.info(f"Published message: {message}")
  45. return True
  46. except Exception as e:
  47. self.logging_level.error(f"Error publishing message: {e}")
  48. return False
  49. def consume(self, callback):
  50. connection = self.connect()
  51. if not connection:
  52. return
  53. try:
  54. channel = connection.channel()
  55. channel.basic_consume(self.queue, callback=callback, auto_ack=False)
  56. self.logging_level.info(f"Consuming from queue: {self.queue}")
  57. channel.start_consuming()
  58. except Exception as e:
  59. self.logging_level.error(f"Error consuming message: {e}")
  60. finally:
  61. if channel:
  62. channel.close()
  63. if connection:
  64. connection.close()
  65. def retry_operation(self, operation, max_retries=self.retry_count):
  66. retries = 0
  67. while retries < max_retries:
  68. try:
  69. return operation() # Execute the operation
  70. except Exception as e:
  71. retries += 1
  72. if retries < max_retries:
  73. self.logging_level.warning(f"Operation failed. Retrying in {self.retry_delay} seconds. Attempt {retries}/{max_retries}. Error: {e}")
  74. time.sleep(self.retry_delay)
  75. else:
  76. self.logging_level.error(f"Operation failed after {max_retries} retries. Error: {e}")
  77. raise # Re-raise the exception after all retries have failed
  78. if __name__ == '__main__':
  79. # Example Usage
  80. config_file = 'config.ini' # Replace with your config file
  81. retry_manager = QueueRetry(config_file)
  82. def my_operation():
  83. # Simulate an operation that might fail
  84. print("Attempting operation...")
  85. #raise Exception("Simulated error")
  86. return "Operation successful"
  87. try:
  88. result = retry_manager.retry_operation(my_operation)
  89. print(f"Operation result: {result}")
  90. except Exception as e:
  91. print(f"Operation failed permanently

Add your comment