Implementation: Automated Ingestion Worker
Overview
This document records the implementation of Issue #7: adding an automated worker loop to drain queued ingestion jobs.
What Changed
- `app/services/ingestion.py`:
  - Centralized the ingestion execution logic by moving `execute_job` and `_log_ingestion_stage` from the admin router into `IngestionService`.
  - Injected dependencies (`hybrid_ingestion_service`, `embedding_service`, `pinecone_adapter`) into the `IngestionService` constructor to avoid circular imports.
- `app/core/dependencies.py`:
  - Updated the `ingestion_service` provider to pass the newly required dependencies.
- `app/api/routers/admin.py`:
  - Removed the ingestion execution logic; the router now delegates to `ingestion_service.execute_job`.
- `app/services/background_jobs.py`:
  - Added an asynchronous function, `drain_queued_ingestion_jobs`.
  - Configured `APScheduler` to run this function every 30 seconds.
  - Used an atomic `UPDATE` condition (`status = 'queued'`) to prevent double-dispatch of the same job.
  - Ran the blocking `execute_job` via `asyncio.get_running_loop().run_in_executor` so the main FastAPI event loop remains responsive.
  - Added logging to track worker pickups, claims, and errors.
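The background-worker changes above can be sketched as follows. This is a minimal, self-contained illustration, not the code in `background_jobs.py`. The Supabase query is replaced by a hypothetical in-memory helper, `fetch_oldest_queued_job`. The job fields (`id`, `reference_id`, `status`, `created_at`) are assumed. `execute_job` is also assumed to return `True` when it claims and processes a job.

```python
import asyncio
import logging
from typing import Callable, Optional

logger = logging.getLogger("ingestion.worker")


def fetch_oldest_queued_job(jobs: list[dict]) -> Optional[dict]:
    """Hypothetical stand-in for the Supabase query: oldest row with status = 'queued'."""
    queued = [j for j in jobs if j["status"] == "queued"]
    return min(queued, key=lambda j: j["created_at"]) if queued else None


async def drain_queued_ingestion_jobs(
    jobs: list[dict],
    execute_job: Callable[[str, str], bool],
) -> Optional[bool]:
    """Pick up one queued job and run the blocking executor off the event loop."""
    job = fetch_oldest_queued_job(jobs)
    if job is None:
        logger.debug("No queued ingestion jobs")
        return None
    logger.info("Worker picked up job %s", job["id"])
    loop = asyncio.get_running_loop()
    try:
        # None selects the loop's default ThreadPoolExecutor, so the event loop
        # stays free while execute_job blocks on downloads, parsing and embeddings.
        return await loop.run_in_executor(
            None, execute_job, job["id"], job["reference_id"]
        )
    except Exception:
        logger.exception("Ingestion job %s failed", job["id"])
        return False
```

With APScheduler 3.x, a coroutine like this would typically be registered on an `AsyncIOScheduler` with `add_job(..., "interval", seconds=30, max_instances=1)`. The exact registration used in this project is not shown in this note.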
Execution Flow
- The scheduler wakes up every 30 seconds.
- `drain_queued_ingestion_jobs` queries Supabase for the oldest job with `status = 'queued'`.
- If a job is found, it passes the `job_id` and `reference_id` to `ingestion_service.execute_job`, which runs inside a thread-pool executor.
- `IngestionService.execute_job` attempts to claim the job by atomically updating its status from `queued` to `parsing`. If the row is no longer `queued` (e.g. it was claimed by another worker or a manual dispatch), it returns without error.
- If the claim succeeds, `IngestionService` processes the ingestion (PDF download, parsing, chunking, embeddings).
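The claim step is a compare-and-set update. The row moves from `queued` to `parsing` only if it is still `queued`, and the number of affected rows tells the caller whether it won. The sketch below uses the standard-library `sqlite3` module as a stand-in for Supabase/Postgres, with a hypothetical `ingestion_jobs` table. The actual query in `IngestionService.execute_job` may differ.

```python
import sqlite3


def claim_job(conn: sqlite3.Connection, job_id: str) -> bool:
    """Atomically move a job from 'queued' to 'parsing'; True only for the winner."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "UPDATE ingestion_jobs SET status = 'parsing' "
            "WHERE id = ? AND status = 'queued'",
            (job_id,),
        )
    # 1 row changed: this caller claimed the job. 0 rows: it was already taken.
    return cur.rowcount == 1


def execute_job(conn: sqlite3.Connection, job_id: str, reference_id: str) -> bool:
    """Hypothetical outline of the claim-then-process structure."""
    if not claim_job(conn, job_id):
        # Claimed by another worker or a manual dispatch: return without error.
        return False
    # PDF download, parsing, chunking and embeddings would run here (omitted).
    return True
```

With the Supabase Python client, the same guard is usually expressed by chaining `.eq("status", "queued")` onto the `update(...)` call and treating an empty `data` list in the response as a lost claim.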
Verification
- End-to-end: Trigger a new scrape run via `POST /scraping/koutoubi/sync`. The log shows the worker automatically picking up the generated queued job and processing it.
- Double-dispatch prevention: Tested by manually enqueuing jobs and confirming that the atomic update prevents overlap. The `max_instances=1` parameter in APScheduler also ensures the task does not overlap with itself within the same process.
- Manual override: `POST /admin/jobs/dispatch` continues to work.
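The double-dispatch check above can be reproduced with a small race harness. Several threads, each with its own database connection, try to claim the same queued job at once, and exactly one should succeed. This is a sketch under stated assumptions: `sqlite3` on a temporary file stands in for Supabase, and `ingestion_jobs`, `claim_job`, and `race_claims` are hypothetical names. It exercises only the database-level guard, not APScheduler's `max_instances=1`.

```python
import os
import sqlite3
import tempfile
import threading


def claim_job(db_path: str, job_id: str) -> bool:
    """One atomic UPDATE in autocommit mode, on a fresh connection per worker."""
    conn = sqlite3.connect(db_path, timeout=5, isolation_level=None)
    try:
        cur = conn.execute(
            "UPDATE ingestion_jobs SET status = 'parsing' "
            "WHERE id = ? AND status = 'queued'",
            (job_id,),
        )
        return cur.rowcount == 1
    finally:
        conn.close()


def race_claims(workers: int = 8) -> list[bool]:
    """Let several workers try to claim the same queued job at the same moment."""
    results: list[bool] = []
    lock = threading.Lock()
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "jobs.db")
        setup = sqlite3.connect(db_path, isolation_level=None)
        setup.execute("CREATE TABLE ingestion_jobs (id TEXT PRIMARY KEY, status TEXT)")
        setup.execute("INSERT INTO ingestion_jobs VALUES ('j1', 'queued')")
        setup.close()

        barrier = threading.Barrier(workers)

        def worker() -> None:
            barrier.wait()  # release all workers together to maximize contention
            won = claim_job(db_path, "j1")
            with lock:
                results.append(won)

        threads = [threading.Thread(target=worker) for _ in range(workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return results
```

`race_claims()` should return a list with exactly one `True`, however the threads are scheduled.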
Remaining Risks
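- Scaling beyond a single process: Because the claim is an atomic conditional SQL update, the worker is largely safe to run in several processes. Each process runs its own scheduler, but only one can move a given job from `queued` to `parsing`. The other processes select the same oldest row and do no work that tick, so under high concurrency a real message queue may eventually be needed.
- Prolonged job execution: Very long ingestion jobs (e.g. more than 500 pages) can occupy a thread-pool worker for their entire duration. If concurrency increases, the thread-pool size may need to be configured explicitly.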
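One way to address the thread-pool concern is to give ingestion its own bounded executor. Passing `None` to `run_in_executor` uses the event loop's shared default `ThreadPoolExecutor`, which in recent Python versions defaults to `min(32, CPU count + 4)` threads. The sketch below is hypothetical: `INGESTION_MAX_WORKERS`, `ingestion_executor`, and `slow_ingestion_job` are illustrative names, and `time.sleep` stands in for a long `execute_job` call.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sizing: cap how many ingestion jobs may hold threads at once.
INGESTION_MAX_WORKERS = 2
ingestion_executor = ThreadPoolExecutor(
    max_workers=INGESTION_MAX_WORKERS, thread_name_prefix="ingestion"
)

_active = 0
_peak = 0
_lock = threading.Lock()


def slow_ingestion_job(seconds: float) -> None:
    """Stand-in for a long, blocking execute_job call; tracks peak concurrency."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    time.sleep(seconds)
    with _lock:
        _active -= 1


async def run_jobs(n: int) -> int:
    loop = asyncio.get_running_loop()
    # A dedicated executor keeps long ingestion jobs from exhausting the default
    # pool, which other run_in_executor(None, ...) callers share.
    await asyncio.gather(
        *(loop.run_in_executor(ingestion_executor, slow_ingestion_job, 0.05) for _ in range(n))
    )
    return _peak
```

Running six jobs through this executor never has more than `INGESTION_MAX_WORKERS` of them in flight at once. A dedicated pool should be shut down (`ingestion_executor.shutdown()`) when the application stops.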