1. import redis
  2. import json
  3. import time
  4. import logging
  5. # Configure logging
  6. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  7. class MessageQueueMirror:
  8. def __init__(self, main_queue_url, mirror_queue_url, redis_host='localhost', redis_port=6379):
  9. """
  10. Initializes the MessageQueueMirror.
  11. Args:
  12. main_queue_url (str): URL of the main message queue (e.g., RabbitMQ, Kafka).
  13. mirror_queue_url (str): URL of the mirror message queue (e.g., Redis).
  14. redis_host (str): Redis host.
  15. redis_port (int): Redis port.
  16. """
  17. self.main_queue_url = main_queue_url
  18. self.mirror_queue_url = mirror_queue_url
  19. self.redis_host = redis_host
  20. self.redis_port = redis_port
  21. self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port, decode_responses=True)
  22. def mirror_messages(self, message_queue_name):
  23. """
  24. Mirrors messages from a specified queue in the main queue to the mirror queue.
  25. Args:
  26. message_queue_name (str): The name of the queue to mirror.
  27. """
  28. try:
  29. # Consume messages from the main queue
  30. main_queue_consumer = self._consume_messages(message_queue_name)
  31. # Push messages to the mirror queue
  32. for message in main_queue_consumer:
  33. self._push_message_to_mirror(message)
  34. main_queue_consumer.close() # Close the consumer
  35. logging.info(f"Mirroring messages from {message_queue_name} completed.")
  36. except Exception as e:
  37. logging.error(f"Error mirroring messages from {message_queue_name}: {e}")
  38. def _consume_messages(self, message_queue_name):
  39. """
  40. Consumes messages from the main queue and returns an iterator.
  41. Args:
  42. message_queue_name (str): The name of the queue to consume from.
  43. Yields:
  44. dict: Each message as a dictionary.
  45. """
  46. # Simulate consuming from a main queue (replace with actual consumer logic)
  47. # In a real-world scenario, this would involve connecting to the main queue
  48. # and using a consumer library (e.g., pika for RabbitMQ, kafka-python for Kafka).
  49. messages = [
  50. {"message_id": 1, "data": "Message 1"},
  51. {"message_id": 2, "data": "Message 2"},
  52. {"message_id": 3, "data": "Message 3"}
  53. ]
  54. for message in messages:
  55. yield message
  56. def _push_message_to_mirror(self, message):
  57. """
  58. Pushes a message to the mirror queue.
  59. Args:
  60. message (dict): The message to push.
  61. """
  62. try:
  63. # Serialize the message to JSON
  64. message_json = json.dumps(message)
  65. # Push the message to the mirror queue
  66. self.redis_client.lpush(self.mirror_queue_url, message_json)
  67. logging.debug(f"Pushed message {message['message_id']} to mirror queue.")
  68. except Exception as e:
  69. logging.error(f"Error pushing message to mirror queue: {e}")
  70. if __name__ == '__main__':
  71. # Example Usage
  72. mirror = MessageQueueMirror("main_queue", "mirror_queue") # Replace with your queue URLs
  73. mirror.mirror_messages("my_queue")

Add your comment