1. import urllib.request
  2. import urllib.parse
  3. import ssl
  4. import socket
  5. import threading
  6. import queue
  7. class CompatHTTPClient:
  8. def __init__(self, max_memory=1024*1024): # 1MB default
  9. self.max_memory = max_memory
  10. self.request_queue = queue.Queue()
  11. self.response_queue = queue.Queue()
  12. self.running = True
  13. self.request_threads = []
  14. self.response_threads = []
  15. def request(self, url, method='GET', headers=None, data=None, timeout=None):
  16. """
  17. Sends an HTTP request and returns the response.
  18. Handles memory constraints by streaming the response.
  19. """
  20. req = urllib.request.Request(url, method=method, headers=headers)
  21. try:
  22. with urllib.request.urlopen(req, timeout=timeout) as response:
  23. # Stream the response
  24. for chunk in iter(lambda: response.read(4096), b''):
  25. yield chunk
  26. except Exception as e:
  27. yield f"Error: {e}"
  28. def submit_request(self, url, method='GET', headers=None, data=None, timeout=None):
  29. """
  30. Submits a request to the request queue.
  31. """
  32. self.request_queue.put((url, method, headers, data, timeout))
  33. def _worker_request(self):
  34. """
  35. Worker thread to handle requests from the queue.
  36. """
  37. while self.running:
  38. try:
  39. url, method, headers, data, timeout = self.request_queue.get(timeout=0.1)
  40. try:
  41. response = self.request(url, method, headers, data, timeout)
  42. self.response_queue.put((url, response))
  43. except Exception as e:
  44. self.response_queue.put((url, f"Error: {e}"))
  45. finally:
  46. self.request_queue.task_done()
  47. except queue.Empty:
  48. pass # Continue if queue is empty
  49. def _worker_response(self):
  50. """
  51. Worker thread to handle responses from the queue.
  52. """
  53. while self.running:
  54. try:
  55. url, response = self.response_queue.get(timeout=0.1)
  56. yield url, response
  57. except queue.Empty:
  58. pass # Continue if queue is empty
  59. def start(self, num_request_threads=4, num_response_threads=4):
  60. """
  61. Starts the worker threads.
  62. """
  63. for _ in range(num_request_threads):
  64. thread = threading.Thread(target=self._worker_request)
  65. thread.daemon = True
  66. self.request_threads.append(thread)
  67. thread.start()
  68. for _ in range(num_response_threads):
  69. thread = threading.Thread(target=self._worker_response)
  70. thread.daemon = True
  71. self.response_threads.append(thread)
  72. thread.start()
  73. def stop(self):
  74. """
  75. Stops the worker threads.
  76. """
  77. self.running = False
  78. for thread in self.request_threads:
  79. thread.join(timeout=0.1)
  80. for thread in self.response_threads:
  81. thread.join(timeout=0.1)

Add your comment