import logging
import queue
import threading
import time
import traceback

import redis
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class QueueMirror:
    """Forward messages from an in-process queue to a Redis list.

    Producers hand messages to :meth:`enqueue_message`; a consumer loop
    (:meth:`consume_source`) pops them from an internal ``queue.Queue`` and
    appends each one to the Redis list named by ``destination_queue_url``.
    A ``None`` message is reserved as the shutdown sentinel.
    """

    def __init__(self, source_queue_url, destination_queue_url, redis_host='localhost', redis_port=6379):
        """Store the configuration and create the queue and Redis client.

        Args:
            source_queue_url: Identifier of the source queue. It is only
                stored; messages actually arrive via ``enqueue_message``.
            destination_queue_url: Key of the Redis list that receives the
                mirrored messages.
            redis_host: Host passed to ``redis.Redis``.
            redis_port: Port passed to ``redis.Redis``.
        """
        self.source_queue_url = source_queue_url
        self.destination_queue_url = destination_queue_url
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.source_queue = queue.Queue()
        self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
        # Background consumer created by start_mirroring(); None until then.
        self._consumer_thread = None

    def start_mirroring(self):
        """Start the consumer loop in a background thread and return.

        The loop runs on a daemon thread so the caller can go on to enqueue
        messages and later call :meth:`stop`. Calling this while a consumer
        is already running is a no-op. Errors while starting are logged,
        not raised.
        """
        current = self._consumer_thread
        if current is not None and current.is_alive():
            logging.warning("Mirroring is already running.")
            return
        try:
            worker = threading.Thread(
                target=self.consume_source,
                name="queue-mirror-consumer",
                daemon=True,  # never keep the interpreter alive on its own
            )
            worker.start()
            self._consumer_thread = worker
        except Exception as e:
            logging.exception("Error starting mirroring: %s", e)

    def consume_source(self):
        """Pop messages until the ``None`` sentinel is received.

        Every item taken from the queue, including the sentinel and items
        whose processing failed, is acknowledged with ``task_done()`` so
        that ``source_queue.join()`` can return.
        """
        while True:
            try:
                # Wake up once a second so an idle loop is never stuck in get().
                message = self.source_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                if message is None:  # Sentinel value to signal shutdown
                    break
                self.mirror_message(message)
            except Exception as e:
                logging.exception("Error consuming message: %s", e)
            finally:
                # Runs on the sentinel's `break` too; without it join() hangs.
                self.source_queue.task_done()

    def mirror_message(self, message):
        """Append ``message`` to the destination Redis list.

        Failures are logged and swallowed so one bad message does not stop
        the consumer loop; the failed message is not retried.
        """
        try:
            self.redis_client.rpush(self.destination_queue_url, message)
            logging.info(f"Mirrored message: {message}")
        except redis.exceptions.ConnectionError as e:
            # The message is dropped here; there is no retry.
            logging.error(f"Redis connection error: {e}")
        except Exception as e:
            logging.exception("Error mirroring message: %s", e)

    def enqueue_message(self, message):
        """Put ``message`` on the internal queue (``None`` means shutdown)."""
        self.source_queue.put(message)

    def stop(self):
        """Stop the background consumer, then close the Redis client.

        If a consumer started by :meth:`start_mirroring` is alive, the
        sentinel is enqueued and the thread is joined; the queue is FIFO, so
        messages enqueued before this call are mirrored first. If no such
        consumer is running, nothing is waited on, which avoids blocking
        forever on a queue that nobody is draining.
        """
        worker = self._consumer_thread
        if worker is not None and worker.is_alive():
            self.enqueue_message(None)
            worker.join()
        self._consumer_thread = None
        self.redis_client.close()
        logging.info("Mirroring stopped.")
if __name__ == '__main__':
    # Demo driver: build a mirror, start it, push ten sample messages half a
    # second apart, and always shut the mirror down at the end.
    # Replace the placeholder names below with your actual queue URLs.
    src_name = 'my_source_queue'  # e.g., 'sqs://...' or 'kafka://...'
    dst_name = 'my_destination_queue'  # e.g., 'redis://...'
    mirror = QueueMirror(src_name, dst_name)
    mirror.start_mirroring()
    try:
        # Simulated producer.
        counter = 0
        while counter < 10:
            mirror.enqueue_message(f"Message {counter}")
            time.sleep(0.5)
            counter += 1
    except KeyboardInterrupt:
        logging.info("Stopping mirroring...")
    finally:
        # Runs on normal completion and on Ctrl-C alike.
        mirror.stop()
# Add your comment