refactor backend and persist stock campaign queue
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m32s

This commit is contained in:
Cauê Faleiros
2026-05-27 15:00:23 -03:00
parent 6ba8219596
commit 8c2590c56a
25 changed files with 658 additions and 363 deletions

View File

@@ -0,0 +1,158 @@
const { pool } = require('../db');
const { N8N_WHATSAPP_TRIGGER_URL } = require('../config');
const TOP_BUYERS_LIMIT = 100;
const MAX_CAMPAIGN_ATTEMPTS = 3;
const enqueueStockCampaignItem = async (client, item) => {
const query = `
INSERT INTO stock_campaign_queue (
base_product_name, produto_id, nome, saldo, delta_estoque
) VALUES ($1, $2, $3, $4, $5)
`;
await client.query(query, [
item.baseProductName,
item.produtoId,
item.nome,
item.saldo,
item.deltaEstoque
]);
};
const getTopBuyersAllTime = async () => {
const result = await pool.query(`
SELECT
MAX(cliente_nome) as nome,
cliente_fone as fone,
SUM(quantidade * valor_unitario) as total_gasto,
SUM(quantidade) as total_comprado
FROM orders
WHERE cliente_fone IS NOT NULL
AND cliente_fone != ''
GROUP BY cliente_fone
ORDER BY total_gasto DESC
LIMIT $1;
`, [TOP_BUYERS_LIMIT]);
return result.rows;
};
const claimPendingCampaignItems = async () => {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await client.query(`
UPDATE stock_campaign_queue
SET status = 'processing',
attempts = attempts + 1,
updated_at = CURRENT_TIMESTAMP,
last_error = NULL
WHERE id IN (
SELECT id
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
)
RETURNING *;
`, [MAX_CAMPAIGN_ATTEMPTS]);
await client.query('COMMIT');
return result.rows;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
};
const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => {
if (!ids.length) return;
await pool.query(`
UPDATE stock_campaign_queue
SET status = $1,
last_error = $2,
updated_at = CURRENT_TIMESTAMP,
sent_at = CASE WHEN $1 = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END
WHERE id = ANY($3::int[]);
`, [status, errorMessage, ids]);
};
const sendWhatsappCampaign = async (baseProductName, items, customers) => {
const totalDelta = items.reduce((sum, item) => sum + Number(item.delta_estoque || 0), 0);
const response = await fetch(N8N_WHATSAPP_TRIGGER_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
baseProduct: baseProductName,
total_delta: totalDelta,
sizes: items.map(item => ({
id: item.produto_id,
nome: item.nome,
delta: item.delta_estoque,
saldo: item.saldo
})),
customers
})
});
if (!response.ok) {
throw new Error(`WhatsApp webhook returned status ${response.status}`);
}
};
const processPendingStockCampaigns = async () => {
const rows = await claimPendingCampaignItems();
const summary = {
claimed: rows.length,
sentGroups: 0,
skippedGroups: 0,
failedGroups: 0
};
if (!rows.length) {
return summary;
}
const customers = await getTopBuyersAllTime();
const groups = rows.reduce((acc, row) => {
if (!acc[row.base_product_name]) acc[row.base_product_name] = [];
acc[row.base_product_name].push(row);
return acc;
}, {});
for (const [baseProductName, items] of Object.entries(groups)) {
const ids = items.map(item => item.id);
if (!customers.length) {
await updateCampaignItemsStatus(ids, 'skipped', 'No customers with valid phone numbers found.');
summary.skippedGroups += 1;
continue;
}
try {
await sendWhatsappCampaign(baseProductName, items, customers);
await updateCampaignItemsStatus(ids, 'sent');
summary.sentGroups += 1;
console.log(`[Campaign Queue] Sent ${baseProductName} campaign to ${customers.length} all-time top buyers.`);
} catch (error) {
await updateCampaignItemsStatus(ids, 'failed', error.message);
summary.failedGroups += 1;
console.error(`[Campaign Queue] Failed to send ${baseProductName} campaign:`, error);
}
}
return summary;
};
module.exports = {
enqueueStockCampaignItem,
processPendingStockCampaigns
};

View File

@@ -0,0 +1,47 @@
const { pool } = require('../db');
const { formatOrderRow, normalizeOrderPayload } = require('../mappers/orderMapper');
const listOrders = async () => {
const result = await pool.query('SELECT * FROM orders ORDER BY id DESC');
return result.rows.map(formatOrderRow);
};
const upsertOrders = async (payload) => {
const client = await pool.connect();
try {
await client.query('BEGIN');
const insertQuery = `
INSERT INTO orders (
cliente_nome, data_pedido, valor_pedido,
produto_id, produto_descricao, quantidade, valor_unitario, pedido_id, cliente_fone
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (pedido_id, produto_id) DO UPDATE SET
cliente_nome = EXCLUDED.cliente_nome,
data_pedido = EXCLUDED.data_pedido,
valor_pedido = EXCLUDED.valor_pedido,
produto_descricao = EXCLUDED.produto_descricao,
quantidade = EXCLUDED.quantidade,
valor_unitario = EXCLUDED.valor_unitario,
cliente_fone = EXCLUDED.cliente_fone,
created_at = CURRENT_TIMESTAMP
`;
for (const item of payload) {
await client.query(insertQuery, normalizeOrderPayload(item));
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
};
module.exports = {
listOrders,
upsertOrders
};

View File

@@ -0,0 +1,56 @@
const { pool } = require('../db');
const { normalizeStockPayload } = require('../mappers/stockMapper');
const { enqueueStockCampaignItem } = require('./campaignService');
const CAMPAIGN_DELTA_THRESHOLD = 100;
const listStock = async () => {
const result = await pool.query('SELECT * FROM stock');
return result.rows;
};
const upsertStockItems = async (payload) => {
const client = await pool.connect();
try {
await client.query('BEGIN');
const insertQuery = `
INSERT INTO stock (produto_id, nome, saldo, delta_estoque)
VALUES ($1, $2, $3, $4)
ON CONFLICT (produto_id) DO UPDATE SET
nome = EXCLUDED.nome,
saldo = EXCLUDED.saldo,
delta_estoque = EXCLUDED.delta_estoque,
updated_at = CURRENT_TIMESTAMP
`;
for (const rawItem of payload) {
const item = normalizeStockPayload(rawItem);
if (!item.produtoId) continue;
await client.query(insertQuery, [
item.produtoId,
item.nome,
item.saldo,
item.deltaEstoque
]);
if (item.deltaEstoque >= CAMPAIGN_DELTA_THRESHOLD) {
await enqueueStockCampaignItem(client, item);
}
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
};
module.exports = {
listStock,
upsertStockItems
};