import queue
import threading
import time
class MessageQueue:
    """
    Thin wrapper around ``queue.Queue`` with a configurable maximum size.

    All synchronisation is delegated to ``queue.Queue``; the methods here
    only forward to it.
    """

    def __init__(self, max_size=100):
        """
        Initializes a message queue with a maximum size.

        Args:
            max_size: Passed to ``queue.Queue`` as ``maxsize``. Note that
                ``queue.Queue`` treats a value of zero or less as unbounded.
        """
        self.queue = queue.Queue(maxsize=max_size)  # queue.Queue does its own locking
        # Kept so existing callers that reference ``lock`` keep working.
        # No method of this class acquires it.
        self.lock = threading.Lock()

    def put(self, item, block=True, timeout=None):
        """
        Adds an item to the queue.

        With the defaults the call waits until a slot is free.

        Args:
            item: The object to enqueue.
            block: Whether to wait for a free slot.
            timeout: Maximum number of seconds to wait when blocking;
                ``None`` waits indefinitely.

        Raises:
            queue.Full: If no slot is free and ``block`` is False, or the
                timeout expires.
        """
        self.queue.put(item, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        """
        Removes and returns an item from the queue.

        With the defaults the call waits until an item is available.

        Args:
            block: Whether to wait for an item.
            timeout: Maximum number of seconds to wait when blocking;
                ``None`` waits indefinitely.

        Returns:
            The item removed from the queue.

        Raises:
            queue.Empty: If no item is available and ``block`` is False, or
                the timeout expires.
        """
        return self.queue.get(block=block, timeout=timeout)

    def size(self):
        """
        Returns the current number of items in the queue.

        The value is a snapshot: other threads may add or remove items
        before the caller acts on it.
        """
        # qsize() already synchronises on the queue's internal mutex. Taking
        # self.lock here added no protection (put/get never take it) and
        # would deadlock a caller that already holds it.
        return self.queue.qsize()

    def is_empty(self):
        """
        Returns True if the queue is empty, False otherwise.

        Like size(), the answer is only a snapshot.
        """
        return self.queue.empty()

    def is_full(self):
        """
        Returns True if the queue is full, False otherwise.

        Like size(), the answer is only a snapshot.
        """
        return self.queue.full()
def enforce_queue_limit(queue_instance, item, threshold=95):
    """
    Adds an item to the queue unless the queue has reached a drop threshold.

    Args:
        queue_instance: Object exposing ``size()`` and
            ``put(item, block=...)``, such as ``MessageQueue``.
        item: The item to enqueue.
        threshold: Number of queued items at or above which new items are
            dropped. Defaults to 95, the value that used to be hard-coded
            (a 5% buffer below a capacity of 100).

    Returns:
        True if the item was added, False if it was dropped.
    """
    if queue_instance.size() >= threshold:
        print(f"Queue is almost full. Dropping item: {item}")
        return False  # Indicate item was dropped

    try:
        # The size check above is only a snapshot, and the queue's real
        # capacity may be below the threshold. A blocking put could then
        # wait forever, so never block here and treat "full" as a drop.
        queue_instance.put(item, block=False)
    except queue.Full:
        print(f"Queue is full. Dropping item: {item}")
        return False  # Indicate item was dropped

    print(f"Added item to queue: {item}. Queue size: {queue_instance.size()}")
    return True  # Indicate item was added
if __name__ == '__main__':
    # Demo: push more items than the queue will accept and show that
    # enforce_queue_limit starts dropping once the threshold is reached.
    my_queue = MessageQueue(max_size=100)  # Create a queue with a maximum size of 100

    # Simulate adding items to the queue
    for i in range(120):
        enforce_queue_limit(my_queue, i)
        time.sleep(0.01)  # Simulate some processing time

    # The queue stops at the drop threshold, which is below its capacity,
    # so report the actual size instead of claiming it is full.
    print(f"Queue has stopped accepting items at size {my_queue.size()}.")

    # Example of trying to add more items after the limit has been reached
    for i in range(120, 130):
        enforce_queue_limit(my_queue, i)
        time.sleep(0.01)
Add your comment