import queue
import threading
import time
def data_stream(data_source, queue, poll_interval=0.1):
    """Read lines from a data source and put them into a queue.

    Every truthy value returned by ``data_source.read_line()`` is stripped of
    surrounding whitespace and enqueued. A return value of ``None`` is treated
    as end of data and stops the loop; any other falsy value (such as an empty
    string) is skipped and the source is polled again.

    ``None`` is always put on the queue last, whether the loop ended normally
    or because of an error, so the consumer can tell the stream is finished.

    Args:
        data_source: Object exposing a ``read_line()`` method.
        queue: Object exposing a ``put(item)`` method.
        poll_interval: Seconds to sleep after each read; defaults to 0.1.
    """
    try:
        while True:
            data = data_source.read_line()
            if data is None:
                # End of data. Without this exit the loop would poll forever
                # and the end-of-stream sentinel below would never be sent.
                break
            if data:
                queue.put(data.strip())
            time.sleep(poll_interval)  # Small delay to avoid busy-waiting
    except Exception as e:
        # Best-effort reader: report the failure, then still signal the end.
        print(f"Error reading data: {e}")
    finally:
        queue.put(None)  # Signal end of stream
def process_data_stream(queue):
    """Consume items from the queue until the ``None`` sentinel arrives.

    Each item is printed and followed by a half-second pause that simulates
    processing work. The function returns once ``None`` is taken off the queue.

    Args:
        queue: Object exposing a blocking ``get()`` method.
    """
    item = queue.get()
    while item is not None:
        print(f"Processing: {item}")
        time.sleep(0.5)  # Simulated processing time
        item = queue.get()  # Blocks until the next item is available
if __name__ == "__main__":

    class MockDataSource:
        """In-memory stand-in for a data source (e.g., a file or network connection)."""

        def __init__(self, data):
            self.data = data
            self.index = 0

        def read_line(self):
            """Return the next line, or None once every line has been handed out."""
            if self.index < len(self.data):
                line = self.data[self.index]
                self.index += 1
                return line
            return None  # Indicate end of data

    data = [
        "Task A: Check pump",
        "Task B: Inspect valves",
        "Task C: Lubricate gears",
        "Task D: Check sensors",
    ]
    data_source = MockDataSource(data)
    # The queue class lives in the ``queue`` module; ``threading`` has no Queue.
    data_queue = queue.Queue()

    # Start data reader thread (daemon, so it cannot keep the process alive).
    reader_thread = threading.Thread(
        target=data_stream, args=(data_source, data_queue), daemon=True
    )
    reader_thread.start()

    # Start data processor thread.
    processor_thread = threading.Thread(
        target=process_data_stream, args=(data_queue,), daemon=True
    )
    processor_thread.start()

    # Wait for the processor to finish, but never longer than 5 seconds, so the
    # script still exits if the end-of-stream sentinel is never delivered.
    processor_thread.join(timeout=5)
    print("Exiting.")
# End of the data-stream demo script.