[Bug]: FlowProducer.add() can fail silently #3851
Description
Version
v5.70.1
Platform
NodeJS
What happened?
FlowProducer.add() does not throw under certain conditions where Queue.add() would. One example is when Redis is READONLY. The result of multi.exec() is not checked here, so errors returned for individual commands go unnoticed. I believe the same applies to .addBulk().
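To illustrate the kind of check I mean, here is a minimal sketch. It assumes the usual ioredis behavior where `multi().exec()` resolves with one `[error, reply]` tuple per queued command (or `null`) rather than rejecting for per-command failures. `ExecResult` and `assertExecSucceeded` are hypothetical names of mine, not part of BullMQ or ioredis, and this is not a proposed patch.

```typescript
// Assumed shape of what ioredis resolves from `multi().exec()`:
// one [error, reply] tuple per queued command, or null if nothing was returned.
type ExecResult = [Error | null, unknown][] | null

// Hypothetical helper (not a BullMQ API): surface the first per-command error
// instead of silently discarding it, and return the plain replies otherwise.
function assertExecSucceeded(results: ExecResult): unknown[] {
  if (results === null) {
    throw new Error('MULTI/EXEC returned no results')
  }
  const replies: unknown[] = []
  for (const [err, reply] of results) {
    if (err) throw err // e.g. a READONLY error reported for a single command
    replies.push(reply)
  }
  return replies
}
```

With something like this applied to the exec() result, a READONLY reply for any queued command would turn into a rejection of .add() instead of a job node that looks successful.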
I came across this while debugging a fairly rare production issue in which a FlowProducer parent queue gets stuck in a "zombie" state after a connection loss to Redis. Afterwards, jobs seem to be added successfully but are nowhere to be found in the queues. I have not been able to reproduce it, but I believe it could be an edge case involving flow producers, Upstash, and Upstash's update procedure, which includes an extremely brief READONLY period. I think it might be related to this, which is why I'm posting this issue.
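Until the result is checked inside the library, a caller-side guard is one way to notice the "added but nowhere to be found" case. The sketch below is only an assumption about how such a guard could look: `addFlowVerified`, `ProducerLike`, `FlowLike` and `NodeLike` are hypothetical names, and the interfaces are simplified stand-ins for the `add` and `getFlow` calls used in the reproduction, not BullMQ's real types.

```typescript
// Simplified structural stand-ins (assumptions, not BullMQ's actual typings).
interface FlowLike { name: string; queueName: string; children?: FlowLike[] }
interface NodeLike { job: { id?: string } }
interface ProducerLike {
  add(flow: FlowLike): Promise<NodeLike>
  getFlow(opts: { id: string; queueName: string }): Promise<unknown>
}

// Hypothetical wrapper: add the flow, read it back, and throw if it is missing.
async function addFlowVerified(producer: ProducerLike, flow: FlowLike): Promise<NodeLike> {
  const node = await producer.add(flow)
  const id = node.job.id
  if (!id) throw new Error('flow was added without a job id')
  // A missing flow comes back as a falsy value, so treat that as a failure.
  const stored = await producer.getFlow({ id, queueName: flow.queueName })
  if (!stored) throw new Error(`flow ${id} not found in ${flow.queueName} after add()`)
  return node
}
```

This costs an extra round trip per flow and is not atomic, so it is a stopgap for detecting the silent failure rather than a fix.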
How to reproduce.
import { ChildProcess, spawn } from 'child_process'
import { Redis } from 'ioredis'
import { FlowProducer, Queue } from 'bullmq'

const connectionOpts = {
  host: '127.0.0.1',
  port: 6379,
  // Fail commands immediately instead of buffering them while disconnected
  enableOfflineQueue: false,
}

let redisProcess: ChildProcess

async function wait(ms: number) { return new Promise(r => setTimeout(r, ms)) }

function startRedis() {
  console.log('--- Starting Redis Server ---')
  redisProcess = spawn('redis-server', [], { stdio: 'ignore' })
}

async function runRepro() {
  startRedis()
  await wait(500)

  // Give the connection time to become ready (the offline queue is disabled)
  const setupConn = new Redis(connectionOpts)
  await wait(500)

  // Turning the server into a replica makes it reject writes with READONLY
  console.log('Making Redis READONLY')
  await setupConn.replicaof(connectionOpts.host, connectionOpts.port)
  await setupConn.quit()

  const queueName = 'test-queue'
  const flowProducer = new FlowProducer({ connection: connectionOpts })
  const queue = new Queue(queueName, { connection: connectionOpts })
  await flowProducer.waitUntilReady()
  await queue.waitUntilReady()

  // Queue.add() is expected to throw while Redis is READONLY
  console.log('Adding job with Queue...')
  try {
    const job = await queue.add('test-job', { foo: 'bar' })
    console.error('Job added with Queue unexpectedly', job)
  } catch (e) {
    console.log('Got expected error while adding job with Queue')
  }

  // FlowProducer.add() should throw as well, but resolves with a job node
  console.log('Adding jobs with FlowProducer...')
  try {
    const jobNode = await flowProducer.add({
      name: 'test-parent',
      queueName,
      children: [{ name: 'test-child', queueName }]
    })
    console.error('Job added with FlowProducer unexpectedly:', jobNode.job.id)
    // Reading the flow back shows that nothing was actually stored
    const verifyJobNode = await flowProducer.getFlow({ id: jobNode.job.id!, queueName })
    console.error('.add() did not throw but flow does not exist:', verifyJobNode)
  } catch (e) {
    console.log('Got expected error while adding jobs with FlowProducer', e)
  }

  if (redisProcess) redisProcess.kill()
  process.exit()
}
runRepro().catch(console.error)
Relevant log output
--- Starting Redis Server ---
Making Redis READONLY
Adding job with Queue...
Got expected error while adding job with Queue
Adding jobs with FlowProducer...
Job added with FlowProducer unexpectedly: 6d82aace-3808-4750-8040-f9feb5149023
.add() did not throw but flow does not exist: undefined
Code of Conduct
- I agree to follow this project's Code of Conduct