1. import redis
  2. import json
  3. import argparse
  4. def parse_message_queue(host='localhost', port=6379, db=0, pattern='*'):
  5. """
  6. Parses data from a Redis message queue.
  7. Args:
  8. host (str): Redis server host.
  9. port (int): Redis server port.
  10. db (int): Redis database number.
  11. pattern (str): Pattern to match queue names (e.g., '*').
  12. Returns:
  13. list: A list of message data (dictionaries).
  14. """
  15. try:
  16. r = redis.Redis(host=host, port=port, db=db)
  17. # Get all keys matching the pattern
  18. keys = r.keys(pattern)
  19. messages = []
  20. for key in keys:
  21. try:
  22. # Decode the message as JSON
  23. message = json.loads(r.get(key).decode('utf-8'))
  24. messages.append(message)
  25. except json.JSONDecodeError:
  26. print(f"Warning: Could not decode message from key {key}. Skipping.")
  27. except redis.exceptions.ConnectionError as e:
  28. print(f"Error connecting to Redis: {e}")
  29. return []
  30. return messages
  31. except redis.exceptions.ConnectionError as e:
  32. print(f"Error connecting to Redis: {e}")
  33. return []
  34. except Exception as e:
  35. print(f"An unexpected error occurred: {e}")
  36. return []
  37. if __name__ == '__main__':
  38. parser = argparse.ArgumentParser(description='Parse data from a Redis message queue.')
  39. parser.add_argument('--host', default='localhost', help='Redis server host.')
  40. parser.add_argument('--port', type=int, default=6379, help='Redis server port.')
  41. parser.add_argument('--db', type=int, default=0, help='Redis database number.')
  42. parser.add_argument('--pattern', default='*', help='Queue name pattern.')
  43. args = parser.parse_args()
  44. messages = parse_message_queue(args.host, args.port, args.db, args.pattern)
  45. if messages:
  46. print(f"Found {len(messages)} messages.")
  47. for i, message in enumerate(messages):
  48. print(f"Message {i+1}: {message}")

Add your comment