1. /**
  2. * Exports message queue results for manual execution.
  3. * Handles edge cases like empty queues and invalid data.
  4. *
  5. * @param {object} queue - The message queue object (e.g., RabbitMQ, Kafka).
  6. * @param {string} filename - The name of the file to export to.
  7. * @param {function} processMessage - Function to execute on each message.
  8. * @returns {Promise<void>} - A promise that resolves when the export is complete.
  9. */
  10. async function exportQueueResults(queue, filename, processMessage) {
  11. try {
  12. // Check if queue is valid
  13. if (!queue || typeof queue.consume !== 'function') {
  14. console.error("Invalid queue object provided.");
  15. return Promise.reject(new Error("Invalid queue object"));
  16. }
  17. // Check if filename is valid
  18. if (!filename || typeof filename !== 'string' || filename.trim() === '') {
  19. console.error("Invalid filename provided.");
  20. return Promise.reject(new Error("Invalid filename"));
  21. }
  22. const results = [];
  23. let message;
  24. // Consume messages until the queue is empty or an error occurs
  25. while ((message = queue.consume()) !== null) {
  26. try {
  27. // Process the message
  28. const result = processMessage(message); //Execute the process message function
  29. results.push(result);
  30. } catch (error) {
  31. console.error("Error processing message:", error);
  32. //Handle the error. You can log it or take other actions.
  33. }
  34. }
  35. // Close the consumer
  36. queue.close();
  37. // Write results to file
  38. const fs = require('fs'); //CommonJS module for file system operations.
  39. try {
  40. fs.writeFileSync(filename, JSON.stringify(results, null, 2)); //Write to file with pretty printing
  41. console.log(`Results exported to ${filename}`);
  42. } catch (err) {
  43. console.error("Error writing to file:", err);
  44. return Promise.reject(err);
  45. }
  46. } catch (error) {
  47. console.error("An error occurred:", error);
  48. return Promise.reject(error); //Reject the promise on any error.
  49. }
  50. }

Add your comment