[backend] Discard jobs with null/undefined/empty data objects; add no-op handlers for invalid queue jobs

This stops corrupted/invalid jobs from clogging up the queue. Ref: https://github.com/OptimalBits/bull/issues/2461
This commit is contained in:
Laura Hausmann 2024-07-23 20:38:30 +02:00
parent a2eecc22d2
commit 0d5220e505
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
9 changed files with 24 additions and 0 deletions

View file

@ -1,4 +1,5 @@
import type Bull from "bull";
import { noop } from "@/queue/processors/noop.js";
const jobs = {} as Record<string, Bull.ProcessCallbackFunction<Record<string, unknown>>>;
@ -6,4 +7,6 @@ export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
q.process(noop);
}

View file

@ -16,6 +16,7 @@ import { importMastoPost } from "./import-masto-post.js";
import { importCkPost } from "./import-firefish-post.js";
import { importBlocking } from "./import-blocking.js";
import { importCustomEmojis } from "./import-custom-emojis.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
deleteDriveFiles,
@ -44,4 +45,6 @@ export default function (dbQueue: Bull.Queue<DbJobData>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
dbQueue.process(noop);
}

View file

@ -20,6 +20,7 @@ const logger = new Logger("deliver");
let latest: string | null = null;
export default async (job: Bull.Job<DeliverJobData>) => {
if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)";
const { host } = new URL(job.data.to);
const puny = toPuny(host);

View file

@ -11,6 +11,10 @@ export async function endedPollNotification(
job: Bull.Job<EndedPollNotificationJobData>,
done: any,
): Promise<void> {
if (job.data == null || Object.keys(job.data).length === 0) {
done();
return;
}
const note = await Notes.findOneBy({ id: job.data.noteId });
if (note == null || !note.hasPoll) {
done();

View file

@ -28,6 +28,7 @@ const logger = new Logger("inbox");
// Processing when an activity arrives in the user's inbox
export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)";
const signature = job.data.signature; // HTTP-signature
let activity = job.data.activity;

View file

@ -0,0 +1,5 @@
import Bull from "bull";
// Processor to be registered for jobs with __default__ (unnamed) handlers in queues that only have named handlers
// Prevents sporadic bogus jobs from clogging up the queues
export async function noop(_: Bull.Job): Promise<void> { }

View file

@ -2,6 +2,7 @@ import type Bull from "bull";
import type { ObjectStorageJobData } from "@/queue/types.js";
import deleteFile from "./delete-file.js";
import cleanRemoteFiles from "./clean-remote-files.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
deleteFile,
@ -16,4 +17,6 @@ export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
q.process(noop);
}

View file

@ -6,6 +6,7 @@ import { checkExpiredMutings } from "./check-expired-mutings.js";
import { clean } from "./clean.js";
import { setLocalEmojiSizes } from "./local-emoji-size.js";
import { verifyLinks } from "./verify-links.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
tickCharts,
@ -25,4 +26,6 @@ export default function (dbQueue: Bull.Queue<Record<string, unknown>>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
dbQueue.process(noop);
}

View file

@ -9,6 +9,7 @@ import config from "@/config/index.js";
const logger = new Logger("webhook");
export default async (job: Bull.Job<WebhookDeliverJobData>) => {
if (job.data == null || Object.keys(job.data).length === 0) return "Skip (data was null or empty)";
try {
logger.debug(`delivering ${job.data.webhookId}`);