Use job queue to update running experiment results every 6 hours. (#24)

* Use job queue to update running experiment results every 6 hours.

* Refactor the update job

* Connect to Mongo and start the queue automatically on back-end startup.
Limit queue prefetch to 5 per job type for better horizontal scalability.
This commit is contained in:
Jeremy Dorn
2021-07-04 20:12:59 -05:00
committed by GitHub
parent 6c1e658fe9
commit 7d78b4d807
8 changed files with 347 additions and 301 deletions

View File

@@ -9,8 +9,7 @@
"test": "wsrun -m test",
"dev": "wsrun -p '*-end' -m dev",
"build": "wsrun -p '*-end' -m build",
"start": "wsrun -p '*-end' -m start",
"cron": "yarn workspace back-end cron"
"start": "wsrun -p '*-end' -m start"
},
"workspaces": [
"packages/*"

View File

@@ -10,11 +10,10 @@
"dev": "node-dev src/server.ts",
"build": "yarn build:clean && yarn build:typescript && yarn build:python && yarn build:emails",
"start": "node dist/server.js",
"test:node": "jest --forceExit --coverage --verbose --detectOpenHandles",
"test:node": "NO_INIT=1 jest --forceExit --coverage --verbose --detectOpenHandles",
"test:python": "cd src/python && python3 -m unittest discover",
"test": "yarn test:python && yarn test:node",
"type-check": "tsc --pretty --noEmit",
"cron": "node dist/cron.js",
"generate-dummy-data": "node --stack-size=8192 ./test/data-generator/data-generator.js",
"import-dummy-data": "node --stack-size=8192 ./test/data-generator/import.js"
},

View File

