/**
 * Drains a message queue, runs `processMessage` on every message, and writes
 * the collected results to `filename` as pretty-printed JSON.
 *
 * Both `queue.consume()` and `processMessage()` may be synchronous or return
 * a Promise; each is awaited. Draining stops at the first `null` or
 * `undefined` returned by `queue.consume()`. A message whose processing
 * throws (or rejects) is logged and skipped; it does not abort the export.
 * `queue.close()` is called, when the queue provides it, after draining,
 * even if draining failed.
 *
 * @param {object} queue - Queue exposing `consume()` and, optionally, `close()`.
 * @param {string} filename - Path of the file to write the JSON results to.
 * @param {function} processMessage - Called with each message; its (awaited)
 *   return value is appended to the exported results.
 * @returns {Promise<void>} Resolves once the results file has been written.
 * @throws {Error} Rejects with "Invalid queue object", "Invalid filename" or
 *   "Invalid processMessage function" on bad arguments, or with the original
 *   error when consuming, closing, serialising, or writing fails.
 */
async function exportQueueResults(queue, filename, processMessage) {
  // Validate up front so bad arguments never touch the queue.
  if (!queue || typeof queue.consume !== 'function') {
    console.error("Invalid queue object provided.");
    throw new Error("Invalid queue object");
  }
  if (typeof filename !== 'string' || filename.trim() === '') {
    console.error("Invalid filename provided.");
    throw new Error("Invalid filename");
  }
  if (typeof processMessage !== 'function') {
    console.error("Invalid processMessage function provided.");
    throw new Error("Invalid processMessage function");
  }

  const results = [];

  try {
    try {
      let message;
      // `!= null` stops on both null and undefined, so a queue that signals
      // "empty" with undefined cannot cause an endless loop. Awaiting makes
      // Promise-returning consumers work (a Promise is never === null).
      while ((message = await queue.consume()) != null) {
        try {
          results.push(await processMessage(message));
        } catch (error) {
          // Best effort: one bad message must not abort the whole export.
          console.error("Error processing message:", error);
        }
      }
    } finally {
      // Always release the consumer, even when consume() failed. close() is
      // optional because only consume() is required of a valid queue.
      if (typeof queue.close === 'function') {
        await queue.close();
      }
    }
  } catch (error) {
    console.error("An error occurred:", error);
    throw error;
  }

  try {
    // Dynamic import works from both CommonJS and ES modules, and the
    // promise-based writeFile does not block the event loop.
    const { writeFile } = await import('node:fs/promises');
    await writeFile(filename, JSON.stringify(results, null, 2));
    console.log(`Results exported to ${filename}`);
  } catch (err) {
    console.error("Error writing to file:", err);
    throw err;
  }
}
Add your comment