import redis
import time
import json
import logging
# Root logger setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class MessageStreamer:
    """Consume JSON messages from a Redis list until stopped.

    Messages are taken from the head of the list named ``queue_name`` with a
    blocking pop, decoded as UTF-8 JSON, logged, and handed to
    ``_process_message``. Malformed payloads are logged and skipped; Redis
    connection errors and unexpected errors are logged and followed by a
    pause of ``retry_interval`` seconds before the next attempt.
    """

    def __init__(self, redis_host='localhost', redis_port=6379, queue_name='my_queue', retry_interval=5):
        """Create the Redis client and store the streaming settings.

        Args:
            redis_host: Host passed to ``redis.Redis``.
            redis_port: Port passed to ``redis.Redis``.
            queue_name: Name of the Redis list to pop messages from.
            retry_interval: Seconds to wait after an error before retrying.
        """
        self.redis = redis.Redis(host=redis_host, port=redis_port)
        self.queue_name = queue_name
        self.retry_interval = retry_interval
        # Loop flag read by stream_messages() and cleared by stop().
        self.running = True

    @staticmethod
    def _decode_message(data):
        """Return the JSON value encoded in *data*.

        Args:
            data: Raw payload, either ``bytes`` (decoded as UTF-8) or ``str``.

        Raises:
            UnicodeDecodeError: If a bytes payload is not valid UTF-8.
            json.JSONDecodeError: If the text is not valid JSON.
        """
        # Accept str as well, so a payload that is already text does not
        # fail on a missing .decode().
        if isinstance(data, (bytes, bytearray)):
            data = data.decode('utf-8')
        return json.loads(data)

    def _process_message(self, message_data):
        """Process one decoded message (replace with your logic)."""
        print(message_data)

    def _handle_message(self, data):
        """Decode one raw payload, then log and process it.

        Payloads that are not valid UTF-8 JSON are logged and dropped; they
        do not raise, so one bad message never pauses the consumer.
        """
        try:
            message_data = self._decode_message(data)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            logging.error("Error decoding JSON: %s", e)
            # %r because the payload may not be decodable as text at all.
            logging.error("Raw data: %r", data)
            return
        logging.info("Received message: %s", message_data)
        self._process_message(message_data)

    def stream_messages(self):
        """Streams messages from the queue with retry mechanism.

        Runs until ``self.running`` becomes false. Errors raised while
        popping or processing are logged and retried after
        ``retry_interval`` seconds.
        """
        while self.running:
            try:
                # Blocking pop with a timeout. The timeout itself is the idle
                # wait, so an empty result loops straight back; sleeping here
                # as well would only delay messages that arrive meanwhile and
                # slow down the reaction to stop().
                message = self.redis.blpop(self.queue_name, timeout=1)
                if not message:
                    continue
                _, data = message  # (list name, payload)
                self._handle_message(data)
            except redis.exceptions.ConnectionError as e:
                logging.error("Redis connection error: %s", e)
                time.sleep(self.retry_interval)
            except Exception as e:
                # Top-level boundary of the consumer loop: keep running, but
                # record the traceback so the failure can be diagnosed.
                logging.exception("An unexpected error occurred: %s", e)
                time.sleep(self.retry_interval)

    def stop(self):
        """Stops the message streamer.

        The loop in ``stream_messages`` exits the next time it checks the
        flag, i.e. after the current pop, processing step or retry pause.
        """
        self.running = False
        logging.info("Message streamer stopped.")
if __name__ == "__main__":
    # Script entry point: consume with the default settings until the user
    # interrupts with Ctrl+C, then flag the streamer as stopped.
    consumer = MessageStreamer()
    try:
        consumer.stream_messages()
    except KeyboardInterrupt:
        consumer.stop()
# Add your comment