1. import functools
  2. import inspect
  3. import importlib
  4. import sys
  5. import types
  6. def instrument_queue_code(queue_name, module_path):
  7. """
  8. Instruments message queue code for backward compatibility without async logic.
  9. Args:
  10. queue_name (str): The name of the queue to instrument (e.g., "my_queue").
  11. module_path (str): The path to the module containing the queue implementation.
  12. """
  13. try:
  14. module = importlib.import_module(module_path)
  15. except ImportError as e:
  16. print(f"Error importing module {module_path}: {e}")
  17. return
  18. queue_class = None
  19. for name, obj in inspect.getmembers(module):
  20. if name == queue_name and inspect.isclass(obj):
  21. queue_class = obj
  22. break
  23. if queue_class is None:
  24. print(f"Queue class '{queue_name}' not found in module {module_path}")
  25. return
  26. original_put = None
  27. original_get = None
  28. original_size = None
  29. # Store original methods for later restoration
  30. try:
  31. original_put = queue_class.put
  32. except AttributeError:
  33. pass
  34. try:
  35. original_get = queue_class.get
  36. except AttributeError:
  37. pass
  38. try:
  39. original_size = queue_class.size
  40. except AttributeError:
  41. pass
  42. def wrapped_put(item):
  43. """
  44. Wraps the put method to add instrumentation.
  45. """
  46. print(f"Putting item: {item} into queue: {queue_name}")
  47. if original_put:
  48. original_put(item)
  49. return
  50. def wrapped_get():
  51. """
  52. Wraps the get method to add instrumentation.
  53. """
  54. print(f"Getting item from queue: {queue_name}")
  55. if original_get:
  56. return original_get()
  57. return None
  58. def wrapped_size():
  59. """
  60. Wraps the size method to add instrumentation.
  61. """
  62. print(f"Queue {queue_name} size: {len(queue_class._queue)}")
  63. if original_size:
  64. return original_size()
  65. return len(queue_class._queue) #Default return
  66. # Replace the original methods with the wrapped versions
  67. queue_class.put = wrapped_put
  68. queue_class.get = wrapped_get
  69. queue_class.size = wrapped_size
  70. print(f"Instrumented queue '{queue_name}' in module '{module_path}'")
  71. return queue_class #Return the modified class for potential use
  72. if __name__ == '__main__':
  73. # Example usage (replace with your actual queue implementation)
  74. # Create a dummy queue class for testing
  75. class DummyQueue:
  76. def __init__(self):
  77. self._queue = []
  78. def put(self, item):
  79. self._queue.append(item)
  80. def get(self):
  81. if self._queue:
  82. return self._queue.pop(0)
  83. else:
  84. return None
  85. def size(self):
  86. return len(self._queue)
  87. # Save the dummy queue class to a file
  88. with open("dummy_queue.py", "w") as f:
  89. f.write("""
  90. class DummyQueue:
  91. def __init__(self):
  92. self._queue = []
  93. def put(self, item):
  94. self._queue.append(item)
  95. def get(self):
  96. if self._queue:
  97. return self._queue.pop(0)
  98. else:
  99. return None
  100. def size(self):
  101. return len(self._queue)
  102. """)
  103. #Instrument the dummy queue
  104. instrument_queue_code("DummyQueue", "dummy_queue")
  105. #Test the instrumented queue
  106. import dummy_queue
  107. q = dummy_queue.DummyQueue()
  108. q.put("item1")
  109. q.put("item2")
  110. print(q.get())
  111. print(q.size())

Add your comment