1. import logging
  2. import time
  3. import traceback
  4. import queue
  5. # Configure logging
  6. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  7. class MessageQueueNormalizer:
  8. def __init__(self, queue_url, normalization_function, batch_size=100):
  9. self.queue_url = queue_url
  10. self.normalization_function = normalization_function # Function to normalize data
  11. self.queue = queue.Queue()
  12. self.running = True
  13. def enqueue_messages(self, messages):
  14. """Enqueues messages into the queue."""
  15. for message in messages:
  16. self.queue.put(message)
  17. def normalize_messages(self):
  18. """Normalizes messages from the queue."""
  19. while self.running:
  20. try:
  21. messages = self.queue.get(timeout=5) # Wait up to 5 seconds for messages
  22. if messages is None:
  23. # Signal to stop processing
  24. break
  25. normalized_messages = self.normalization_function(messages)
  26. # Process normalized messages (e.g., write to database)
  27. # print(f"Normalized messages: {normalized_messages}") #for debugging
  28. self.queue.task_done() # Mark task as complete
  29. except queue.Empty:
  30. # Queue is empty, continue to check for more messages
  31. pass
  32. except Exception as e:
  33. logging.error(f"Error normalizing messages: {e}")
  34. traceback.print_exc() # Print full traceback
  35. time.sleep(10) # Wait before retrying
  36. def start(self):
  37. """Starts the normalization process."""
  38. self.running = True
  39. logging.info("Normalization process started.")
  40. self.normalize_messages()
  41. def stop(self):
  42. """Stops the normalization process gracefully."""
  43. logging.info("Stopping normalization process...")
  44. self.running = False
  45. # Signal the worker to stop by putting None in the queue
  46. for _ in range(10): #put None multiple times to make sure the worker receives it
  47. self.queue.put(None)
  48. self.queue.join() #wait for all tasks to be completed
  49. logging.info("Normalization process stopped.")
  50. if __name__ == '__main__':
  51. # Example Usage
  52. def example_normalization(messages):
  53. """Example normalization function."""
  54. normalized = []
  55. for msg in messages:
  56. normalized.append(msg.upper()) # Convert to uppercase
  57. return normalized
  58. # Create a queue URL (replace with your actual queue URL)
  59. queue_url = "your_queue_url"
  60. # Create a MessageQueueNormalizer instance
  61. normalizer = MessageQueueNormalizer(queue_url, example_normalization)
  62. # Sample messages
  63. sample_messages = ["hello", "world", "python"]
  64. # Enqueue messages
  65. normalizer.enqueue_messages(sample_messages)
  66. # Start normalization
  67. normalizer.start()
  68. # Let it run for a while
  69. time.sleep(20)
  70. # Stop normalization gracefully
  71. normalizer.stop()

Add your comment