import queue
import threading
import time
import random
class SandboxTask:
def __init__(self, task_id, runtime):
self.task_id = task_id
self.runtime = runtime
class SandboxQueue:
def __init__(self, max_tasks=5):
self.task_queue = queue.Queue(maxsize=max_tasks) # Limit the number of tasks
self.lock = threading.Lock()
def add_task(self, task):
with self.lock:
self.task_queue.put(task)
def get_task(self):
try:
return self.task_queue.get(timeout=1) # Wait for a task, with a timeout
except queue.Empty:
return None # Return None if queue is empty
class SandboxEnvironment:
def __init__(self, memory_limit=1024, cpu_limit=2):
self.memory_limit = memory_limit
self.cpu_limit = cpu_limit
def execute_task(self, task):
print(f"Executing task {task.task_id} with runtime {task.runtime}")
# Simulate task execution with random sleep
sleep_time = random.uniform(0.1, task.runtime)
time.sleep(sleep_time)
print(f"Task {task.task_id} completed after {sleep_time:.2f} seconds.")
class TaskExecutor(threading.Thread):
def __init__(self, sandbox, task_queue):
super().__init__()
self.sandbox = sandbox
self.task_queue = task_queue
def run(self):
while True:
task = self.task_queue.get()
if task is None:
break # Exit thread if None is received
self.sandbox.execute_task(task)
self.task_queue.task_done() # Indicate task completion
def main():
# Define sandbox limits
memory_limit = 1024 # MB
cpu_limit = 2
# Create sandbox environment
sandbox = SandboxEnvironment(memory_limit=memory_limit, cpu_limit=cpu_limit)
# Create task queue with a limit
task_queue = SandboxQueue(max_tasks=3)
# Create and start worker thread
executor_thread = TaskExecutor(sandbox, task_queue)
executor_thread.daemon = True # Allow main thread to exit even if worker is running
executor_thread.start()
# Add tasks to the queue
tasks = [
SandboxTask(1, 0.5),
SandboxTask(2, 1.2),
SandboxTask(3, 0.8),
SandboxTask(4, 1.5),
SandboxTask(5, 0.7),
]
for task in tasks:
task_queue.add_task(task)
# Wait for all tasks to complete
task_queue.join()
print("All tasks completed.")
if __name__ == "__main__":
main()
Add your comment