const { pool } = require('../db'); const { N8N_WHATSAPP_TRIGGER_URL } = require('../config'); const { buildWhatsappCampaignPayload, formatProductList, groupCampaignRows, groupCampaignRowsByBaseProduct, mapCampaignProducts } = require('./campaignFormatter'); const TOP_BUYERS_LIMIT = 100; const MAX_CAMPAIGN_ATTEMPTS = 3; const CAMPAIGN_DELTA_THRESHOLD = 100; const enqueueStockCampaignItem = async (client, item) => { const query = ` INSERT INTO stock_campaign_queue ( base_product_name, produto_id, nome, saldo, delta_estoque ) VALUES ($1, $2, $3, $4, $5) `; await client.query(query, [ item.baseProductName, item.produtoId, item.nome, item.saldo, item.deltaEstoque ]); }; const getTopBuyersAllTime = async () => { const result = await pool.query(` SELECT MAX(cliente_nome) as nome, cliente_fone as fone, SUM(quantidade * valor_unitario) as total_gasto, SUM(quantidade) as total_comprado FROM orders WHERE cliente_fone IS NOT NULL AND cliente_fone != '' GROUP BY cliente_fone ORDER BY total_gasto DESC LIMIT $1; `, [TOP_BUYERS_LIMIT]); return result.rows; }; const claimReadyCampaignItems = async () => { const client = await pool.connect(); try { await client.query('BEGIN'); const result = await client.query(` WITH ready_groups AS ( SELECT base_product_name FROM stock_campaign_queue WHERE status IN ('pending', 'failed') AND attempts < $1 GROUP BY base_product_name HAVING SUM(delta_estoque) >= $2 ), ready_items AS ( SELECT queue.id FROM stock_campaign_queue queue JOIN ready_groups ON ready_groups.base_product_name = queue.base_product_name WHERE queue.status IN ('pending', 'failed') AND queue.attempts < $1 ORDER BY queue.created_at ASC FOR UPDATE OF queue SKIP LOCKED ) UPDATE stock_campaign_queue SET status = 'processing', attempts = attempts + 1, updated_at = CURRENT_TIMESTAMP, last_error = NULL WHERE id IN (SELECT id FROM ready_items) RETURNING *; `, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]); await client.query('COMMIT'); return result.rows; } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } }; const countPendingBelowThresholdGroups = async () => { const result = await pool.query(` SELECT COUNT(*)::int as count FROM ( SELECT base_product_name FROM stock_campaign_queue WHERE status IN ('pending', 'failed') AND attempts < $1 GROUP BY base_product_name HAVING SUM(delta_estoque) < $2 ) below_threshold_groups; `, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]); return result.rows[0]?.count || 0; }; const getCampaignQueueRows = async () => { const result = await pool.query(` SELECT * FROM stock_campaign_queue ORDER BY created_at DESC, id DESC LIMIT 500; `); return result.rows; }; const getCampaignQueueSummary = async () => { const rows = await getCampaignQueueRows(); return { threshold: CAMPAIGN_DELTA_THRESHOLD, maxAttempts: MAX_CAMPAIGN_ATTEMPTS, groups: groupCampaignRows(rows), rows }; }; const getCampaignPreview = async () => { const result = await pool.query(` SELECT * FROM stock_campaign_queue WHERE status IN ('pending', 'failed') AND attempts < $1 ORDER BY created_at ASC, id ASC; `, [MAX_CAMPAIGN_ATTEMPTS]); const groups = groupCampaignRowsByBaseProduct(result.rows); const readyGroups = {}; const belowThresholdGroups = {}; Object.entries(groups).forEach(([baseProductName, items]) => { const totalDelta = items.reduce((sum, item) => sum + Number(item.delta_estoque || 0), 0); if (totalDelta >= CAMPAIGN_DELTA_THRESHOLD) { readyGroups[baseProductName] = items; } else { belowThresholdGroups[baseProductName] = items; } }); const readyProducts = mapCampaignProducts(readyGroups).map(({ itemIds, ...product }) => product); const belowThresholdProducts = mapCampaignProducts(belowThresholdGroups).map(({ itemIds, ...product }) => product); const customers = await getTopBuyersAllTime(); return { threshold: CAMPAIGN_DELTA_THRESHOLD, readyProducts, belowThresholdProducts, productsText: readyProducts.length ? formatProductList(readyProducts.map(product => product.baseProduct)) : '', customerCount: customers.length, customersPreview: customers.slice(0, 10) }; }; const retryCampaignItems = async ({ ids, baseProductName } = {}) => { const params = []; const filters = [`status IN ('failed', 'skipped')`]; if (Array.isArray(ids) && ids.length) { params.push(ids.map(Number)); filters.push(`id = ANY($${params.length}::int[])`); } if (baseProductName) { params.push(baseProductName); filters.push(`base_product_name = $${params.length}`); } const result = await pool.query(` UPDATE stock_campaign_queue SET status = 'pending', attempts = 0, last_error = NULL, sent_at = NULL, updated_at = CURRENT_TIMESTAMP WHERE ${filters.join(' AND ')} RETURNING *; `, params); return { retried: result.rowCount, rows: result.rows }; }; const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => { if (!ids.length) return; await pool.query(` UPDATE stock_campaign_queue SET status = $1::varchar, last_error = $2, updated_at = CURRENT_TIMESTAMP, sent_at = CASE WHEN $1 = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END WHERE id = ANY($3::int[]); `, [status, errorMessage, ids]); }; const sendWhatsappCampaign = async (products, customers) => { const response = await fetch(N8N_WHATSAPP_TRIGGER_URL, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(buildWhatsappCampaignPayload(products, customers)) }); if (!response.ok) { throw new Error(`WhatsApp webhook returned status ${response.status}`); } }; const processPendingStockCampaigns = async () => { const rows = await claimReadyCampaignItems(); const summary = { claimed: rows.length, sentGroups: 0, skippedGroups: 0, failedGroups: 0, pendingBelowThresholdGroups: await countPendingBelowThresholdGroups() }; if (!rows.length) { return summary; } const groups = groupCampaignRowsByBaseProduct(rows); const products = mapCampaignProducts(groups); const ids = products.flatMap(product => product.itemIds); const customers = await getTopBuyersAllTime(); if (!customers.length) { await updateCampaignItemsStatus(ids, 'skipped', 'No customers with valid phone numbers found.'); summary.skippedGroups = products.length; return summary; } try { await sendWhatsappCampaign(products, customers); await updateCampaignItemsStatus(ids, 'sent'); summary.sentGroups = products.length; console.log(`[Campaign Queue] Sent one campaign with ${products.length} products to ${customers.length} all-time top buyers.`); } catch (error) { await updateCampaignItemsStatus(ids, 'failed', error.message); summary.failedGroups = products.length; console.error('[Campaign Queue] Failed to send product list campaign:', error); } return summary; }; module.exports = { enqueueStockCampaignItem, getCampaignPreview, getCampaignQueueSummary, retryCampaignItems, processPendingStockCampaigns };