Files
graphs/backend/services/campaignService.js
2026-05-28 11:18:45 -03:00

266 lines
8.1 KiB
JavaScript

const { pool } = require('../db');
const { N8N_WHATSAPP_TRIGGER_URL } = require('../config');
const {
buildWhatsappCampaignPayload,
formatProductList,
groupCampaignRows,
groupCampaignRowsByBaseProduct,
mapCampaignProducts
} = require('./campaignFormatter');
// Row cap for the top-buyers query (bound to its LIMIT clause).
const TOP_BUYERS_LIMIT = 100;
// Queue rows are only claimed, counted or previewed while `attempts` is below this value.
const MAX_CAMPAIGN_ATTEMPTS = 3;
// A base-product group is treated as ready once its summed `delta_estoque` reaches this value.
const CAMPAIGN_DELTA_THRESHOLD = 100;
/**
 * Inserts one entry into `stock_campaign_queue` using the supplied client.
 *
 * The caller provides the client, so the insert runs on whatever
 * connection/transaction that client represents.
 *
 * @param {object} client - Object exposing an async `query(text, values)` method.
 * @param {object} item - Entry to enqueue.
 * @param {*} item.baseProductName - Written to `base_product_name`.
 * @param {*} item.produtoId - Written to `produto_id`.
 * @param {*} item.nome - Written to `nome`.
 * @param {*} item.saldo - Written to `saldo`.
 * @param {*} item.deltaEstoque - Written to `delta_estoque`.
 * @returns {Promise<void>} Resolves once the insert has completed.
 */
const enqueueStockCampaignItem = async (client, item) => {
  // Order must match the column list of the INSERT below.
  const values = [
    item.baseProductName,
    item.produtoId,
    item.nome,
    item.saldo,
    item.deltaEstoque
  ];
  const insertSql = `
INSERT INTO stock_campaign_queue (
base_product_name, produto_id, nome, saldo, delta_estoque
) VALUES ($1, $2, $3, $4, $5)
`;
  await client.query(insertSql, values);
};
/**
 * Loads the highest-spending customers from the `orders` table.
 *
 * Orders are grouped by `cliente_fone` (rows with a NULL or empty phone are
 * ignored), sorted by total spend descending and capped at TOP_BUYERS_LIMIT.
 *
 * @returns {Promise<object[]>} Rows with `nome`, `fone`, `total_gasto` and
 *   `total_comprado` columns.
 */
const getTopBuyersAllTime = async () => {
  const topBuyersSql = `
SELECT
MAX(cliente_nome) as nome,
cliente_fone as fone,
SUM(quantidade * valor_unitario) as total_gasto,
SUM(quantidade) as total_comprado
FROM orders
WHERE cliente_fone IS NOT NULL
AND cliente_fone != ''
GROUP BY cliente_fone
ORDER BY total_gasto DESC
LIMIT $1;
`;
  const { rows } = await pool.query(topBuyersSql, [TOP_BUYERS_LIMIT]);
  return rows;
};
/**
 * Claims every queue row that belongs to a ready base-product group.
 *
 * Runs inside a single transaction on a dedicated pooled client:
 *   1. `ready_groups` selects base products whose claimable rows (status
 *      'pending' or 'failed', attempts below MAX_CAMPAIGN_ATTEMPTS) have a
 *      summed `delta_estoque` of at least CAMPAIGN_DELTA_THRESHOLD.
 *   2. `ready_items` locks the claimable rows of those groups, skipping rows
 *      that are already locked (`FOR UPDATE ... SKIP LOCKED`).
 *   3. The locked rows are switched to 'processing', their attempt counter is
 *      incremented and `last_error` is cleared.
 *
 * @returns {Promise<object[]>} The updated queue rows (`RETURNING *`).
 * @throws Rethrows the error that aborted the transaction. A failure of the
 *   ROLLBACK itself is logged and never replaces that original error.
 */
