Implementation: Automated Ingestion Worker

Overview

This document records the implementation of Issue #7: adding an automated worker loop to drain queued ingestion jobs.

What Changed

  • app/services/ingestion.py:
      ◦ Centralized the ingestion execution logic by moving execute_job and _log_ingestion_stage from the admin router into IngestionService.
      ◦ Passed its dependencies (hybrid_ingestion_service, embedding_service, pinecone_adapter) into the IngestionService constructor to avoid circular imports.
  • app/core/dependencies.py:
      ◦ Updated the ingestion_service provider to pass the newly required dependencies.
  • app/api/routers/admin.py:
      ◦ Removed the ingestion execution logic; the router now delegates to ingestion_service.execute_job.
  • app/services/background_jobs.py:
      ◦ Added the async function drain_queued_ingestion_jobs.
      ◦ Configured APScheduler to run it every 30 seconds.
      ◦ Relied on an atomic conditional UPDATE (WHERE status = 'queued') to prevent the same job from being dispatched twice.
      ◦ Ran the blocking execute_job in a thread-pool executor via asyncio.get_running_loop().run_in_executor, so the main FastAPI event loop stays responsive.
      ◦ Added logging for worker pickups, claims, and errors.
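
A minimal sketch of how the worker pieces described above could fit together, assuming execute_job is synchronous and takes (job_id, reference_id) positionally. The claim_job callable, the fetch_oldest_queued helper, the job dict keys, and register_worker are hypothetical stand-ins for the real Supabase calls and scheduler wiring; the scheduler calls assume APScheduler 3.x with AsyncIOScheduler.

```python
import asyncio
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("ingestion.worker")


class IngestionService:
    """Illustrative stand-in for the real service.

    Collaborators arrive through the constructor rather than module-level
    imports. claim_job is a hypothetical hook for the atomic
    queued -> parsing UPDATE.
    """

    def __init__(
        self,
        hybrid_ingestion_service: Any,
        embedding_service: Any,
        pinecone_adapter: Any,
        claim_job: Callable[[str], bool],
    ) -> None:
        self.hybrid_ingestion_service = hybrid_ingestion_service
        self.embedding_service = embedding_service
        self.pinecone_adapter = pinecone_adapter
        self._claim_job = claim_job

    def execute_job(self, job_id: str, reference_id: str) -> bool:
        """Blocking call; returns False if the job was no longer queued."""
        if not self._claim_job(job_id):
            logger.info("job %s already claimed elsewhere; skipping", job_id)
            return False
        logger.info("claimed job %s (reference %s)", job_id, reference_id)
        # Download, parsing, chunking and embedding would run here.
        return True


async def drain_queued_ingestion_jobs(
    service: IngestionService,
    fetch_oldest_queued: Callable[[], Optional[dict]],
) -> Optional[bool]:
    """One scheduler tick: pick the oldest queued job and execute it off-loop."""
    job = fetch_oldest_queued()  # hypothetical Supabase query helper
    if job is None:
        return None
    logger.info("worker picked up job %s", job["job_id"])
    loop = asyncio.get_running_loop()
    try:
        # None selects the loop's default ThreadPoolExecutor.
        return await loop.run_in_executor(
            None, service.execute_job, job["job_id"], job["reference_id"]
        )
    except Exception:
        logger.exception("ingestion job %s failed", job["job_id"])
        raise


def register_worker(service: IngestionService, fetch_oldest_queued) -> Any:
    """Hypothetical wiring; call from code already running inside the event loop."""
    from apscheduler.schedulers.asyncio import AsyncIOScheduler  # APScheduler 3.x

    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        drain_queued_ingestion_jobs,
        "interval",
        seconds=30,
        max_instances=1,  # skip a tick if the previous drain is still running
        args=[service, fetch_oldest_queued],
    )
    scheduler.start()
    return scheduler
```

In a FastAPI app, register_worker would typically be called from a startup hook, where an event loop is already running.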

Execution Flow

  1. The scheduler wakes up every 30 seconds.
  2. The drain_queued_ingestion_jobs function queries Supabase for the oldest job with status = 'queued'.
  3. If a job is found, the worker passes its job_id and reference_id to ingestion_service.execute_job, which runs inside a thread-pool executor.
  4. IngestionService.execute_job attempts to claim the job by atomically updating its status from queued to parsing. If the row is no longer queued (e.g., it was already claimed by another worker or by a manual dispatch), execute_job returns without raising an error.
  5. If the claim succeeds, IngestionService runs the ingestion pipeline (PDF download, parsing, chunking, embeddings).
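
The lookup in step 2 and the claim in step 4 amount to a select followed by a compare-and-set on the status column. The sketch below illustrates the pattern with Python's built-in sqlite3 as a stand-in for Supabase/Postgres; the ingestion_jobs table and its columns are hypothetical. A claim succeeds only if the UPDATE matched exactly one row. In PostgreSQL at the default READ COMMITTED isolation level, a second concurrent UPDATE on the same row waits for the first to commit, re-checks its WHERE clause against the new row version, and matches zero rows, so only one caller wins.

```python
import sqlite3
from typing import Optional, Tuple


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the jobs table (schema is hypothetical)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ingestion_jobs ("
        " id TEXT PRIMARY KEY,"
        " reference_id TEXT NOT NULL,"
        " status TEXT NOT NULL,"
        " created_at TEXT NOT NULL)"
    )
    return conn


def fetch_oldest_queued(conn: sqlite3.Connection) -> Optional[Tuple[str, str]]:
    """Step 2: the oldest job still in the queued state, if any."""
    return conn.execute(
        "SELECT id, reference_id FROM ingestion_jobs"
        " WHERE status = 'queued' ORDER BY created_at LIMIT 1"
    ).fetchone()


def claim_job(conn: sqlite3.Connection, job_id: str) -> bool:
    """Step 4: atomically move queued -> parsing; False if someone got there first."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE ingestion_jobs SET status = 'parsing'"
            " WHERE id = ? AND status = 'queued'",
            (job_id,),
        )
    return cur.rowcount == 1
```

Because the status check lives in the WHERE clause, no separate "read then write" step exists for two workers to interleave.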

Verification

  • End-to-end: Triggering a scrape run via POST /scraping/koutoubi/sync produces a queued job; the logs show the worker picking it up automatically and processing it.
  • Double-dispatch prevention: Verified by manually enqueuing jobs and confirming that the atomic update prevents the same job from being processed twice. Within a single process, the max_instances=1 setting in APScheduler also prevents a new run of the drain task from starting while the previous run is still in progress.
  • Manual override: POST /admin/jobs/dispatch continues to work.
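
A check in the spirit of the double-dispatch test might race several threads to claim one job and assert that exactly one succeeds. This sketch uses a hypothetical in-memory store whose lock-protected compare-and-set emulates the conditional UPDATE; it is an illustration, not the project's actual test.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Optional


class InMemoryJobStore:
    """Hypothetical test double: a lock makes claim() an atomic compare-and-set,
    mimicking UPDATE ... SET status = 'parsing' WHERE status = 'queued'."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: Dict[str, str] = {}

    def enqueue(self, job_id: str) -> None:
        with self._lock:
            self._status[job_id] = "queued"

    def claim(self, job_id: str) -> bool:
        with self._lock:
            if self._status.get(job_id) != "queued":
                return False
            self._status[job_id] = "parsing"
            return True

    def status(self, job_id: str) -> Optional[str]:
        with self._lock:
            return self._status.get(job_id)


def race_to_claim(store: InMemoryJobStore, job_id: str, workers: int = 8) -> int:
    """Have `workers` threads try to claim the same job at the same moment;
    return how many claims succeeded."""
    barrier = threading.Barrier(workers)

    def attempt(_: int) -> bool:
        barrier.wait()  # line all threads up so the claims genuinely overlap
        return store.claim(job_id)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(attempt, range(workers)))
```

Against a real database, the same shape of test would point each thread at its own connection and call the production claim path instead of the in-memory double.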

Remaining Risks

  • Scaling beyond a single process: Job claiming relies on an atomic conditional SQL UPDATE, so it is mostly safe when several processes run the worker concurrently. However, max_instances=1 only prevents overlap within one process, and high concurrency may eventually require a dedicated message queue.
  • Prolonged job execution: Very long ingestion jobs (e.g., > 500 pages) can occupy thread-pool workers for extended periods. The executor's pool size may need tuning if concurrency increases.
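
One possible way to configure the pool, assuming the worker currently passes None to run_in_executor (the loop's default executor, which Python 3.8+ sizes at min(32, os.cpu_count() + 4) threads), is to give ingestion its own bounded ThreadPoolExecutor. INGESTION_POOL_SIZE, ingestion_executor, run_ingestion, and shutdown_ingestion_executor are hypothetical names, and the pool size is a placeholder rather than a recommendation.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, TypeVar

T = TypeVar("T")

# Hypothetical size; tune to how many long-running jobs the host can afford.
INGESTION_POOL_SIZE = 2

# Dedicated pool so long ingestion jobs cannot exhaust the loop's default
# executor, which other run_in_executor(None, ...) callers share.
ingestion_executor = ThreadPoolExecutor(
    max_workers=INGESTION_POOL_SIZE, thread_name_prefix="ingestion"
)


async def run_ingestion(func: Callable[..., T], *args: Any) -> T:
    """Run a blocking callable on the ingestion pool without blocking the loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(ingestion_executor, func, *args)


def shutdown_ingestion_executor() -> None:
    """Call on application shutdown to wait for in-flight jobs to finish."""
    ingestion_executor.shutdown(wait=True)
```

With a bounded pool, submissions beyond INGESTION_POOL_SIZE wait in the executor's internal queue instead of spawning extra threads, which caps how much CPU and memory concurrent ingestion can consume.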