1. import redis
  2. import time
  3. import json
  4. import logging
  5. # Configure logging
  6. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  7. class MessageStreamer:
  8. def __init__(self, redis_host='localhost', redis_port=6379, queue_name='my_queue', retry_interval=5):
  9. self.redis = redis.Redis(host=redis_host, port=redis_port)
  10. self.queue_name = queue_name
  11. self.retry_interval = retry_interval
  12. self.running = True
  13. def stream_messages(self):
  14. """Streams messages from the queue with retry mechanism."""
  15. while self.running:
  16. try:
  17. # Blocking pop with a timeout
  18. message = self.redis.blpop(self.queue_name, timeout=1)
  19. if message:
  20. _, data = message # Unpack the tuple
  21. try:
  22. message_data = json.loads(data.decode('utf-8')) # Decode bytes to string, then parse JSON
  23. logging.info(f"Received message: {message_data}")
  24. # Process the message (replace with your logic)
  25. print(message_data)
  26. except json.JSONDecodeError as e:
  27. logging.error(f"Error decoding JSON: {e}")
  28. logging.error(f"Raw data: {data.decode('utf-8')}")
  29. else:
  30. # No message available, sleep for the retry interval
  31. time.sleep(self.retry_interval)
  32. except redis.exceptions.ConnectionError as e:
  33. logging.error(f"Redis connection error: {e}")
  34. time.sleep(self.retry_interval)
  35. except Exception as e:
  36. logging.error(f"An unexpected error occurred: {e}")
  37. time.sleep(self.retry_interval)
  38. def stop(self):
  39. """Stops the message streamer."""
  40. self.running = False
  41. logging.info("Message streamer stopped.")
  42. if __name__ == '__main__':
  43. streamer = MessageStreamer()
  44. try:
  45. streamer.stream_messages()
  46. except KeyboardInterrupt:
  47. streamer.stop()

Add your comment