1. import redis
  2. import json
  3. import datetime
  4. import os
  5. import logging
  6. import argparse
  7. # Configure logging
  8. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  9. class QueueArchiver:
  10. def __init__(self, redis_host='localhost', redis_port=6379, archive_dir='queue_archives'):
  11. self.redis_host = redis_host
  12. self.redis_port = redis_port
  13. self.archive_dir = archive_dir
  14. self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
  15. # Ensure archive directory exists
  16. if not os.path.exists(self.archive_dir):
  17. os.makedirs(self.archive_dir)
  18. def archive_queue_content(self, queue_name):
  19. """Archives all messages from a given queue."""
  20. archive_file_path = os.path.join(self.archive_dir, f"{queue_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
  21. try:
  22. # Get all messages from the queue
  23. messages = self.redis_client.lrange(queue_name, 0, -1)
  24. if not messages:
  25. logging.info(f"Queue {queue_name} is empty.")
  26. return
  27. # Convert messages to JSON format
  28. json_messages = [json.loads(msg) for msg in messages]
  29. # Write JSON data to file. Using a simple, compatible format.
  30. with open(archive_file_path, 'w') as f:
  31. json.dump(json_messages, f, indent=4) #pretty print for readability
  32. logging.info(f"Successfully archived content of queue {queue_name} to {archive_file_path}")
  33. except redis.exceptions.ConnectionError as e:
  34. logging.error(f"Redis connection error: {e}")
  35. except Exception as e:
  36. logging.error(f"Error archiving queue {queue_name}: {e}")
  37. def archive_queue_content_batch(self, queue_name, batch_size=1000):
  38. """Archives content in batches to avoid memory issues with large queues."""
  39. total_messages = self.redis_client.llen(queue_name)
  40. start = 0
  41. while start < total_messages:
  42. end = min(start + batch_size, total_messages)
  43. messages = self.redis_client.lrange(queue_name, start, end - 1) #lrange is exclusive of end index
  44. if not messages:
  45. break
  46. try:
  47. json_messages = [json.loads(msg) for msg in messages]
  48. archive_file_path = os.path.join(self.archive_dir, f"{queue_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
  49. with open(archive_file_path, 'w') as f:
  50. json.dump(json_messages, f, indent=4)
  51. logging.info(f"Archived batch of messages from {queue_name} to {archive_file_path}")
  52. except redis.exceptions.ConnectionError as e:
  53. logging.error(f"Redis connection error: {e}")
  54. except Exception as e:
  55. logging.error(f"Error archiving batch from {queue_name}: {e}")
  56. start = end
  57. def main(self):
  58. """Main function to parse command line arguments and archive queues."""
  59. parser = argparse.ArgumentParser(description="Archive content of Redis queues.")
  60. parser.add_argument("queue_name", help="The name of the queue to archive.")
  61. parser.add_argument("--batch_size", type=int, default=1000, help="Batch size for archiving large queues.")
  62. args = parser.parse_args()
  63. self.archive_queue_content_batch(args.queue_name, args.batch_size)
  64. if __name__ == "__main__":
  65. archiver = QueueArchiver()
  66. archiver.main()
