import logging
import queue
import threading
import time
import traceback
# Root logger setup for this module: emit INFO and above, with each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class MessageQueueNormalizer:
    """Feed queued messages through a normalization callable on a worker thread.

    Messages are buffered in an in-process ``queue.Queue``. ``start()`` launches
    a background thread that takes items off the queue one at a time and passes
    each item to ``normalization_function``. ``stop()`` lets the worker finish
    everything queued before it, then shuts the worker down.

    ``None`` is reserved as the shutdown sentinel: the worker exits as soon as
    it dequeues ``None``, so ``None`` must not be enqueued as a message.
    """

    # Seconds the worker blocks on an empty queue before re-checking `running`.
    POLL_TIMEOUT = 5
    # Seconds the worker pauses after a normalization failure.
    ERROR_BACKOFF = 10

    def __init__(self, queue_url, normalization_function, batch_size=100):
        """Create a normalizer with an empty queue and no worker running.

        Args:
            queue_url: Identifier of the source queue. It is only stored on the
                instance; nothing in this class connects to it.
            normalization_function: Callable invoked with each dequeued item.
            batch_size: Stored as ``self.batch_size``. The processing loop
                handles one queued item at a time and does not read it.
        """
        self.queue_url = queue_url
        self.normalization_function = normalization_function
        self.batch_size = batch_size
        self.queue = queue.Queue()
        self.running = True
        self._worker = None  # Thread created by start(), if any.

    def enqueue_messages(self, messages):
        """Put every element of *messages* on the queue as a separate item."""
        for message in messages:
            self.queue.put(message)

    def normalize_messages(self):
        """Run the processing loop in the calling thread.

        The loop ends when ``self.running`` becomes false or when the ``None``
        sentinel is dequeued. An exception raised by ``normalization_function``
        is logged and the loop moves on after ``ERROR_BACKOFF`` seconds; the
        failed item is not re-queued.
        """
        while self.running:
            try:
                message = self.queue.get(timeout=self.POLL_TIMEOUT)
            except queue.Empty:
                # Nothing arrived within the poll window; re-check `running`.
                continue
            try:
                if message is None:
                    # Shutdown sentinel.
                    break
                # The normalized result is where downstream handling
                # (e.g. writing to a database) would hook in.
                self.normalization_function(message)
            except Exception as exc:
                logging.exception("Error normalizing messages: %s", exc)
                time.sleep(self.ERROR_BACKOFF)
            finally:
                # Every successful get() must be balanced by task_done(),
                # including the sentinel and failed items; otherwise
                # queue.join() can never return.
                self.queue.task_done()

    def start(self):
        """Start the processing loop on a background thread and return.

        Calling ``start()`` while the worker is already alive is a no-op.
        The thread is non-daemon, so the interpreter stays alive until the
        worker has been stopped.
        """
        if self._worker is not None and self._worker.is_alive():
            logging.warning("Normalization process already running.")
            return
        self.running = True
        logging.info("Normalization process started.")
        self._worker = threading.Thread(
            target=self.normalize_messages,
            name="message-normalizer",
        )
        self._worker.start()

    def stop(self):
        """Stop the worker after it has handled everything already queued.

        A single ``None`` sentinel is placed behind any pending messages and
        the worker thread is joined, so messages enqueued before ``stop()``
        are still processed. If no worker started by ``start()`` is alive
        (or ``stop()`` is called from the worker itself), only the ``running``
        flag is cleared and nothing is added to the queue.
        """
        logging.info("Stopping normalization process...")
        worker = self._worker
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            # `running` stays true until the join completes so the worker
            # keeps draining the queue up to the sentinel.
            self.queue.put(None)
            worker.join()
        self.running = False
        self._worker = None
        logging.info("Normalization process stopped.")
if __name__ == '__main__':
    # Demonstration of the normalizer with a trivial upper-casing callback.

    def example_normalization(messages):
        """Return a list holding the upper-cased form of each element of *messages*."""
        return [item.upper() for item in messages]

    # Placeholder queue URL; substitute the real one.
    queue_url = "your_queue_url"
    sample_messages = ["hello", "world", "python"]

    # Build the normalizer and load the sample messages into its queue.
    normalizer = MessageQueueNormalizer(queue_url, example_normalization)
    normalizer.enqueue_messages(sample_messages)

    # Kick off processing, give it 20 seconds, then request a graceful stop.
    normalizer.start()
    time.sleep(20)
    normalizer.stop()
# Add your comment