Module Class
Base class for all Sweatpants automation modules.
Import
python
from sweatpants import ModuleOverview
Modules must subclass Module and implement the run() method as an async generator that yields results.
python
from sweatpants import Module
class MyModule(Module):
async def run(self, inputs, settings):
for item in items:
await self.log(f"Processing {item}")
result = await self.process(item)
yield resultProperties
job_id
python
@property
def job_id(self) -> strGet the current job ID.
is_cancelled
python
@property
def is_cancelled(self) -> boolCheck if the job has been cancelled. Use this to exit gracefully when the user stops a job.
python
async def run(self, inputs, settings):
for item in items:
if self.is_cancelled:
await self.log("Job cancelled, stopping...")
return
yield await self.process(item)Methods
log
python
async def log(self, message: str, level: str = "INFO") -> NoneLog a message for this job. Logs are stored in the database and can be streamed via WebSocket.
Parameters:
message— The log messagelevel— Log level:INFO,WARNING, orERROR
python
await self.log("Starting processing...")
await self.log("Rate limited, retrying...", level="WARNING")
await self.log("Failed to connect", level="ERROR")save_checkpoint
python
async def save_checkpoint(self, **data: Any) -> NoneSave checkpoint data for resume capability. Call periodically to enable job recovery after daemon restart.
Parameters:
**data— Key-value pairs to save as checkpoint
python
await self.save_checkpoint(
progress=50,
last_page=3,
processed_ids=["a", "b", "c"]
)get_checkpoint
python
def get_checkpoint(self, key: str, default: Any = None) -> AnyGet a value from the checkpoint.
python
last_page = self.get_checkpoint("last_page", default=1)restore_checkpoint
python
def restore_checkpoint(self, checkpoint: Optional[dict[str, Any]]) -> NoneCalled automatically by the scheduler when resuming a job. Updates internal checkpoint state.
Abstract Method
run
python
@abstractmethod
async def run(
self,
inputs: dict[str, Any],
settings: dict[str, Any],
) -> AsyncIterator[dict[str, Any]]Execute the module’s main task. Must be implemented as an async generator.
Parameters:
inputs— User-provided inputs for this jobsettings— Module configuration settings
Yields:
dict— Result data to be stored
python
async def run(self, inputs, settings):
query = inputs["query"]
api_key = settings.get("api_key")
results = await self.fetch_results(query, api_key)
for result in results:
yield {"data": result}