const claimReadyCampaignItems = async () => {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const result = await client.query(`
WITH ready_groups AS (
SELECT base_product_name
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
GROUP BY base_product_name
HAVING SUM(delta_estoque) >= $2
),
ready_items AS (
SELECT queue.id
FROM stock_campaign_queue queue
JOIN ready_groups ON ready_groups.base_product_name = queue.base_product_name
WHERE queue.status IN ('pending', 'failed')
AND queue.attempts < $1
ORDER BY queue.created_at ASC
FOR UPDATE OF queue SKIP LOCKED
)
UPDATE stock_campaign_queue
SET status = 'processing',
attempts = attempts + 1,
updated_at = CURRENT_TIMESTAMP,
last_error = NULL
WHERE id IN (SELECT id FROM ready_items)
RETURNING *;
`, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]);
    await client.query('COMMIT');
    return result.rows;
  } catch (error) {
    // ROLLBACK can fail too (e.g. on a broken connection). Guard it so the
    // caller always receives the error that actually caused the abort.
    try {
      await client.query('ROLLBACK');
    } catch (rollbackError) {
      console.error('[Campaign Queue] Failed to roll back campaign claim transaction:', rollbackError);
    }
    throw error;
  } finally {
    client.release();
  }
};
/**
 * Counts base-product groups that are still waiting for more stock movement.
 *
 * A group is counted when its claimable rows (status 'pending' or 'failed',
 * attempts below MAX_CAMPAIGN_ATTEMPTS) sum to a `delta_estoque` that is
 * below CAMPAIGN_DELTA_THRESHOLD.
 *
 * @returns {Promise<number>} Number of such groups; 0 when the query yields
 *   no row or a falsy count.
 */
const countPendingBelowThresholdGroups = async () => {
  const countSql = `
SELECT COUNT(*)::int as count
FROM (
SELECT base_product_name
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
GROUP BY base_product_name
HAVING SUM(delta_estoque) < $2
) below_threshold_groups;
`;
  const { rows } = await pool.query(countSql, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]);
  const firstRow = rows[0];
  return firstRow?.count || 0;
};
/**
 * Reads the newest rows of `stock_campaign_queue`, regardless of status.
 *
 * @returns {Promise<object[]>} At most 500 rows, newest first
 *   (`created_at DESC`, ties broken by `id DESC`).
 */
const getCampaignQueueRows = async () => {
  const recentRowsSql = `
SELECT *
FROM stock_campaign_queue
ORDER BY created_at DESC, id DESC
LIMIT 500;
`;
  const { rows } = await pool.query(recentRowsSql);
  return rows;
};
/**
 * Builds an overview of the campaign queue.
 *
 * @returns {Promise<{threshold: number, maxAttempts: number, groups: *, rows: object[]}>}
 *   The configured threshold and attempt limit, the rows returned by
 *   `getCampaignQueueRows`, and whatever `groupCampaignRows` produces for
 *   those rows.
 */
const getCampaignQueueSummary = async () => {
  const queueRows = await getCampaignQueueRows();
  const groupedRows = groupCampaignRows(queueRows);
  return {
    threshold: CAMPAIGN_DELTA_THRESHOLD,
    maxAttempts: MAX_CAMPAIGN_ATTEMPTS,
    groups: groupedRows,
    rows: queueRows
  };
};
/**
 * Previews what the next campaign run would contain, without claiming or
 * modifying any queue row.
 *
 * Claimable rows (status 'pending' or 'failed', attempts below
 * MAX_CAMPAIGN_ATTEMPTS) are grouped by base product and split into groups
 * whose summed `delta_estoque` reaches CAMPAIGN_DELTA_THRESHOLD and groups
 * that are still below it.
 *
 * @returns {Promise<object>} An object with:
 *   - `threshold`: the configured delta threshold;
 *   - `readyProducts` / `belowThresholdProducts`: results of
 *     `mapCampaignProducts` with the `itemIds` property removed;
 *   - `productsText`: `formatProductList` output for the ready products'
 *     `baseProduct` values, or '' when nothing is ready;
 *   - `customerCount`: number of top buyers;
 *   - `customersPreview`: the first 10 top buyers.
 */
