1. import time
  2. import threading
  3. def data_stream(data_source, queue):
  4. """Reads data from a source and puts it into a queue."""
  5. try:
  6. while True:
  7. data = data_source.read_line() # Read a line from the data source
  8. if data:
  9. queue.put(data.strip()) # Put the data (stripped of whitespace) into the queue
  10. time.sleep(0.1) #Small delay to avoid busy-waiting
  11. except Exception as e:
  12. print(f"Error reading data: {e}")
  13. finally:
  14. queue.put(None) #Signal end of stream
  15. def process_data_stream(queue):
  16. """Processes data from the queue."""
  17. while True:
  18. data = queue.get() # Get data from the queue
  19. if data is None:
  20. break # Exit loop if None is received (end of stream)
  21. print(f"Processing: {data}")
  22. time.sleep(0.5) # Simulate processing time
  23. if __name__ == "__main__":
  24. # Simulate a data source (e.g., a file or network connection)
  25. class MockDataSource:
  26. def __init__(self, data):
  27. self.data = data
  28. self.index = 0
  29. def read_line(self):
  30. if self.index < len(self.data):
  31. line = self.data[self.index]
  32. self.index += 1
  33. return line
  34. else:
  35. return None # Indicate end of data
  36. data = ["Task A: Check pump", "Task B: Inspect valves", "Task C: Lubricate gears", "Task D: Check sensors"]
  37. data_source = MockDataSource(data)
  38. data_queue = threading.Queue()
  39. # Start data reader thread
  40. reader_thread = threading.Thread(target=data_stream, args=(data_source, data_queue))
  41. reader_thread.daemon = True #Allow main thread to exit even if this is running.
  42. reader_thread.start()
  43. # Start data processor thread
  44. processor_thread = threading.Thread(target=process_data_stream, args=(data_queue,))
  45. processor_thread.daemon = True
  46. processor_thread.start()
  47. # Keep the main thread alive for a while to allow processing
  48. time.sleep(5)
  49. print("Exiting.")

Add your comment