From c1f47828f64b9cb3a1014bccd3e63f10bc36fa4b Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Tue, 22 Oct 2024 16:54:25 +0200 Subject: [PATCH] chore(webhook-service): refactor multiregion --- packages/webhook-service/src/main.js | 193 ++++++++++++++------------- 1 file changed, 102 insertions(+), 91 deletions(-) diff --git a/packages/webhook-service/src/main.js b/packages/webhook-service/src/main.js index 32a6a7890..ca2e1b9b2 100644 --- a/packages/webhook-service/src/main.js +++ b/packages/webhook-service/src/main.js @@ -12,10 +12,12 @@ const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query' const { makeNetworkRequest } = require('./webhookCaller') const WebhookError = require('./errors') -async function startTask() { - const { rows } = await knex.raw(` +const startTaskFactory = + ({ db }) => + async () => { + const { rows } = await db.raw(` UPDATE webhooks_events - SET + SET "status" = 1, "lastUpdate" = NOW() FROM ( @@ -27,15 +29,17 @@ async function startTask() { WHERE webhooks_events."id" = task."id" RETURNING webhooks_events."id" `) - return rows[0] -} + return rows[0] + } -async function doTask(task) { - let boundLogger = logger.child({ taskId: task.id }) - try { - const { rows } = await knex.raw( - ` - SELECT +const doTaskFactory = + ({ db }) => + async (task) => { + let boundLogger = logger.child({ taskId: task.id }) + try { + const { rows } = await db.raw( + ` + SELECT ev.payload as evt, cnf.id as wh_id, cnf.url as wh_url, cnf.secret as wh_secret, cnf.enabled as wh_enabled FROM webhooks_events ev @@ -43,49 +47,49 @@ async function doTask(task) { WHERE ev.id = ? LIMIT 1 `, - [task.id] - ) - const info = rows[0] - if (!info) { - throw new Error('Internal error: DB inconsistent') - } - boundLogger = boundLogger.child({ webhookId: info.wh_id }) - - const fullPayload = JSON.parse(info.evt) - boundLogger = boundLogger.child({ - streamId: fullPayload.streamId, - eventName: fullPayload.event.event_name - }) - - const postData = { payload: fullPayload } - - const signature = crypto - .createHmac('sha256', info.wh_secret || '') - .update(JSON.stringify(postData)) - .digest('hex') - const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature } - - boundLogger.info('Calling webhook.') - const result = await makeNetworkRequest({ - url: info.wh_url, - data: postData, - headersData: postHeaders, - logger: boundLogger - }) - - boundLogger.info({ result }, `Received response from webhook.`) - - if (!result.success) { - throw new WebhookError( - result.error, - 'Calling webhook was unsuccessful.', - result.responseCode, - result.responseBody + [task.id] ) - } + const info = rows[0] + if (!info) { + throw new Error('Internal error: DB inconsistent') + } + boundLogger = boundLogger.child({ webhookId: info.wh_id }) - await knex.raw( - ` + const fullPayload = JSON.parse(info.evt) + boundLogger = boundLogger.child({ + streamId: fullPayload.streamId, + eventName: fullPayload.event.event_name + }) + + const postData = { payload: fullPayload } + + const signature = crypto + .createHmac('sha256', info.wh_secret || '') + .update(JSON.stringify(postData)) + .digest('hex') + const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature } + + boundLogger.info('Calling webhook.') + const result = await makeNetworkRequest({ + url: info.wh_url, + data: postData, + headersData: postHeaders, + logger: boundLogger + }) + + boundLogger.info({ result }, `Received response from webhook.`) + + if (!result.success) { + throw new WebhookError( + result.error, + 'Calling webhook was unsuccessful.', + result.responseCode, + result.responseBody + ) + } + + await db.raw( + ` UPDATE webhooks_events SET "status" = 2, @@ -93,18 +97,18 @@ async function doTask(task) { "statusInfo" = 'Webhook called' WHERE "id" = ? `, - [task.id] - ) - } catch (err) { - switch (err.constructor) { - case WebhookError: - boundLogger.warn({ err }, 'Failed to trigger webhook event.') - break - default: - boundLogger.error(err, 'Failed to trigger webhook event.') - } - await knex.raw( - ` + [task.id] + ) + } catch (err) { + switch (err.constructor) { + case WebhookError: + boundLogger.warn({ err }, 'Failed to trigger webhook event.') + break + default: + boundLogger.error(err, 'Failed to trigger webhook event.') + } + await db.raw( + ` UPDATE webhooks_events SET "status" = 3, @@ -112,41 +116,43 @@ async function doTask(task) { "statusInfo" = ? WHERE "id" = ? `, - [err.toString(), task.id] - ) - metrics.metricOperationErrors.labels('webhook').inc() - } -} - -async function tick() { - if (shouldExit) { - process.exit(0) + [err.toString(), task.id] + ) + metrics.metricOperationErrors.labels('webhook').inc() + } } - try { - const task = await startTask() - - fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {}) - - if (!task) { - setTimeout(tick, 1000) - return +const tickFactory = + ({ doTask, startTask, tick }) => + async () => { + if (shouldExit) { + process.exit(0) } - const metricDurationEnd = metrics.metricDuration.startTimer() + try { + const task = await startTask() - await doTask(task) + fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {}) - metricDurationEnd({ op: 'webhook' }) + if (!task) { + setTimeout(tick, 1000) + return + } - // Check for another task very soon - setTimeout(tick, 10) - } catch (err) { - metrics.metricOperationErrors.labels('main_loop').inc() - logger.error(err, 'Error executing task') - setTimeout(tick, 5000) + const metricDurationEnd = metrics.metricDuration.startTimer() + + await doTask(task) + + metricDurationEnd({ op: 'webhook' }) + + // Check for another task very soon + setTimeout(tick, 10) + } catch (err) { + metrics.metricOperationErrors.labels('main_loop').inc() + logger.error(err, 'Error executing task') + setTimeout(tick, 5000) + } } -} async function main() { logger.info('Starting Webhook Service...') @@ -157,6 +163,11 @@ async function main() { }) metrics.initPrometheusMetrics() + const tick = tickFactory({ + doTask: doTaskFactory({ db: knex }), + startTask: startTaskFactory({ db: knex }), + tick: (...args) => tick(...args) + }) tick() }