Job Scheduler
Async job execution engine that manages module runs, checkpoints, and lifecycle.
JobContext
Context object passed to running modules providing logging and state management.
Properties
job_id
@property
def job_id(self) -> str
Current job identifier.
is_cancelled
@property
def is_cancelled(self) -> bool
Whether the job has been cancelled via stop request or duration limit.
Methods
cancel
def cancel(self) -> None
Mark the job as cancelled. Sets is_cancelled to True.
log
async def log(self, message: str, level: str = "INFO") -> None
Log a message for this job. Logs are persisted and broadcast to WebSocket subscribers.
save_result
async def save_result(self, data: dict[str, Any]) -> None
Save a result from this job to the database.
save_checkpoint
async def save_checkpoint(self, checkpoint: dict[str, Any]) -> None
Save checkpoint state so the job can resume after a restart.
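The JobContext methods above suggest a cooperative pattern: a module checks is_cancelled between units of work and saves a checkpoint as it goes. The following is a minimal sketch of that pattern, not the project's own module code. StubJobContext is a hypothetical in-memory stand-in that only mimics the documented interface, and run_module and the "next_index" checkpoint key are illustrative names.

```python
import asyncio
from typing import Any, Optional


class StubJobContext:
    """Hypothetical in-memory stand-in for the documented JobContext interface."""

    def __init__(self, job_id: str) -> None:
        self._job_id = job_id
        self._cancelled = False
        self.logs: list = []
        self.results: list = []
        self.checkpoint: Optional[dict] = None

    @property
    def job_id(self) -> str:
        return self._job_id

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

    def cancel(self) -> None:
        self._cancelled = True

    async def log(self, message: str, level: str = "INFO") -> None:
        self.logs.append((level, message))

    async def save_result(self, data: dict[str, Any]) -> None:
        self.results.append(data)

    async def save_checkpoint(self, checkpoint: dict[str, Any]) -> None:
        self.checkpoint = checkpoint


async def run_module(ctx: StubJobContext, items: list, start_index: int = 0) -> int:
    """Process items in order; return the index of the next unprocessed item."""
    index = start_index
    while index < len(items):
        # Check for cancellation before starting each unit of work.
        if ctx.is_cancelled:
            await ctx.log("Cancellation requested, stopping early")
            break
        await ctx.save_result({"item": items[index]})
        index += 1
        # Record progress so a resumed run can continue from here.
        await ctx.save_checkpoint({"next_index": index})
        await asyncio.sleep(0)  # yield control so other tasks can run
    return index
```

On resume, a module written this way could read the saved "next_index" value and pass it as start_index, so finished items are not processed again.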
JobScheduler
Main scheduler class managing job execution.
Properties
uptime
@property
def uptime(self) -> str
Human-readable uptime string (e.g., "2h 15m").
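One way a string in the "2h 15m" style could be produced from an elapsed number of seconds is sketched below. The function name format_uptime, the day unit, and the choice to omit zero-valued units are assumptions for illustration; the scheduler's actual formatting may differ.

```python
def format_uptime(seconds: float) -> str:
    """Render elapsed seconds as a compact string such as '2h 15m'."""
    total_minutes = int(seconds) // 60
    days, remainder = divmod(total_minutes, 24 * 60)
    hours, minutes = divmod(remainder, 60)

    parts = []
    if days:
        parts.append(f"{days}d")
    if hours:
        parts.append(f"{hours}h")
    # Show minutes when non-zero, or when nothing else would be shown.
    if minutes or not parts:
        parts.append(f"{minutes}m")
    return " ".join(parts)
```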
Methods
start_job
async def start_job(
    self,
    module_id: str,
    inputs: dict[str, Any],
    settings: Optional[dict[str, Any]] = None,
    checkpoint: Optional[dict[str, Any]] = None,
    max_duration: Optional[str] = None,
) -> str
Start a new job.
Parameters:
- module_id: Module to run
- inputs: Input parameters
- settings: Module settings
- checkpoint: Checkpoint to resume from
- max_duration: Duration limit (e.g., 1h, 24h, 7d)
Returns: Job ID
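The max_duration strings shown above (1h, 24h, 7d) need converting to a number of seconds before a timer can use them. A minimal sketch of such a parser follows. The name parse_duration is hypothetical, and support for the s and m units is an assumption; the source only shows h and d.

```python
import re

# Seconds per unit. "h" and "d" appear in the documented examples;
# "s" and "m" are assumed here for completeness.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_DURATION_RE = re.compile(r"(\d+)([smhd])")


def parse_duration(text: str) -> int:
    """Convert a string such as '24h' into a number of seconds."""
    match = _DURATION_RE.fullmatch(text.strip().lower())
    if match is None:
        raise ValueError(f"Unrecognized duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]
```

Rejecting unrecognized strings with ValueError, rather than silently ignoring them, keeps a mistyped limit from turning into an unbounded job.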
resume_job
async def resume_job(
    self,
    job_id: str,
    module_id: str,
    inputs: dict[str, Any],
    settings: dict[str, Any],
    checkpoint: Optional[dict[str, Any]],
    max_duration: Optional[str] = None,
) -> None
Resume a previously running job from its last checkpoint.
stop_job
async def stop_job(self, job_id: str) -> bool
Stop a running job. Returns True if the job was found and stopped.
get_status
async def get_status(self) -> dict
Get scheduler status including uptime, module count, and running jobs.
subscribe_logs
def subscribe_logs(self, job_id: str) -> asyncio.Queue
Subscribe to real-time log updates for a job.
unsubscribe_logs
def unsubscribe_logs(self, job_id: str, queue: asyncio.Queue) -> None
Unsubscribe from log updates.
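The subscribe_logs and unsubscribe_logs pair follows a familiar fan-out pattern: each subscriber gets its own asyncio.Queue, and every new log entry is copied into all queues registered for that job. The sketch below illustrates the pattern under assumptions. LogBroadcaster and its publish method are hypothetical names, and the real scheduler's internals are not shown in this document.

```python
import asyncio
from collections import defaultdict


class LogBroadcaster:
    """Hypothetical per-job fan-out of log entries to subscriber queues."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)  # job_id -> list of queues

    def subscribe_logs(self, job_id: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers[job_id].append(queue)
        return queue

    def unsubscribe_logs(self, job_id: str, queue: asyncio.Queue) -> None:
        queues = self._subscribers.get(job_id, [])
        if queue in queues:
            queues.remove(queue)
        if not queues:
            # Drop the empty entry so finished jobs do not accumulate.
            self._subscribers.pop(job_id, None)

    def publish(self, job_id: str, entry: str) -> None:
        # Copy the entry into every queue subscribed to this job.
        for queue in list(self._subscribers.get(job_id, [])):
            queue.put_nowait(entry)


async def demo() -> tuple:
    broadcaster = LogBroadcaster()
    queue = broadcaster.subscribe_logs("job-1")
    broadcaster.publish("job-1", "first")
    broadcaster.publish("job-2", "other job")  # different job, not delivered
    received = [await queue.get()]
    broadcaster.unsubscribe_logs("job-1", queue)
    broadcaster.publish("job-1", "after unsubscribe")  # no longer delivered
    return received, queue.empty()
```

A consumer such as a WebSocket handler would typically call unsubscribe_logs in a finally block so its queue is released even if the connection drops.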
resume_interrupted_jobs
async def resume_interrupted_jobs(self) -> int
Resume jobs that were interrupted by a daemon restart. Called on startup.
Returns: Number of jobs resumed.
Job Lifecycle
- pending — Job created, not yet started
- running — Job executing
- completed — Job finished successfully
- failed — Job encountered an error
- stopped — Job cancelled by user or duration limit
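The states listed above can be modeled as an enumeration with a table of permitted transitions. This is a sketch only. The JobStatus and can_transition names are hypothetical, and the transition table is an assumption inferred from the descriptions (for instance, it treats completed, failed, and stopped as terminal), not a statement of what the scheduler enforces.

```python
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    STOPPED = "stopped"


# Assumed transitions: a job starts once, then ends in one terminal state.
ALLOWED_TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.STOPPED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
    JobStatus.STOPPED: set(),
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from current to new is permitted by the table."""
    return new in ALLOWED_TRANSITIONS[current]
```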
Duration Watchdog
When max_duration is set, a watchdog task monitors the job:
- Waits for the specified duration
- Logs "Duration limit reached"
- Cancels the job context
- Job transitions to stopped status
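The watchdog steps above map naturally onto a background asyncio task that sleeps, logs, and then cancels the context. The following is a minimal sketch under assumptions, not the scheduler's actual code. StubContext, duration_watchdog, and run_with_limit are hypothetical names, and the job is assumed to poll is_cancelled cooperatively.

```python
import asyncio


class StubContext:
    """Hypothetical stand-in exposing only what the watchdog needs."""

    def __init__(self) -> None:
        self.is_cancelled = False
        self.logs = []

    def cancel(self) -> None:
        self.is_cancelled = True

    async def log(self, message: str, level: str = "INFO") -> None:
        self.logs.append(message)


async def duration_watchdog(ctx: StubContext, seconds: float) -> None:
    await asyncio.sleep(seconds)               # wait for the specified duration
    await ctx.log("Duration limit reached")    # log the event
    ctx.cancel()                               # cancel the job context


async def run_with_limit(ctx: StubContext, work, seconds: float) -> str:
    watchdog = asyncio.create_task(duration_watchdog(ctx, seconds))
    try:
        await work(ctx)
    finally:
        # Stop the watchdog if the job finished first, then wait for it to exit.
        watchdog.cancel()
        await asyncio.gather(watchdog, return_exceptions=True)
    return "stopped" if ctx.is_cancelled else "completed"


async def slow_work(ctx: StubContext) -> None:
    # Cooperative loop: keeps going until the context is cancelled.
    while not ctx.is_cancelled:
        await asyncio.sleep(0.01)


async def quick_work(ctx: StubContext) -> None:
    await asyncio.sleep(0)
```

Because the watchdog only flips a flag, the job stops at its next is_cancelled check. A module that never checks the flag would keep running under this design.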