import logging
import threading
import queue
import time
import random
class LogStreamSynchronizer:
    """Fetch and validate a collection of log streams on worker threads.

    ``start()`` places every configured stream on an internal queue once and
    launches one daemon worker thread per stream. Each worker takes a stream
    off the queue, calls its ``get_data()`` method and runs the configured
    validation checks against the returned data. Outcomes are reported with
    ``print``.

    Args:
        log_streams: Sized iterable of objects exposing ``get_data()``.
        validation_checks: Iterable of callables; each receives the fetched
            data and returns a truthy value when the data is acceptable.
        sync_interval: Seconds a worker pauses between queue polls.
    """

    def __init__(self, log_streams, validation_checks, sync_interval=1):
        self.log_streams = log_streams
        self.validation_checks = validation_checks
        self.sync_interval = sync_interval
        self.queue = queue.Queue()
        self.stop_event = threading.Event()
        self.threads = []

    def _validate_log_stream(self, log_stream):
        """Run every validation check against ``log_stream``.

        Despite the parameter name, the only caller in this class
        (``_sync_log_stream``) passes the data fetched from a stream, not the
        stream object itself.

        Returns:
            True when every check returns a truthy value; False as soon as a
            check returns a falsy value or raises an ``Exception``.
        """
        try:
            for check in self.validation_checks:
                if not check(log_stream):
                    print(f"Validation check failed for {log_stream}")
                    return False
        except Exception as e:
            # A crashing check is reported and counted as a failed validation
            # so that one bad check cannot take down a worker thread.
            print(f"Error during validation for {log_stream}: {e}")
            return False
        return True

    def _sync_log_stream(self, log_stream):
        """Fetch the data of one stream and validate it.

        Returns:
            True when the stream returned data and all checks passed; False
            when the data is ``None``, validation failed, or fetching raised
            an ``Exception``.
        """
        try:
            data = log_stream.get_data()
            if data is None:
                print(f"Log stream {log_stream} is empty.")
                return False
            if self._validate_log_stream(data):
                print(f"Log stream {log_stream} validated successfully.")
                return True
            print(f"Log stream {log_stream} validation failed.")
            return False
        except Exception as e:
            print(f"Error syncing log stream {log_stream}: {e}")
            return False

    def _worker(self):
        """Process queued streams until the stop event is set."""
        while not self.stop_event.is_set():
            try:
                # Non-blocking fetch: all idle time is spent in the
                # interruptible wait below, never inside a blocking get().
                log_stream = self.queue.get_nowait()
            except queue.Empty:
                pass
            else:
                try:
                    self._sync_log_stream(log_stream)
                finally:
                    # Always balance the get(), otherwise queue.join() would
                    # hang if the sync call were cut short.
                    self.queue.task_done()
            # Event.wait() returns as soon as stop() sets the event, unlike
            # time.sleep(), which would delay shutdown by a full interval.
            self.stop_event.wait(self.sync_interval)

    def start(self):
        """Queue every configured stream and launch the worker threads."""
        # Clear a stop request left over from an earlier stop(); without this
        # freshly started workers would exit before processing anything.
        self.stop_event.clear()
        for stream in self.log_streams:
            self.queue.put(stream)
        num_threads = len(self.log_streams)  # one worker per stream
        for _ in range(num_threads):
            # Daemon threads let the interpreter exit even if stop() is never
            # called.
            thread = threading.Thread(target=self._worker, daemon=True)
            self.threads.append(thread)
            thread.start()

    def stop(self):
        """Signal all workers to finish and wait until they have exited."""
        self.stop_event.set()
        for thread in self.threads:
            thread.join()
if __name__ == "__main__":
    # Demo: run the synchronizer over three in-memory streams for ten seconds.

    class MockLogStream:
        """In-memory stand-in for a log stream that returns fixed data."""

        def __init__(self, name, data=None):
            self.name = name
            self.data = data

        def get_data(self):
            """Return the data supplied at construction time (may be None)."""
            return self.data

    def check_data_not_empty(data):
        """Return True when data is not None and has a non-zero length."""
        if data is None:
            return False
        return len(data) > 0

    def check_data_contains_keyword(data):
        """Return True when the keyword "important" occurs in data."""
        keyword = "important"
        return keyword in data

    # (name, data) pairs for the demo streams. None of the texts contains
    # "important", and stream_2 has no data at all.
    stream_specs = (
        ("stream_1", "This is some log data."),
        ("stream_2", None),
        ("stream_3", "Another log entry."),
    )
    demo_streams = [MockLogStream(name, data) for name, data in stream_specs]
    demo_checks = [check_data_not_empty, check_data_contains_keyword]

    sync_system = LogStreamSynchronizer(
        demo_streams,
        demo_checks,
        sync_interval=2,
    )
    sync_system.start()
    # Leave the workers running for a while before shutting them down.
    time.sleep(10)
    sync_system.stop()
    print("Synchronization stopped.")
# Add your comment