1. import logging
  2. import threading
  3. import queue
  4. import time
  5. import random
  6. class LogStreamSynchronizer:
  7. def __init__(self, log_streams, validation_checks, sync_interval=1):
  8. self.log_streams = log_streams
  9. self.validation_checks = validation_checks
  10. self.sync_interval = sync_interval
  11. self.queue = queue.Queue()
  12. self.stop_event = threading.Event()
  13. self.threads = []
  14. def _validate_log_stream(self, log_stream):
  15. try:
  16. for check in self.validation_checks:
  17. result = check(log_stream)
  18. if not result:
  19. print(f"Validation check failed for {log_stream}")
  20. return False
  21. return True
  22. except Exception as e:
  23. print(f"Error during validation for {log_stream}: {e}")
  24. return False
  25. def _sync_log_stream(self, log_stream):
  26. try:
  27. # Simulate fetching data from log stream
  28. data = log_stream.get_data()
  29. if data is None:
  30. print(f"Log stream {log_stream} is empty.")
  31. return False
  32. if self._validate_log_stream(data):
  33. print(f"Log stream {log_stream} validated successfully.")
  34. return True
  35. else:
  36. print(f"Log stream {log_stream} validation failed.")
  37. return False
  38. except Exception as e:
  39. print(f"Error syncing log stream {log_stream}: {e}")
  40. return False
  41. def _worker(self):
  42. while not self.stop_event.is_set():
  43. try:
  44. log_stream = self.queue.get(timeout=self.sync_interval)
  45. self._sync_log_stream(log_stream)
  46. self.queue.task_done()
  47. except queue.Empty:
  48. pass
  49. time.sleep(self.sync_interval)
  50. def start(self):
  51. for stream in self.log_streams:
  52. self.queue.put(stream)
  53. num_threads = len(self.log_streams) # Or a reasonable number
  54. for _ in range(num_threads):
  55. thread = threading.Thread(target=self._worker)
  56. thread.daemon = True # Allow main thread to exit even if workers are running
  57. self.threads.append(thread)
  58. thread.start()
  59. def stop(self):
  60. self.stop_event.set()
  61. for thread in self.threads:
  62. thread.join()
  63. if __name__ == '__main__':
  64. class MockLogStream:
  65. def __init__(self, name, data=None):
  66. self.name = name
  67. self.data = data
  68. def get_data(self):
  69. return self.data
  70. def check_data_not_empty(data):
  71. return data is not None and len(data) > 0
  72. def check_data_contains_keyword(data):
  73. return "important" in data
  74. log_streams = [
  75. MockLogStream("stream_1", "This is some log data."),
  76. MockLogStream("stream_2", None),
  77. MockLogStream("stream_3", "Another log entry.")
  78. ]
  79. validation_checks = [check_data_not_empty, check_data_contains_keyword]
  80. sync_system = LogStreamSynchronizer(log_streams, validation_checks, sync_interval=2)
  81. sync_system.start()
  82. time.sleep(10)
  83. sync_system.stop()
  84. print("Synchronization stopped.")

Add your comment