Plan: GH-0007 Add an automated worker to drain queued ingestion jobs

Context

The scraping pipeline (POST /scraping/{source}/sync) discovers PDFs, persists them, and queues ingestion jobs in ingestion_jobs. However, these jobs currently remain in the queued state until an admin manually triggers POST /admin/jobs/dispatch, which breaks the automated handoff from scraping to ingestion.

Scope

  • Add a minimal worker loop inside the existing BackgroundTasks / APScheduler structure to automatically pick up queued jobs and execute them.
  • Preserve the existing _run_ingestion_job execution logic.
  • Prevent double-dispatch by updating the job status atomically.
  • Scope it strictly to ingestion jobs to keep risk low.
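
One way to make the status update atomic is a conditional UPDATE that only succeeds while the row is still queued, so a second claimant sees zero affected rows and backs off. The sketch below uses the standard-library sqlite3 module purely for illustration; the column names (id, status, created_at) and the helper name claim_next_job are assumptions, not the project's actual schema or API.

```python
import sqlite3
from typing import Optional


def claim_next_job(conn: sqlite3.Connection) -> Optional[int]:
    """Claim the oldest queued job; return its id, or None if nothing was claimed.

    Hypothetical helper: assumes ingestion_jobs has id, status and created_at.
    """
    row = conn.execute(
        "SELECT id FROM ingestion_jobs "
        "WHERE status = 'queued' "
        "ORDER BY created_at, id LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    job_id = row[0]

    # The status check in the WHERE clause is what makes the claim safe:
    # if another worker changed the row first, no row matches and rowcount is 0.
    cur = conn.execute(
        "UPDATE ingestion_jobs SET status = 'parsing' "
        "WHERE id = ? AND status = 'queued'",
        (job_id,),
    )
    conn.commit()
    return job_id if cur.rowcount == 1 else None
```

A caller that gets None back simply does nothing for that iteration. On a database with row-level locking, a SELECT ... FOR UPDATE SKIP LOCKED inside one transaction would be an alternative to the two-statement form shown here.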

Acceptance Criteria

  • When a job is queued by the scraper, the backend picks it up without API intervention, typically within one polling interval (e.g. 30 seconds) when no other job is ahead of it.
  • The same job cannot be picked up by two iterations of the loop simultaneously.
  • Manual dispatch remains functional as an override.
  • Observable logs trace job pick-up and execution.

Design

The project currently uses APScheduler in app/services/background_jobs.py. We will add a new job there, triggered by an IntervalTrigger every 30 seconds. On each run, the job will:

  1. Select the oldest queued job from the ingestion_jobs table.
  2. Atomically update its status to parsing to claim it.
  3. Run the existing _run_ingestion_job pipeline, which executes synchronously, in an executor (e.g. via asyncio's run_in_executor) so that it does not block the FastAPI event loop.
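
A single iteration of the worker, including the hand-off of the synchronous pipeline to an executor, might look like the sketch below. It uses only the standard library. The names worker_tick, claim and run_job are hypothetical stand-ins: claim would wrap the atomic status update and run_job would wrap _run_ingestion_job, whose real signature is not shown in this plan.

```python
import asyncio
import logging
from typing import Callable, Optional

logger = logging.getLogger("ingestion_worker")


async def worker_tick(
    claim: Callable[[], Optional[int]],
    run_job: Callable[[int], None],
) -> Optional[int]:
    """Run one worker iteration; return the claimed job id, or None if idle.

    claim and run_job are injected so this sketch stays independent of the
    project's real database and pipeline code (both are assumptions here).
    """
    job_id = claim()
    if job_id is None:
        logger.debug("no queued ingestion jobs")
        return None

    logger.info("picked up ingestion job %s", job_id)
    loop = asyncio.get_running_loop()
    try:
        # Passing None selects the loop's default thread pool, so the
        # blocking pipeline runs off the event loop thread.
        await loop.run_in_executor(None, run_job, job_id)
    except Exception:
        # Marking the job as failed is left to the existing pipeline logic.
        logger.exception("ingestion job %s failed", job_id)
    else:
        logger.info("finished ingestion job %s", job_id)
    return job_id
```

Under this plan the coroutine would be registered with the scheduler on the 30-second interval. If the scheduler is APScheduler 3.x, add_job's max_instances setting (default 1) skips a run while the previous one is still executing, which complements the atomic claim but does not replace it, since manual dispatch can still race with the worker.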