add campaign observability page
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s

This commit is contained in:
Cauê Faleiros
2026-05-28 11:07:46 -03:00
parent 6c0a78675c
commit e2d0e94080
8 changed files with 572 additions and 2 deletions

View File

@@ -98,6 +98,128 @@ const countPendingBelowThresholdGroups = async () => {
return result.rows[0]?.count || 0;
};
const getCampaignQueueRows = async () => {
const result = await pool.query(`
SELECT *
FROM stock_campaign_queue
ORDER BY created_at DESC, id DESC
LIMIT 500;
`);
return result.rows;
};
const groupCampaignRows = (rows) => {
return Object.values(rows.reduce((acc, row) => {
const key = `${row.base_product_name}:${row.status}`;
if (!acc[key]) {
acc[key] = {
key,
baseProductName: row.base_product_name,
status: row.status,
totalDelta: 0,
rowCount: 0,
attempts: 0,
lastError: null,
createdAt: row.created_at,
updatedAt: row.updated_at,
sentAt: row.sent_at,
items: []
};
}
acc[key].totalDelta += Number(row.delta_estoque || 0);
acc[key].rowCount += 1;
acc[key].attempts = Math.max(acc[key].attempts, Number(row.attempts || 0));
acc[key].lastError = row.last_error || acc[key].lastError;
acc[key].createdAt = new Date(row.created_at) < new Date(acc[key].createdAt) ? row.created_at : acc[key].createdAt;
acc[key].updatedAt = new Date(row.updated_at) > new Date(acc[key].updatedAt) ? row.updated_at : acc[key].updatedAt;
acc[key].sentAt = row.sent_at || acc[key].sentAt;
acc[key].items.push(row);
return acc;
}, {})).sort((a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime());
};
const getCampaignQueueSummary = async () => {
const rows = await getCampaignQueueRows();
return {
threshold: CAMPAIGN_DELTA_THRESHOLD,
maxAttempts: MAX_CAMPAIGN_ATTEMPTS,
groups: groupCampaignRows(rows),
rows
};
};
const getCampaignPreview = async () => {
const result = await pool.query(`
SELECT *
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
ORDER BY created_at ASC, id ASC;
`, [MAX_CAMPAIGN_ATTEMPTS]);
const groups = result.rows.reduce((acc, row) => {
if (!acc[row.base_product_name]) acc[row.base_product_name] = [];
acc[row.base_product_name].push(row);
return acc;
}, {});
const readyGroups = {};
const belowThresholdGroups = {};
Object.entries(groups).forEach(([baseProductName, items]) => {
const totalDelta = items.reduce((sum, item) => sum + Number(item.delta_estoque || 0), 0);
if (totalDelta >= CAMPAIGN_DELTA_THRESHOLD) {
readyGroups[baseProductName] = items;
} else {
belowThresholdGroups[baseProductName] = items;
}
});
const readyProducts = mapCampaignProducts(readyGroups).map(({ itemIds, ...product }) => product);
const belowThresholdProducts = mapCampaignProducts(belowThresholdGroups).map(({ itemIds, ...product }) => product);
const customers = await getTopBuyersAllTime();
return {
threshold: CAMPAIGN_DELTA_THRESHOLD,
readyProducts,
belowThresholdProducts,
productsText: readyProducts.length ? formatProductList(readyProducts.map(product => product.baseProduct)) : '',
customerCount: customers.length,
customersPreview: customers.slice(0, 10)
};
};
const retryCampaignItems = async ({ ids, baseProductName } = {}) => {
const params = [];
const filters = [`status IN ('failed', 'skipped')`];
if (Array.isArray(ids) && ids.length) {
params.push(ids.map(Number));
filters.push(`id = ANY($${params.length}::int[])`);
}
if (baseProductName) {
params.push(baseProductName);
filters.push(`base_product_name = $${params.length}`);
}
const result = await pool.query(`
UPDATE stock_campaign_queue
SET status = 'pending',
attempts = 0,
last_error = NULL,
sent_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE ${filters.join(' AND ')}
RETURNING *;
`, params);
return {
retried: result.rowCount,
rows: result.rows
};
};
const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => {
if (!ids.length) return;
@@ -210,5 +332,8 @@ const processPendingStockCampaigns = async () => {
module.exports = {
enqueueStockCampaignItem,
getCampaignPreview,
getCampaignQueueSummary,
retryCampaignItems,
processPendingStockCampaigns
};