1. import queue
  2. import threading
  3. import time
  4. class MessageQueue:
  5. def __init__(self, max_size=100):
  6. """
  7. Initializes a message queue with a maximum size.
  8. """
  9. self.queue = queue.Queue(maxsize=max_size) # Use queue.Queue for thread safety
  10. self.lock = threading.Lock() # For additional thread safety if needed
  11. def put(self, item, block=True, timeout=None):
  12. """
  13. Adds an item to the queue. Blocks if the queue is full.
  14. """
  15. self.queue.put(item, block=block, timeout=timeout) # Use queue.put for blocking behavior
  16. def get(self, block=True, timeout=None):
  17. """
  18. Removes and returns an item from the queue. Blocks if the queue is empty.
  19. """
  20. return self.queue.get(block=block, timeout=timeout) # Use queue.get for blocking behavior
  21. def size(self):
  22. """
  23. Returns the current number of items in the queue.
  24. """
  25. with self.lock: #Protect queue size from race conditions if needed
  26. return self.queue.qsize()
  27. def is_empty(self):
  28. """
  29. Returns True if the queue is empty, False otherwise.
  30. """
  31. return self.queue.empty()
  32. def is_full(self):
  33. """
  34. Returns True if the queue is full, False otherwise.
  35. """
  36. return self.queue.full()
  37. def enforce_queue_limit(queue_instance, item):
  38. """
  39. Example function to demonstrate queue limit enforcement.
  40. """
  41. if queue_instance.size() >= 95: #limit is 100, allow 5% buffer
  42. print(f"Queue is almost full. Dropping item: {item}")
  43. return False # Indicate item was dropped
  44. else:
  45. queue_instance.put(item) # Add the item to the queue
  46. print(f"Added item to queue: {item}. Queue size: {queue_instance.size()}")
  47. return True # Indicate item was added
  48. if __name__ == '__main__':
  49. my_queue = MessageQueue(max_size=100) # Create a queue with a maximum size of 100
  50. # Simulate adding items to the queue
  51. for i in range(120):
  52. enforce_queue_limit(my_queue, i)
  53. time.sleep(0.01) # Simulate some processing time
  54. print("Queue is now full.")
  55. #Example of trying to add more items after queue is full
  56. for i in range(120,130):
  57. enforce_queue_limit(my_queue, i)
  58. time.sleep(0.01)

Add your comment