accumulate stock deltas before campaigns
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 52s

This commit is contained in:
Cauê Faleiros
2026-05-27 16:14:09 -03:00
parent 72ded82ec7
commit 5e0bb1d83a
2 changed files with 43 additions and 15 deletions

View File

@@ -3,6 +3,7 @@ const { N8N_WHATSAPP_TRIGGER_URL } = require('../config');
const TOP_BUYERS_LIMIT = 100; const TOP_BUYERS_LIMIT = 100;
const MAX_CAMPAIGN_ATTEMPTS = 3; const MAX_CAMPAIGN_ATTEMPTS = 3;
const CAMPAIGN_DELTA_THRESHOLD = 100;
const enqueueStockCampaignItem = async (client, item) => { const enqueueStockCampaignItem = async (client, item) => {
const query = ` const query = `
@@ -38,28 +39,38 @@ const getTopBuyersAllTime = async () => {
return result.rows; return result.rows;
}; };
const claimPendingCampaignItems = async () => { const claimReadyCampaignItems = async () => {
const client = await pool.connect(); const client = await pool.connect();
try { try {
await client.query('BEGIN'); await client.query('BEGIN');
const result = await client.query(` const result = await client.query(`
WITH ready_groups AS (
SELECT base_product_name
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
GROUP BY base_product_name
HAVING SUM(delta_estoque) >= $2
),
ready_items AS (
SELECT queue.id
FROM stock_campaign_queue queue
JOIN ready_groups ON ready_groups.base_product_name = queue.base_product_name
WHERE queue.status IN ('pending', 'failed')
AND queue.attempts < $1
ORDER BY queue.created_at ASC
FOR UPDATE OF queue SKIP LOCKED
)
UPDATE stock_campaign_queue UPDATE stock_campaign_queue
SET status = 'processing', SET status = 'processing',
attempts = attempts + 1, attempts = attempts + 1,
updated_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP,
last_error = NULL last_error = NULL
WHERE id IN ( WHERE id IN (SELECT id FROM ready_items)
SELECT id
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
)
RETURNING *; RETURNING *;
`, [MAX_CAMPAIGN_ATTEMPTS]); `, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]);
await client.query('COMMIT'); await client.query('COMMIT');
return result.rows; return result.rows;
@@ -71,12 +82,28 @@ const claimPendingCampaignItems = async () => {
} }
}; };
const countPendingBelowThresholdGroups = async () => {
const result = await pool.query(`
SELECT COUNT(*)::int as count
FROM (
SELECT base_product_name
FROM stock_campaign_queue
WHERE status IN ('pending', 'failed')
AND attempts < $1
GROUP BY base_product_name
HAVING SUM(delta_estoque) < $2
) below_threshold_groups;
`, [MAX_CAMPAIGN_ATTEMPTS, CAMPAIGN_DELTA_THRESHOLD]);
return result.rows[0]?.count || 0;
};
const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => { const updateCampaignItemsStatus = async (ids, status, errorMessage = null) => {
if (!ids.length) return; if (!ids.length) return;
await pool.query(` await pool.query(`
UPDATE stock_campaign_queue UPDATE stock_campaign_queue
SET status = $1, SET status = $1::varchar,
last_error = $2, last_error = $2,
updated_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP,
sent_at = CASE WHEN $1 = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END sent_at = CASE WHEN $1 = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END
@@ -109,12 +136,13 @@ const sendWhatsappCampaign = async (baseProductName, items, customers) => {
}; };
const processPendingStockCampaigns = async () => { const processPendingStockCampaigns = async () => {
const rows = await claimPendingCampaignItems(); const rows = await claimReadyCampaignItems();
const summary = { const summary = {
claimed: rows.length, claimed: rows.length,
sentGroups: 0, sentGroups: 0,
skippedGroups: 0, skippedGroups: 0,
failedGroups: 0 failedGroups: 0,
pendingBelowThresholdGroups: await countPendingBelowThresholdGroups()
}; };
if (!rows.length) { if (!rows.length) {

View File

@@ -2,7 +2,7 @@ const { pool } = require('../db');
const { normalizeStockPayload } = require('../mappers/stockMapper'); const { normalizeStockPayload } = require('../mappers/stockMapper');
const { enqueueStockCampaignItem } = require('./campaignService'); const { enqueueStockCampaignItem } = require('./campaignService');
const CAMPAIGN_DELTA_THRESHOLD = 100; const POSITIVE_STOCK_DELTA_THRESHOLD = 1;
const listStock = async () => { const listStock = async () => {
const result = await pool.query('SELECT * FROM stock'); const result = await pool.query('SELECT * FROM stock');
@@ -36,7 +36,7 @@ const upsertStockItems = async (payload) => {
item.deltaEstoque item.deltaEstoque
]); ]);
if (item.deltaEstoque >= CAMPAIGN_DELTA_THRESHOLD) { if (item.deltaEstoque >= POSITIVE_STOCK_DELTA_THRESHOLD) {
await enqueueStockCampaignItem(client, item); await enqueueStockCampaignItem(client, item);
} }
} }