import redis
import json
import time
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class MessageQueueMirror:
def __init__(self, main_queue_url, mirror_queue_url, redis_host='localhost', redis_port=6379):
"""
Initializes the MessageQueueMirror.
Args:
main_queue_url (str): URL of the main message queue (e.g., RabbitMQ, Kafka).
mirror_queue_url (str): URL of the mirror message queue (e.g., Redis).
redis_host (str): Redis host.
redis_port (int): Redis port.
"""
self.main_queue_url = main_queue_url
self.mirror_queue_url = mirror_queue_url
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port, decode_responses=True)
def mirror_messages(self, message_queue_name):
"""
Mirrors messages from a specified queue in the main queue to the mirror queue.
Args:
message_queue_name (str): The name of the queue to mirror.
"""
try:
# Consume messages from the main queue
main_queue_consumer = self._consume_messages(message_queue_name)
# Push messages to the mirror queue
for message in main_queue_consumer:
self._push_message_to_mirror(message)
main_queue_consumer.close() # Close the consumer
logging.info(f"Mirroring messages from {message_queue_name} completed.")
except Exception as e:
logging.error(f"Error mirroring messages from {message_queue_name}: {e}")
def _consume_messages(self, message_queue_name):
"""
Consumes messages from the main queue and returns an iterator.
Args:
message_queue_name (str): The name of the queue to consume from.
Yields:
dict: Each message as a dictionary.
"""
# Simulate consuming from a main queue (replace with actual consumer logic)
# In a real-world scenario, this would involve connecting to the main queue
# and using a consumer library (e.g., pika for RabbitMQ, kafka-python for Kafka).
messages = [
{"message_id": 1, "data": "Message 1"},
{"message_id": 2, "data": "Message 2"},
{"message_id": 3, "data": "Message 3"}
]
for message in messages:
yield message
def _push_message_to_mirror(self, message):
"""
Pushes a message to the mirror queue.
Args:
message (dict): The message to push.
"""
try:
# Serialize the message to JSON
message_json = json.dumps(message)
# Push the message to the mirror queue
self.redis_client.lpush(self.mirror_queue_url, message_json)
logging.debug(f"Pushed message {message['message_id']} to mirror queue.")
except Exception as e:
logging.error(f"Error pushing message to mirror queue: {e}")
if __name__ == '__main__':
# Example Usage
mirror = MessageQueueMirror("main_queue", "mirror_queue") # Replace with your queue URLs
mirror.mirror_messages("my_queue")
Add your comment