import redis
import json
import datetime
import os
import logging
import argparse
# Configure root logging once at import time: timestamped, INFO-level messages.
# NOTE(review): module-level basicConfig is a side effect on import — fine for a
# script, but would clobber/skip host-app logging config if this were imported.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class QueueArchiver:
    """Archive the contents of Redis list-based queues to timestamped JSON files.

    Archives are written into ``archive_dir`` as
    ``<queue>_<YYYYmmdd_HHMMSS>[<suffix>].json``; every message in the queue is
    expected to be a JSON document.
    """

    def __init__(self, redis_host='localhost', redis_port=6379, archive_dir='queue_archives'):
        """Connect to Redis and ensure the archive directory exists.

        Args:
            redis_host: Hostname of the Redis server.
            redis_port: Port of the Redis server.
            archive_dir: Directory archive files are written to (created if missing).
        """
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.archive_dir = archive_dir
        self.redis_client = redis.Redis(host=self.redis_host, port=self.redis_port)
        # exist_ok avoids the check-then-create race if two archivers start together.
        os.makedirs(self.archive_dir, exist_ok=True)

    def _archive_path(self, queue_name, suffix=''):
        """Return a timestamped archive file path for *queue_name*.

        *suffix* is inserted before the ``.json`` extension so callers (e.g.
        batch archiving) can guarantee unique filenames even when several
        archives are produced within the same second.
        """
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(self.archive_dir, f"{queue_name}_{stamp}{suffix}.json")

    def archive_queue_content(self, queue_name):
        """Archive all messages from *queue_name* into a single JSON file.

        Reads the whole list with one LRANGE call; for very large queues prefer
        :meth:`archive_queue_content_batch`. Errors are logged, not raised.
        """
        archive_file_path = self._archive_path(queue_name)
        try:
            # LRANGE 0 -1 returns every element of the list.
            messages = self.redis_client.lrange(queue_name, 0, -1)
            if not messages:
                logging.info(f"Queue {queue_name} is empty.")
                return
            # Each message must itself be a JSON document.
            json_messages = [json.loads(msg) for msg in messages]
            with open(archive_file_path, 'w') as f:
                json.dump(json_messages, f, indent=4)  # pretty print for readability
            logging.info(f"Successfully archived content of queue {queue_name} to {archive_file_path}")
        except redis.exceptions.ConnectionError as e:
            logging.error(f"Redis connection error: {e}")
        except Exception as e:
            logging.error(f"Error archiving queue {queue_name}: {e}")

    def archive_queue_content_batch(self, queue_name, batch_size=1000):
        """Archive *queue_name* in batches to bound memory use on large queues.

        Each batch goes to its own file. A per-batch suffix makes filenames
        unique even when several batches complete within the same second
        (previously, same-second batches silently overwrote each other's files,
        losing data).
        """
        try:
            total_messages = self.redis_client.llen(queue_name)
        except redis.exceptions.ConnectionError as e:
            # llen was previously unguarded; a down Redis crashed the caller.
            logging.error(f"Redis connection error: {e}")
            return
        start = 0
        batch_index = 0
        while start < total_messages:
            end = min(start + batch_size, total_messages)
            # Redis LRANGE is INCLUSIVE of both indices, hence end - 1.
            messages = self.redis_client.lrange(queue_name, start, end - 1)
            if not messages:
                break
            try:
                json_messages = [json.loads(msg) for msg in messages]
                # Batch number in the filename prevents same-second collisions.
                archive_file_path = self._archive_path(queue_name, f"_batch{batch_index:04d}")
                with open(archive_file_path, 'w') as f:
                    json.dump(json_messages, f, indent=4)
                logging.info(f"Archived batch of messages from {queue_name} to {archive_file_path}")
            except redis.exceptions.ConnectionError as e:
                logging.error(f"Redis connection error: {e}")
            except Exception as e:
                logging.error(f"Error archiving batch from {queue_name}: {e}")
            start = end
            batch_index += 1

    def main(self):
        """Parse command-line arguments and archive the requested queue."""
        parser = argparse.ArgumentParser(description="Archive content of Redis queues.")
        parser.add_argument("queue_name", help="The name of the queue to archive.")
        parser.add_argument("--batch_size", type=int, default=1000, help="Batch size for archiving large queues.")
        args = parser.parse_args()
        self.archive_queue_content_batch(args.queue_name, args.batch_size)
if __name__ == "__main__":
    # Script entry point: build an archiver with default connection settings
    # and hand control to its CLI driver.
    QueueArchiver().main()