@@ -61,28 +61,34 @@ wrapController(slackController);
const app = express();
export async function init() {
await mongoInit();
await queueInit();
let initPromise: Promise<void>;
async function init() {
if (!initPromise) {
initPromise = (async () => {
await mongoInit();
await queueInit();
})();
}
await initPromise;
}
let initPromise: Promise<void>;
const initMiddleware = async (
req: Request,
res: Response,
next: NextFunction
) => {
if (!initPromise) {
initPromise = init();
}
try {
await initPromise;
await init();
next();
} catch (e) {
next(e);
}
};
if (!process.env.NO_INIT) {
init();
}
app.set("port", process.env.PORT || 3100);
app.use(cookieParser());

View File

@@ -55,7 +55,7 @@ import { DimensionInterface } from "../../types/dimension";
import { addGroupsDiff } from "../services/group";
import { IdeaModel } from "../models/IdeasModel";
import { IdeaInterface } from "../../types/idea";
import { queueWebhook } from "../init/queue";
import { queueWebhook } from "../jobs/webhooks";
export async function getExperiments(req: AuthRequest, res: Response) {
const experiments = await getExperimentsByOrganization(req.organization.id);

View File

@@ -1,194 +0,0 @@
import { init } from "./app";
import { ExperimentModel } from "./models/ExperimentModel";
import {
createSnapshot,
getExperimentWatchers,
getLatestSnapshot,
getMetricById,
} from "./services/experiments";
import { getDataSourceById } from "./services/datasource";
import pino from "pino";
import { isEmailEnabled, sendExperimentChangesEmail } from "./services/email";
import { getConfidenceLevelsForOrg } from "./services/organizations";
// One-shot cron entry point: refreshes results for up to MAX_UPDATES running
// experiments whose last snapshot attempt is older than UPDATE_FREQUENCY
// minutes, then emails watchers about newly-significant changes.
const MAX_UPDATES = 10;
// Minimum minutes between snapshot attempts for an experiment (6 hours)
const UPDATE_FREQUENCY = 360;
const parentLogger = pino();
const logger = parentLogger.child({
  cron: true,
});
logger.info("Cron started");
// Time out after 30 minutes
const timer = setTimeout(() => {
  logger.warn("Cron Timeout");
  process.exit(1);
}, 30 * 60 * 1000);
(async () => {
  await init();
  // Only consider experiments not attempted within the last UPDATE_FREQUENCY minutes
  const latestDate = new Date();
  latestDate.setMinutes(latestDate.getMinutes() - UPDATE_FREQUENCY);
  const experiments = await ExperimentModel.find({
    datasource: {
      $exists: true,
      $ne: "",
    },
    status: "running",
    autoSnapshots: true,
    lastSnapshotAttempt: {
      $lte: latestDate,
    },
  })
    .limit(MAX_UPDATES)
    .sort({
      lastSnapshotAttempt: 1,
    });
  // Update all selected experiments concurrently; each has its own try/catch
  const promises = experiments.map(async (experiment) => {
    try {
      logger.info({ experiment: experiment.id }, "Updating experiment - Start");
      const datasource = await getDataSourceById(experiment.datasource);
      const lastSnapshot = await getLatestSnapshot(
        experiment.id,
        experiment.phases.length - 1
      );
      const currentSnapshot = await createSnapshot(
        experiment,
        experiment.phases.length - 1,
        datasource
      );
      logger.info(
        { experiment: experiment.id },
        "Updating experiment - Success"
      );
      // get the org confidence level settings:
      const { ciUpper, ciLower } = await getConfidenceLevelsForOrg(
        experiment.organization
      );
      // check this and the previous snapshot to see if anything changed:
      // assumptions:
      // - that result[0] in the current snapshot is what we care about
      // - that result[0] in the last snapshot is the same (could add a check for this)
      const experimentChanges: string[] = [];
      for (let i = 1; i < currentSnapshot.results[0].variations.length; i++) {
        const curVar = currentSnapshot.results[0].variations[i];
        const lastVar = lastSnapshot.results[0].variations[i];
        for (const m in curVar.metrics) {
          // sanity checks: require prior data and a minimum metric value
          if (
            lastVar.metrics[m] &&
            lastVar.metrics[m].chanceToWin &&
            curVar.metrics[m].value > 150
          ) {
            // checks to see if anything changed:
            if (
              curVar.metrics[m].chanceToWin > ciUpper &&
              lastVar.metrics[m].chanceToWin < ciUpper
            ) {
              // this test variation has gone significant, and won
              // NOTE(review): getMetricById(m) is not awaited here — if it is
              // async this concatenates a Promise into the message; confirm
              experimentChanges.push(
                "The metric " +
                  getMetricById(m) +
                  " for variation " +
                  experiment.variations[i].name +
                  " has reached a " +
                  (curVar.metrics[m].chanceToWin * 100).toFixed(1) +
                  "% chance to beat baseline"
              );
            } else if (
              /* else if(curVar.metrics[m].chanceToWin < 0.85 && lastVar.metrics[m].chanceToWin > 0.95) {
              // this test variation was significant, but is now not.
              experimentChanges.push(
                "The metric "+getMetricById(m)+" is no longer a significant improvement for variation "+experiment.variations[i].name+" ("+lastVar.metrics[m].chanceToWin.toFixed(3)+" to "+ curVar.metrics[m].chanceToWin.toFixed(3)+")"
              );
            } */
              curVar.metrics[m].chanceToWin < ciLower &&
              lastVar.metrics[m].chanceToWin > ciLower
            ) {
              // this test variation has gone significant, and lost
              experimentChanges.push(
                "The metric " +
                  getMetricById(m) +
                  " for variation " +
                  experiment.variations[i].name +
                  " has dropped to a " +
                  (curVar.metrics[m].chanceToWin * 100).toFixed(1) +
                  " chance to beat the baseline"
              );
            }
            /*
            else if(curVar.metrics[m].chanceToWin > 0.15 && lastVar.metrics[m].chanceToWin < 0.05) {
              // this test was significant, and lost, but now hasn't.
              experimentChanges.push(
                "The metric "+getMetricById(m)+" is no longer significant for variation "+experiment.variations[i].name+" ("+lastVar.metrics[m].chanceToWin.toFixed(3)+" to "+ curVar.metrics[m].chanceToWin.toFixed(3)+")"
              );
            }
            */
          }
        }
      }
      if (experimentChanges.length) {
        // send an email to any subscribers on this test:
        logger.info(
          { experiment: experiment.id },
          "Significant change - detected " +
            experimentChanges.length +
            " significant changes"
        );
        if (!isEmailEnabled()) {
          logger.error(
            { experiment: experiment.id },
            "Significant change - not sending as email not enabled"
          );
        } else {
          const watchers = await getExperimentWatchers(experiment.id);
          const userIds = watchers.map((w) => w.userId);
          try {
            await sendExperimentChangesEmail(
              userIds,
              experiment.id,
              experiment.name,
              experimentChanges
            );
          } catch (e) {
            logger.error(
              { experiment: experiment.id },
              "Significant change - Email sending failure:"
            );
            logger.error({ experiment: experiment.id }, e.message);
          }
        }
      }
    } catch (e) {
      logger.error(
        { experiment: experiment.id },
        "Updating experiment - Failure"
      );
      // Snapshot refresh failed — disable auto-updates so we don't retry forever
      try {
        experiment.autoSnapshots = false;
        experiment.markModified("autoSnapshots");
        await experiment.save();
        // TODO: email user and let them know it failed
      } catch (e) {
        logger.error({ experiment: experiment.id }, e.message);
      }
    }
  });
  await Promise.all(promises);
  logger.info("Cron finished");
  clearTimeout(timer);
  process.exit(0);
})();

View File

@@ -1,105 +1,17 @@
import Agenda, { Job } from "agenda";
import Agenda from "agenda";
import mongoose from "mongoose";
import { WebhookModel } from "../models/WebhookModel";
import { createHmac } from "crypto";
import fetch from "node-fetch";
import { getExperimentOverrides } from "../services/organizations";
const WEBHOOK_JOB_NAME = "fireWebhook";
type WebhookJob = Job<{
webhookId: string;
retryCount: number;
}>;
import addExperimentResultsJob from "../jobs/updateExperimentResults";
import addWebhooksJob from "../jobs/webhooks";
let agenda: Agenda;
export async function queueInit() {
agenda = new Agenda({
mongo: mongoose.connection.db,
defaultLockLimit: 5,
});
agenda.define(WEBHOOK_JOB_NAME, async (job: WebhookJob) => {
const { webhookId } = job.attrs.data;
const webhook = await WebhookModel.findOne({
id: webhookId,
});
if (!webhook) return;
const overrides = await getExperimentOverrides(webhook.organization);
const payload = JSON.stringify({
timestamp: Math.floor(Date.now() / 1000),
overrides,
});
const signature = createHmac("sha256", webhook.signingKey)
.update(payload)
.digest("hex");
const res = await fetch(webhook.endpoint, {
headers: {
"Content-Type": "application/json",
"X-GrowthBook-Signature": signature,
},
method: "POST",
body: payload,
});
if (!res.ok) {
const e = "POST returned an invalid status code: " + res.status;
webhook.set("error", e);
await webhook.save();
throw new Error(e);
}
webhook.set("error", "");
webhook.set("lastSuccess", new Date());
await webhook.save();
});
agenda.on(
"fail:" + WEBHOOK_JOB_NAME,
async (error: Error, job: WebhookJob) => {
const retryCount = job.attrs.data.retryCount;
let nextRunAt = Date.now();
// Wait 30s after the first failure
if (retryCount === 0) {
nextRunAt += 30000;
}
// Wait 5m after the second failure
else if (retryCount === 1) {
nextRunAt += 300000;
}
// If it failed 3 times, give up
else {
// TODO: email the organization owner
return;
}
job.attrs.data.retryCount++;
job.attrs.nextRunAt = new Date(nextRunAt);
await job.save();
}
);
addExperimentResultsJob(agenda);
addWebhooksJob(agenda);
await agenda.start();
}
/**
 * Enqueue one webhook-firing job per webhook registered for the organization.
 * Jobs are deduped on webhookId and scheduled to run immediately.
 */
export async function queueWebhook(orgId: string) {
  // Only queue if the organization has at least 1 webhook defined
  const webhooks = await WebhookModel.find({
    organization: orgId,
  });
  // find() resolves to an array, which is always truthy — check length instead
  if (!webhooks || !webhooks.length) return;
  for (const webhook of webhooks) {
    const webhookId = webhook.id as string;
    const job = agenda.create(WEBHOOK_JOB_NAME, {
      webhookId,
      retryCount: 0,
    }) as WebhookJob;
    job.unique({ webhookId });
    job.schedule(new Date());
    await job.save();
  }
}

View File

@@ -0,0 +1,223 @@
import Agenda, { Job } from "agenda";
import { ExperimentModel } from "../models/ExperimentModel";
import { getDataSourceById } from "../services/datasource";
import { isEmailEnabled, sendExperimentChangesEmail } from "../services/email";
import {
createSnapshot,
getExperimentWatchers,
getLatestSnapshot,
getMetricById,
} from "../services/experiments";
import { getConfidenceLevelsForOrg } from "../services/organizations";
import pino from "pino";
import { ExperimentSnapshotDocument } from "../models/ExperimentSnapshotModel";
import { ExperimentInterface } from "../../types/experiment";
// Time between experiment result updates (6 hours)
const UPDATE_EVERY = 6 * 60 * 60 * 1000;
// Recurring Agenda job that scans for stale experiments
const QUEUE_EXPERIMENT_UPDATES = "queueExperimentUpdates";
// Per-experiment Agenda job that refreshes one experiment's results
const UPDATE_SINGLE_EXP = "updateSingleExperiment";
// Job payload: the id of the experiment to update
type UpdateSingleExpJob = Job<{
  experimentId: string;
}>;
const parentLogger = pino();
export default async function (agenda: Agenda) {
agenda.define(QUEUE_EXPERIMENT_UPDATES, async () => {
// All experiments that haven't been updated in at least UPDATE_EVERY ms
const latestDate = new Date(Date.now() - UPDATE_EVERY);
const experimentIds = (
await ExperimentModel.find(
{
datasource: {
$exists: true,
$ne: "",
},
status: "running",
autoSnapshots: true,
lastSnapshotAttempt: {
$lte: latestDate,
},
},
{
id: true,
}
)
.limit(100)
.sort({
lastSnapshotAttempt: 1,
})
).map((e) => e.id);
for (let i = 0; i < experimentIds.length; i++) {
await queueExerimentUpdate(experimentIds[i]);
}
});
agenda.define(
UPDATE_SINGLE_EXP,
// This job queries a datasource, which may be slow. Give it 30 minutes to complete.
{ lockLifetime: 30 * 60 * 1000 },
updateSingleExperiment
);
// Update experiment results
await startUpdateJob();
async function startUpdateJob() {
const updateResultsJob = agenda.create(QUEUE_EXPERIMENT_UPDATES, {});
updateResultsJob.unique({});
updateResultsJob.repeatEvery("10 minutes");
await updateResultsJob.save();
}
async function queueExerimentUpdate(experimentId: string) {
const job = agenda.create(UPDATE_SINGLE_EXP, {
experimentId,
}) as UpdateSingleExpJob;
job.unique({
experimentId,
});
job.schedule(new Date());
await job.save();
}
}
/**
 * Agenda job handler: refresh results for one experiment and notify watchers
 * of significant changes. If the refresh fails, auto-updating is turned off
 * for that experiment so it is not retried forever.
 */
async function updateSingleExperiment(job: UpdateSingleExpJob) {
  const { experimentId } = job.attrs.data;
  const logger = parentLogger.child({
    cron: "updateSingleExperiment",
    experimentId,
  });
  const experiment = await ExperimentModel.findOne({
    id: experimentId,
  });
  // Guard: the experiment may have been deleted between queueing and running.
  // Without this, `experiment.datasource` throws, and the catch block throws
  // again while trying to set `experiment.autoSnapshots`.
  if (!experiment) {
    logger.error("Failure - experiment not found");
    return;
  }
  let lastSnapshot: ExperimentSnapshotDocument;
  let currentSnapshot: ExperimentSnapshotDocument;
  try {
    logger.info("Start Refreshing Results");
    const datasource = await getDataSourceById(experiment.datasource);
    lastSnapshot = await getLatestSnapshot(
      experiment.id,
      experiment.phases.length - 1
    );
    currentSnapshot = await createSnapshot(
      experiment,
      experiment.phases.length - 1,
      datasource
    );
    logger.info("Success");
    await sendSignificanceEmail(experiment, lastSnapshot, currentSnapshot);
  } catch (e) {
    logger.error("Failure - " + e.message);
    // If we failed to update the experiment, turn off auto-updating for the future
    try {
      experiment.autoSnapshots = false;
      experiment.markModified("autoSnapshots");
      await experiment.save();
      // TODO: email user and let them know it failed
    } catch (e) {
      logger.error("Failed to turn off autoSnapshots - " + e.message);
    }
  }
}
/**
 * Compares the two most recent snapshots of an experiment and, when any
 * variation's chance-to-win crosses the org's confidence thresholds
 * (ciUpper/ciLower), emails all watchers of the experiment. Errors are
 * logged and swallowed so an email failure never fails the update job.
 */
async function sendSignificanceEmail(
  experiment: ExperimentInterface,
  lastSnapshot: ExperimentSnapshotDocument,
  currentSnapshot: ExperimentSnapshotDocument
) {
  const logger = parentLogger.child({
    cron: "sendSignificanceEmail",
    experimentId: experiment.id,
  });
  // If email is not configured, there's nothing else to do
  if (!isEmailEnabled()) {
    return;
  }
  try {
    // get the org confidence level settings:
    const { ciUpper, ciLower } = await getConfidenceLevelsForOrg(
      experiment.organization
    );
    // check this and the previous snapshot to see if anything changed:
    // (only results[0] is inspected; the loop starts at 1, so variation 0 is
    // treated as the baseline per the message text below)
    const experimentChanges: string[] = [];
    for (let i = 1; i < currentSnapshot.results[0].variations.length; i++) {
      const curVar = currentSnapshot.results[0].variations[i];
      const lastVar = lastSnapshot.results[0].variations[i];
      for (const m in curVar.metrics) {
        // sanity checks: require prior data and a minimum metric value
        if (
          lastVar.metrics[m] &&
          lastVar.metrics[m].chanceToWin &&
          curVar.metrics[m].value > 150
        ) {
          // checks to see if anything changed:
          if (
            curVar.metrics[m].chanceToWin > ciUpper &&
            lastVar.metrics[m].chanceToWin < ciUpper
          ) {
            // this test variation has gone significant, and won
            // NOTE(review): getMetricById(m) is not awaited — if it is async
            // this concatenates a Promise into the message; confirm signature
            experimentChanges.push(
              "The metric " +
                getMetricById(m) +
                " for variation " +
                experiment.variations[i].name +
                " has reached a " +
                (curVar.metrics[m].chanceToWin * 100).toFixed(1) +
                "% chance to beat baseline"
            );
          } else if (
            curVar.metrics[m].chanceToWin < ciLower &&
            lastVar.metrics[m].chanceToWin > ciLower
          ) {
            // this test variation has gone significant, and lost
            experimentChanges.push(
              "The metric " +
                getMetricById(m) +
                " for variation " +
                experiment.variations[i].name +
                " has dropped to a " +
                (curVar.metrics[m].chanceToWin * 100).toFixed(1) +
                " chance to beat the baseline"
            );
          }
        }
      }
    }
    if (experimentChanges.length) {
      // send an email to any subscribers on this test:
      logger.info(
        "Significant change - detected " +
          experimentChanges.length +
          " significant changes"
      );
      const watchers = await getExperimentWatchers(experiment.id);
      const userIds = watchers.map((w) => w.userId);
      await sendExperimentChangesEmail(
        userIds,
        experiment.id,
        experiment.name,
        experimentChanges
      );
    }
  } catch (e) {
    logger.error(e.message);
  }
}

View File

@@ -0,0 +1,101 @@
import Agenda, { Job } from "agenda";
import { WebhookModel } from "../models/WebhookModel";
import { createHmac } from "crypto";
import fetch from "node-fetch";
import { getExperimentOverrides } from "../services/organizations";
const WEBHOOK_JOB_NAME = "fireWebhook";
// Job payload: which webhook to fire and how many retries have happened so far
type WebhookJob = Job<{
  webhookId: string;
  retryCount: number;
}>;
// Shared Agenda instance, set by the default export and used by queueWebhook()
let agenda: Agenda;
/**
 * Registers the webhook-firing job on the shared Agenda instance and installs
 * a failure handler implementing retry with backoff. Stores the instance in
 * the module-level `agenda` for queueWebhook() to use.
 */
export default function (ag: Agenda) {
  agenda = ag;
  // Fire webhooks
  agenda.define(WEBHOOK_JOB_NAME, async (job: WebhookJob) => {
    const { webhookId } = job.attrs.data;
    const webhook = await WebhookModel.findOne({
      id: webhookId,
    });
    if (!webhook) return;
    // Payload: the org's current experiment overrides plus a unix timestamp
    const overrides = await getExperimentOverrides(webhook.organization);
    const payload = JSON.stringify({
      timestamp: Math.floor(Date.now() / 1000),
      overrides,
    });
    // HMAC-SHA256 signature lets the receiving endpoint verify authenticity
    const signature = createHmac("sha256", webhook.signingKey)
      .update(payload)
      .digest("hex");
    const res = await fetch(webhook.endpoint, {
      headers: {
        "Content-Type": "application/json",
        "X-GrowthBook-Signature": signature,
      },
      method: "POST",
      body: payload,
    });
    if (!res.ok) {
      // Record the error on the webhook and throw so Agenda marks the job failed
      const e = "POST returned an invalid status code: " + res.status;
      webhook.set("error", e);
      await webhook.save();
      throw new Error(e);
    }
    // Success: clear any previous error and stamp the time
    webhook.set("error", "");
    webhook.set("lastSuccess", new Date());
    await webhook.save();
  });
  // Retry with backoff: 30s after the 1st failure, 5m after the 2nd, give up after 3
  agenda.on(
    "fail:" + WEBHOOK_JOB_NAME,
    async (error: Error, job: WebhookJob) => {
      const retryCount = job.attrs.data.retryCount;
      let nextRunAt = Date.now();
      // Wait 30s after the first failure
      if (retryCount === 0) {
        nextRunAt += 30000;
      }
      // Wait 5m after the second failure
      else if (retryCount === 1) {
        nextRunAt += 300000;
      }
      // If it failed 3 times, give up
      else {
        // TODO: email the organization owner
        return;
      }
      job.attrs.data.retryCount++;
      job.attrs.nextRunAt = new Date(nextRunAt);
      await job.save();
    }
  );
}
export async function queueWebhook(orgId: string) {
// Only queue if the organization has at least 1 webhook defined
const webhooks = await WebhookModel.find({
organization: orgId,
});
if (!webhooks) return;
for (let i = 0; i < webhooks.length; i++) {
const webhookId = webhooks[i].id as string;
const job = agenda.create(WEBHOOK_JOB_NAME, {
webhookId,
retryCount: 0,
}) as WebhookJob;
job.unique({ webhookId });
job.schedule(new Date());
await job.save();
}
}