import logging
import queue
import threading
import time
import traceback

import redis
  6. # Configure logging
  7. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  8. class QueueMirror:
  9. def __init__(self, source_queue_url, destination_queue_url, redis_host='localhost', redis_port=6379):
  10. self.source_queue_url = source_queue_url
  11. self.destination_queue_url = destination_queue_url
  12. self.redis_host = redis_host
  13. self.redis_port = redis_port
  14. self.source_queue = queue.Queue()
  15. self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
  16. def start_mirroring(self):
  17. try:
  18. # Start consuming from the source queue
  19. self.consume_source()
  20. except Exception as e:
  21. logging.error(f"Error starting mirroring: {e}")
  22. traceback.print_exc() # Print the stack trace for debugging
  23. def consume_source(self):
  24. while True:
  25. try:
  26. message = self.source_queue.get(timeout=1) # Wait up to 1 second for a message
  27. if message is None: # Sentinel value to signal shutdown
  28. break
  29. self.mirror_message(message)
  30. self.source_queue.task_done() # Indicate that a formerly enqueued task is complete
  31. except queue.Empty:
  32. # Queue is empty, continue to the next iteration
  33. pass
  34. except Exception as e:
  35. logging.error(f"Error consuming message: {e}")
  36. traceback.print_exc() # Print stack trace
  37. def mirror_message(self, message):
  38. try:
  39. self.redis_client.rpush(self.destination_queue_url, message)
  40. logging.info(f"Mirrored message: {message}")
  41. except redis.exceptions.ConnectionError as e:
  42. logging.error(f"Redis connection error: {e}")
  43. # Handle connection error, e.g., retry or log and continue
  44. except Exception as e:
  45. logging.error(f"Error mirroring message: {e}")
  46. traceback.print_exc() #Print stack trace
  47. def enqueue_message(self, message):
  48. self.source_queue.put(message)
  49. def stop(self):
  50. # Signal the consumer to stop
  51. self.enqueue_message(None)
  52. self.source_queue.join() # Wait for all tasks to be processed
  53. self.redis_client.close()
  54. logging.info("Mirroring stopped.")
  55. if __name__ == '__main__':
  56. # Example usage (replace with your actual queue URLs)
  57. source_queue_url = 'my_source_queue' # e.g., 'sqs://...' or 'kafka://...'
  58. destination_queue_url = 'my_destination_queue' #e.g., 'redis://...'
  59. mirror = QueueMirror(source_queue_url, destination_queue_url)
  60. mirror.start_mirroring()
  61. try:
  62. # Simulate message production
  63. for i in range(10):
  64. message = f"Message {i}"
  65. mirror.enqueue_message(message)
  66. time.sleep(0.5)
  67. except KeyboardInterrupt:
  68. logging.info("Stopping mirroring...")
  69. finally:
  70. mirror.stop()

# Add your comment