# Scheduling & Jobs

Author: Nils Jansen

## Overview
The scheduling system manages periodic execution of background jobs using:

- the Python `schedule` library for basic scheduling
- a custom `JobScheduler` wrapper for enhanced functionality
- a background thread that checks for pending jobs every minute
Key features:

- Supports both process-based (CPU-bound) and thread-based (I/O-bound) execution
- Automatic timeout handling
- Comprehensive error tracking and notifications
- Database logging of all job runs
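
The background-thread pattern described above can be sketched with stdlib primitives. This `MiniScheduler` is a simplified stand-in, not the real `JobScheduler` (which wraps the third-party `schedule` library in `app/core/jobs.py`); it only illustrates the "wake up, run pending jobs, sleep" loop.

```python
import threading
import time

class MiniScheduler:
    """Hypothetical sketch of the background-thread pattern:
    a daemon thread wakes up periodically and runs any due jobs."""

    def __init__(self, check_interval: float = 60.0):
        self.check_interval = check_interval  # production checks every minute
        self._jobs = []                       # [next_run_ts, interval_s, func]
        self._stop = threading.Event()

    def register(self, func, interval_s: float):
        self._jobs.append([time.time() + interval_s, interval_s, func])

    def _run_pending(self, now: float):
        for job in self._jobs:
            if now >= job[0]:
                job[2]()               # run the job
                job[0] = now + job[1]  # schedule the next run

    def start(self):
        def loop():
            while not self._stop.is_set():
                self._run_pending(time.time())
                self._stop.wait(self.check_interval)
        threading.Thread(target=loop, daemon=True).start()

    def stop(self):
        self._stop.set()
```

The real wrapper adds the process/thread execution modes, timeouts, and database logging described below.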
## Adding New Jobs

### Step 1: Define Job Function
```python
def my_job(stop_event: multiprocessing.synchronize.Event) -> ScraperResult:
    """
    Job functions must:
    - Accept a stop_event parameter
    - Regularly check stop_event.is_set()
    - Return a ScraperResult
    """
    try:
        # Job implementation
        if stop_event.is_set():
            return ScraperResult(success=False, error=Exception("Job stopped"))
        # ... job logic ...
        return ScraperResult(success=True)
    except Exception as e:
        return ScraperResult(success=False, error=e)
```
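
For long-running jobs, the stop check belongs inside the main work loop so that cancellation is picked up between chunks. The sketch below is self-contained: the `ScraperResult` stand-in and the item loop are placeholders, and `threading.Event` is used in place of the multiprocessing event (both expose the same `is_set()` API).

```python
import threading

class ScraperResult:
    """Placeholder mirroring the result type described above."""
    def __init__(self, success, error=None):
        self.success = success
        self.error = error

def chunked_job(stop_event: threading.Event, items: list) -> ScraperResult:
    """Check stop_event between chunks so a timeout can interrupt cleanly."""
    try:
        for item in items:
            if stop_event.is_set():
                return ScraperResult(success=False, error=Exception("Job stopped"))
            # ... process one item (placeholder) ...
        return ScraperResult(success=True)
    except Exception as e:
        return ScraperResult(success=False, error=e)
```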
### Step 2: Register Job

Register the job in `setup_scheduled_jobs()` (`app/core/jobs.py`):
```python
def setup_scheduled_jobs():
    scheduler.register(
        name="my_job",                                  # Unique name
        func=my_job,                                    # Job function
        job_schedule=schedule.every().day.at("03:00"),  # Absolute schedule
        run_in_process=True,                            # For CPU-intensive jobs
        timeout_minutes=30,                             # Custom timeout (default: 15)
    )
```
## Important Configurations

| Parameter | Description | Example |
|---|---|---|
| `name` | Unique job identifier | `"scrape_mep_meetings"` |
| `func` | Job function reference | `scrape_mep_meetings` |
| `job_schedule` | Schedule pattern | `schedule.every().monday.at("02:00")` |
| `run_in_process` | Run in a separate process (CPU-bound) | `True` |
| `timeout_minutes` | Maximum runtime in minutes | `30` |
**Important:** Always use absolute schedule values (e.g., "every day at 03:00") rather than relative ones ("every two weeks"): the `schedule` library keeps its state in memory, so a server restart resets any relative countdown and the job may never fire.
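
The difference looks like this in registration code (`send_weekly_report` and `biweekly_cleanup` are hypothetical job names, not functions from `app/core/jobs.py`):

```python
# Good: absolute anchor — identical after every restart
scheduler.register(
    name="weekly_report",
    func=send_weekly_report,
    job_schedule=schedule.every().monday.at("02:00"),
)

# Risky: relative interval — the two-week countdown restarts from zero
# on every server restart, so the job may never run
scheduler.register(
    name="biweekly_cleanup",
    func=biweekly_cleanup,
    job_schedule=schedule.every(2).weeks,
)
```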
## Timeout Handling

The system provides two timeout mechanisms:

- **Process-based jobs:**
  - Terminated after the timeout
  - Result logged as a timeout failure
- **Thread-based jobs:**
  - `stop_event` is set
  - The job should check `stop_event.is_set()` periodically
  - The system waits for a graceful exit
Example timeout check:

```python
if stop_event.is_set():
    return ScraperResult(success=False, error=Exception("Job stopped"))
```
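
The thread-based mechanism can be sketched as follows. This is a simplified model of the pattern, not the actual `JobScheduler` code: the runner joins the worker thread with a timeout, sets `stop_event` if it is still alive, and then allows a grace period for cleanup. (Process-based jobs would instead be `.terminate()`d after `Process.join(timeout)` expires.)

```python
import threading

def run_with_timeout(job, timeout_s: float):
    """Run job(stop_event) in a thread; on timeout, set the stop
    event and wait briefly for a graceful exit."""
    stop_event = threading.Event()
    result = {}

    def target():
        result["value"] = job(stop_event)

    t = threading.Thread(target=target, daemon=True)
    t.start()
    t.join(timeout_s)
    if t.is_alive():
        stop_event.set()  # ask the job to stop itself
        t.join(5.0)       # grace period for cleanup
        return None       # caller logs this as a timeout failure
    return result.get("value")
```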
## Error Handling

The system provides comprehensive error handling:

- **Exception Capture:**
  - All exceptions are caught and logged
  - Error details stored in the database
- **Notifications:**
  - Email alerts via `notify_job_failure`
  - Includes job name and error details
- **Result Tracking:**
  - Success/failure status recorded
  - Error messages stored
  - For scrapers: lines added/processed
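
The wrapper's behaviour can be sketched like this; `notify_job_failure` here is a logging stand-in for the real email notifier, and `run_job_safely` is a hypothetical name for the internal run loop:

```python
import logging

logger = logging.getLogger("scheduler")

def notify_job_failure(job_name: str, error):
    """Stand-in for the real email notifier described above."""
    logger.error("Job %s failed: %s", job_name, error)

def run_job_safely(name: str, func, stop_event):
    """Catch everything, record the outcome, and alert on failure."""
    try:
        result = func(stop_event)
    except Exception as e:
        # A raised exception is captured rather than killing the scheduler.
        notify_job_failure(name, e)
        return {"name": name, "success": False, "error": str(e)}
    if result is not None and not getattr(result, "success", False):
        err = getattr(result, "error", None)
        notify_job_failure(name, err)
        return {"name": name, "success": False, "error": str(err)}
    return {"name": name, "success": True, "error": None}
```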
## Database Logging

All job runs are logged to the `scheduled_job_runs` table with:

- Job name
- Timestamp
- Success status
- Error message (if any)
- Runtime metrics
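
The schema can be approximated like this; the sketch uses sqlite3 as a stand-in, and the exact column names in the application's `scheduled_job_runs` table may differ:

```python
import sqlite3
from datetime import datetime, timezone

def init_job_log(conn: sqlite3.Connection):
    conn.execute("""
        CREATE TABLE IF NOT EXISTS scheduled_job_runs (
            id INTEGER PRIMARY KEY,
            job_name TEXT NOT NULL,
            run_at TEXT NOT NULL,          -- UTC timestamp
            success INTEGER NOT NULL,      -- 1 = success, 0 = failure
            error_message TEXT,            -- NULL when successful
            runtime_seconds REAL           -- runtime metric
        )
    """)

def log_job_run(conn, job_name, success, error_message=None, runtime_seconds=None):
    conn.execute(
        "INSERT INTO scheduled_job_runs "
        "(job_name, run_at, success, error_message, runtime_seconds) "
        "VALUES (?, ?, ?, ?, ?)",
        (job_name, datetime.now(timezone.utc).isoformat(),
         int(success), error_message, runtime_seconds),
    )
    conn.commit()
```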
## Manual Job Execution

Jobs can be executed manually by sending a POST request to `/scheduler/run/{job_name}`, where `{job_name}` is the unique identifier of the registered job.

**Authentication:** This endpoint requires a valid JWT by default. During development, authentication can be disabled by setting the environment variable `DISABLE_AUTH=true`.
## Best Practices

- **Job Design:**
  - Implement periodic `stop_event` checks
  - Return a meaningful `ScraperResult`
  - Handle cleanup in `finally` blocks
- **Scheduling:**
  - Use absolute schedule times
  - Space out CPU-intensive jobs
  - Consider job dependencies
- **Timeouts:**
  - Set reasonable timeouts: longer for complex jobs, shorter for frequent ones
- **Error Handling:**
  - Provide descriptive error messages
  - Include context in the `ScraperResult`
  - Test failure scenarios
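
The "cleanup in `finally` blocks" point can look like this in practice. The resource and the dict-shaped result are placeholders (a real job would return a `ScraperResult`); the point is that `finally` runs on success, failure, and early return alike.

```python
import threading

def job_with_cleanup(stop_event: threading.Event, resource_factory):
    """Release resources whether the job succeeds, fails, or is stopped."""
    resource = resource_factory()  # e.g. a DB connection or HTTP session
    try:
        if stop_event.is_set():
            return {"success": False, "error": "Job stopped"}
        # ... job logic using the resource ...
        return {"success": True, "error": None}
    finally:
        resource.close()  # runs even on the early "Job stopped" return
```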
## Example Jobs

See `app/core/jobs.py` for complete examples:

- Simple job: `send_weekly_newsletter`
- Process-based job: `scrape_mep_meetings`
- Complex job: `send_smart_alerts`
- Timeout handling: `clean_up_embeddings`