const getCampaignPreview = async () => {
  const { rows: claimableRows } = await pool.query(`
SELECT *
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
ORDER BY created_at ASC, id ASC;
`, [MAX_CAMPAIGN_ATTEMPTS]);

  // Missing or NULL deltas count as 0.
  const sumDelta = (items) =>
    items.reduce((running, entry) => running + Number(entry.delta_estoque || 0), 0);
  // Internal row ids are not part of the preview payload.
  const withoutItemIds = ({ itemIds, ...product }) => product;

  const readyGroups = {};
  const belowThresholdGroups = {};
  const groupedEntries = Object.entries(groupCampaignRowsByBaseProduct(claimableRows));
  for (const [baseProductName, items] of groupedEntries) {
    const isReady = sumDelta(items) >= CAMPAIGN_DELTA_THRESHOLD;
    const bucket = isReady ? readyGroups : belowThresholdGroups;
    bucket[baseProductName] = items;
  }

  const readyProducts = mapCampaignProducts(readyGroups).map(withoutItemIds);
  const belowThresholdProducts = mapCampaignProducts(belowThresholdGroups).map(withoutItemIds);
  const customers = await getTopBuyersAllTime();

  let productsText = '';
  if (readyProducts.length) {
    productsText = formatProductList(readyProducts.map(({ baseProduct }) => baseProduct));
  }

  return {
    threshold: CAMPAIGN_DELTA_THRESHOLD,
    readyProducts,
    belowThresholdProducts,
    productsText,
    customerCount: customers.length,
    customersPreview: customers.slice(0, 10)
  };
};
/**
 * Puts failed or skipped queue rows back into the 'pending' state.
 *
 * Matching rows get `attempts` reset to 0 and `last_error` / `sent_at`
 * cleared. Both filters are optional and combined with AND; when neither is
 * supplied, every row in status 'failed' or 'skipped' is reset.
 *
 * @param {object} [options]
 * @param {Array<number|string>} [options.ids] - Row ids to retry; each entry
 *   is converted with `Number`. Ignored unless it is a non-empty array.
 * @param {string} [options.baseProductName] - Restricts the retry to one base
 *   product. Ignored when falsy.
 * @returns {Promise<{retried: number, rows: object[]}>} Number of rows reset
 *   and the updated rows.
 */
const retryCampaignItems = async ({ ids, baseProductName } = {}) => {
  const values = [];
  const conditions = [`status IN ('failed', 'skipped')`];
  // Registers a bound value and returns its positional placeholder ($1, $2, ...);
  // Array.prototype.push returns the new length, which is that position.
  const bind = (value) => `$${values.push(value)}`;

  const hasIds = Array.isArray(ids) && ids.length;
  if (hasIds) {
    conditions.push(`id = ANY(${bind(ids.map(Number))}::int[])`);
  }
  if (baseProductName) {
    conditions.push(`base_product_name = ${bind(baseProductName)}`);
  }

  const whereClause = conditions.join(' AND ');
  const { rowCount, rows } = await pool.query(`
UPDATE stock_campaign_queue
SET status = 'pending',
attempts = 0,
last_error = NULL,
sent_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE ${whereClause}
RETURNING *;
`, values);
  return {
    retried: rowCount,
    rows
  };
};
/**
 * Writes a new status (and optional error text) to a set of queue rows.
 *
 * `sent_at` is stamped with the current time only when the new status is
 * 'sent'; for any other status it keeps its previous value. Nothing is
 * executed when `ids` is empty.
 *
 * @param {number[]} ids - Ids of the `stock_campaign_queue` rows to update.
 * @param {string} status - New value for the `status` column.
 * @param {?string} [errorMessage=null] - New value for `last_error`.
 * @returns {Promise<void>}
 */
const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => {
  if (ids.length) {
    const values = [status, errorMessage, ids];
    await pool.query(`
UPDATE stock_campaign_queue
SET status = $1::varchar,
last_error = $2,
updated_at = CURRENT_TIMESTAMP,
sent_at = CASE WHEN $1 = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END
WHERE id = ANY($3::int[]);
`, values);
  }
};
/**
 * Posts the campaign to the webhook configured in N8N_WHATSAPP_TRIGGER_URL.
 *
 * The JSON body is whatever `buildWhatsappCampaignPayload` produces for the
 * given products and customers.
 *
 * @param {object[]} products - Campaign products passed to the payload builder.
 * @param {object[]} customers - Recipients passed to the payload builder.
 * @returns {Promise<void>} Resolves when the webhook answers with an OK status.
 * @throws {Error} When the webhook responds with a non-OK HTTP status; errors
 *   raised by `fetch` itself propagate unchanged.
 */
