logo

Task Chaining

Task chaining is a workflow pattern where multiple asynchronous tasks execute in sequence — each starts only after the previous one completes, with the output of one becoming the input of the next. It’s the async equivalent of a synchronous pipeline.

How It Works

  1. The first task is created and executed
  2. When it completes, a webhook triggers the creation of the next task
  3. The result from the previous task is passed as payload to the next
  4. This continues until the final task completes

Example with AsyncQueue

// Start the pipeline
await aq.tasks.create({
  callbackUrl: 'https://your-app.com/api/step/extract',
  payload: { pipelineId, sourceUrl: 'https://...' },
  webhookUrl: 'https://your-app.com/api/pipeline-router',
});

// Route to the next step based on completion
app.post('/api/pipeline-router', async (req, res) => {
  const { pipelineId, step, result } = req.body.result;

  const nextSteps = {
    extract: 'transform',
    transform: 'load',
    load: null, // pipeline complete
  };

  const next = nextSteps[step];

  if (next) {
    await aq.tasks.create({
      callbackUrl: `https://your-app.com/api/step/${next}`,
      payload: { pipelineId, step: next, data: result },
      webhookUrl: 'https://your-app.com/api/pipeline-router',
    });
  } else {
    await markPipelineComplete(pipelineId);
  }

  res.status(200).json({ received: true });
});

When to Use Task Chaining

  • ETL pipelines: Extract → Transform → Load → Validate
  • Order processing: Validate → Charge → Fulfill → Notify
  • User onboarding: Verify identity → Credit check → Provision account
  • Content processing: Upload → Parse → Enrich → Index

Considerations

  • Log a pipeline ID and step number in every callback for end-to-end tracing
  • Store intermediate results so you can resume from the last successful step on failure
  • Set per-step timeouts and retries — different steps may have different reliability profiles