const sendWhatsappCampaign = async (products, customers) => {
  const payload = buildWhatsappCampaignPayload(products, customers);
  const requestOptions = {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload)
  };
  const response = await fetch(N8N_WHATSAPP_TRIGGER_URL, requestOptions);
  if (response.ok) {
    return;
  }
  throw new Error(`WhatsApp webhook returned status ${response.status}`);
};
/**
 * Claims every ready queue row and sends them as one WhatsApp campaign to the
 * all-time top buyers.
 *
 * Outcomes for the claimed rows:
 *   - no top buyers found -> rows are marked 'skipped';
 *   - the webhook call fails -> rows are marked 'failed' with the error text;
 *   - the webhook call succeeds -> rows are marked 'sent'.
 *
 * The claim has already moved the rows to 'processing', and no query in this
 * module picks 'processing' rows up again. Two failure paths are therefore
 * handled explicitly:
 *   - if a lookup made after the claim throws, the rows are marked 'failed'
 *     (best effort) before the error is rethrown, so they are not stranded;
 *   - the 'sent' bookkeeping runs outside the send try/catch, so a database
 *     error after a successful delivery is never recorded as a failed send
 *     (which would make the campaign eligible to be sent a second time).
 *
 * @returns {Promise<{claimed: number, sentGroups: number, skippedGroups: number,
 *   failedGroups: number, pendingBelowThresholdGroups: number}>} Counters
 *   describing what this run did.
 * @throws Rethrows errors from claiming, from the post-claim lookups, and
 *   from the status updates.
 */
const processPendingStockCampaigns = async () => {
  const rows = await claimReadyCampaignItems();
  const summary = {
    claimed: rows.length,
    sentGroups: 0,
    skippedGroups: 0,
    failedGroups: 0,
    pendingBelowThresholdGroups: 0
  };
  if (!rows.length) {
    summary.pendingBelowThresholdGroups = await countPendingBelowThresholdGroups();
    return summary;
  }

  const groups = groupCampaignRowsByBaseProduct(rows);
  const products = mapCampaignProducts(groups);
  const ids = products.flatMap(product => product.itemIds);

  let customers;
  try {
    summary.pendingBelowThresholdGroups = await countPendingBelowThresholdGroups();
    customers = await getTopBuyersAllTime();
  } catch (error) {
    // Hand the claimed rows back as 'failed' so a later run can retry them.
    // This is best effort: if it fails too, keep reporting the original error.
    try {
      await updateCampaignItemsStatus(ids, 'failed', error.message);
    } catch (releaseError) {
      console.error('[Campaign Queue] Failed to release claimed items after a lookup error:', releaseError);
    }
    throw error;
  }

  if (!customers.length) {
    await updateCampaignItemsStatus(ids, 'skipped', 'No customers with valid phone numbers found.');
    summary.skippedGroups = products.length;
    return summary;
  }

  try {
    await sendWhatsappCampaign(products, customers);
  } catch (error) {
    await updateCampaignItemsStatus(ids, 'failed', error.message);
    summary.failedGroups = products.length;
    console.error('[Campaign Queue] Failed to send product list campaign:', error);
    return summary;
  }

  // Deliberately outside the try above: the campaign has been delivered, so a
  // bookkeeping error here must propagate instead of flagging the rows 'failed'.
  await updateCampaignItemsStatus(ids, 'sent');
  summary.sentGroups = products.length;
  console.log(`[Campaign Queue] Sent one campaign with ${products.length} products to ${customers.length} all-time top buyers.`);
  return summary;
};
// Public API of this module. Functions defined above that are not listed here
// (queries, status updates, webhook call) are only used from within this file.
module.exports = {
enqueueStockCampaignItem,
getCampaignPreview,
getCampaignQueueSummary,
retryCampaignItems,
processPendingStockCampaigns
};