Engine Execution System
The Data Machine engine utilizes a four-action execution cycle (@since v0.8.0) that orchestrates all pipeline workflows through WordPress Action Scheduler. This system implements a Single Item Execution Model, processing exactly one item per job execution to ensure maximum reliability and isolation.
Execution Cycle
The engine follows a standardized cycle for both database-driven and ephemeral workflows:
- datamachine_run_flow_now: Entry point for execution. Loads configurations and initializes the job.
- datamachine_execute_step: Performs the actual work of a single step (Fetch, AI, Publish, etc.).
- datamachine_schedule_next_step: Persists data packets and schedules the next step in the sequence.
- datamachine_run_flow_later: Handles scheduling logic, queuing the flow for future execution.
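Because each phase is a plain WordPress action, the cycle can be observed from outside the engine. A minimal logging sketch (hook names and argument lists follow the definitions on this page; the callbacks and priorities are illustrative only):

// Observe the cycle without altering it: log flow starts and step executions.
add_action('datamachine_run_flow_now', function ($flow_id, $job_id = null) {
    error_log("Data Machine: flow {$flow_id} started (job {$job_id})");
}, 10, 2);

add_action('datamachine_execute_step', function ($job_id, $flow_step_id) {
    error_log("Data Machine: job {$job_id} executing step {$flow_step_id}");
}, 10, 2);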
Tool Execution
The pipeline engine executes step-specific tools through the AI tool system (tool discovery via filters and cached resolution in ToolManager).
Single Item Execution Model
At its core, the engine is designed for reliability-first processing. Instead of processing batches of items, which can lead to timeouts or cascading failures, the engine processes exactly one item per job execution cycle.
- Isolation: Each item is processed in its own job context. A failure in one item does not affect others.
- Reliability: Minimizes memory usage and execution time per step.
- Traceability: Every processed item is linked to a specific job and log history.
- Consistency: Steps (Fetch, AI, Publish) are built with the expectation of receiving and returning a single primary data packet.
1. Flow Initiation (datamachine_run_flow_now)
Purpose: Entry point for immediate execution of a workflow.
Process:
- Context Setting: Sets AgentContext to PIPELINE.
- Job Creation: Uses JobManager to create or retrieve a job record.
- Configuration Loading: Loads flow_config and pipeline_config.
- Snapshotting: Stores an engine snapshot in engine_data for consistency throughout the job.
- First Step Discovery: Identifies the step with execution_order = 0.
- Scheduling: Triggers datamachine_schedule_next_step for the first step.
Usage:
do_action('datamachine_run_flow_now', $flow_id, $job_id);
2. Step Execution (datamachine_execute_step)
Purpose: Processes an individual step within the workflow.
Parameters:
- $job_id (int) – Job identifier.
- $flow_step_id (string) – Specific step being executed.
Process:
- Data Retrieval: Loads data packets from FilesRepository using the job context.
- Config Resolution: Retrieves step configuration from the engine snapshot.
- Step Dispatch: Instantiates the appropriate step class (e.g., AIStep) and calls execute().
- Navigation: Uses StepNavigator to determine if a subsequent step exists.
- Completion/Transition: If a next step exists, it calls datamachine_schedule_next_step. Otherwise, it marks the job as completed and cleans up temporary files.
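Usage (a sketch based on the parameters above; in normal operation the engine queues this action through Action Scheduler rather than calling it directly):

do_action('datamachine_execute_step', $job_id, $flow_step_id);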
3. Step Scheduling (datamachine_schedule_next_step)
Purpose: Transitions between steps using Action Scheduler.
Parameters:
- $job_id (int) – Job identifier.
- $flow_step_id (string) – Next step to execute.
- $dataPackets (array) – Content to pass to the next step.
Process:
- Data Persistence: Stores $dataPackets in the FilesRepository, isolated by flow and job.
- Background Queuing: Schedules datamachine_execute_step via as_schedule_single_action() for immediate background processing.
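Usage (this matches the call made in the Step Navigation example later on this page):

do_action('datamachine_schedule_next_step', $job_id, $flow_step_id, $dataPackets);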
4. Deferred Execution (datamachine_run_flow_later)
Purpose: Manages future or recurring execution logic via the Scheduling System.
Parameters:
- $flow_id (int) – Flow to schedule.
- $interval_or_timestamp (string|int) – 'manual', a Unix timestamp, or a recurring interval key (e.g., 'every_5_minutes', 'hourly').
Process:
- Cleanup: Unschedules any existing actions for the flow using as_unschedule_action.
- Manual Mode: Simply updates the database configuration without scheduling new actions.
- Timestamp Mode: Schedules a one-time datamachine_run_flow_now at the specified Unix timestamp using as_schedule_single_action.
- Interval Mode: Schedules recurring datamachine_run_flow_now actions using as_schedule_recurring_action.
- Database Sync: Updates the flow's scheduling_config in the database to reflect the new state.
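Usage (a sketch covering the three modes above; the interval key and timestamp are illustrative):

// Recurring: run the flow every hour.
do_action('datamachine_run_flow_later', $flow_id, 'hourly');

// One-time: run the flow once, 24 hours from now.
do_action('datamachine_run_flow_later', $flow_id, time() + DAY_IN_SECONDS);

// Manual: clear any existing schedule; the flow then runs only when triggered manually.
do_action('datamachine_run_flow_later', $flow_id, 'manual');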
Supported Intervals:
- every_5_minutes
- hourly
- every_2_hours
- every_4_hours
- qtrdaily (Every 6 hours)
- twicedaily (Every 12 hours)
- daily
- weekly
Developers can add custom intervals via the datamachine_scheduler_intervals filter.
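A minimal sketch of registering a custom interval via that filter. The filter name is documented above, but the array shape expected for each interval entry is an assumption here (modeled on WordPress cron_schedules), so verify the exact keys against the plugin source:

add_filter('datamachine_scheduler_intervals', function ($intervals) {
    // Assumed entry shape: human-readable label plus length in seconds.
    $intervals['every_30_minutes'] = [
        'label'    => __('Every 30 Minutes'),
        'interval' => 30 * MINUTE_IN_SECONDS,
    ];
    return $intervals;
});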
Ephemeral Workflows
The engine supports Ephemeral Workflows (@since v0.8.0)—workflows executed without being saved to the database. These are triggered via the /execute REST endpoint by passing a workflow object instead of a flow_id.
- Sentinel Values: Use flow_id = 'direct' and pipeline_id = 'direct'.
- Dynamic Config: Configurations are generated on-the-fly from the request and stored in the job's engine_data snapshot.
- Execution Flow: Once initialized, they follow the standard execute_step → schedule_next_step cycle.
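A hedged sketch of triggering an ephemeral workflow from PHP. The route is assumed to live under the datamachine/v1 namespace used elsewhere on this page, and the workflow object shown is a placeholder; consult the REST API documentation for the exact schema and authentication requirements:

// Assumed route and payload shape; see the REST API documentation for specifics.
$response = wp_remote_post(rest_url('datamachine/v1/execute'), [
    'headers' => ['Content-Type' => 'application/json'],
    'body'    => wp_json_encode([
        // Passing a workflow object (instead of a flow_id) makes the run ephemeral.
        'workflow' => [
            'steps' => [ /* step definitions */ ],
        ],
    ]),
]);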
Step Navigation
The engine uses StepNavigator (@since v0.2.1) to determine step transitions during execution:
use DataMachine\Engine\StepNavigator;
$step_navigator = new StepNavigator();

// Determine next step after current step completes
$next_flow_step_id = $step_navigator->get_next_flow_step_id($flow_step_id, [
    'engine_data' => $engine_data
]);

if ($next_flow_step_id) {
    // Schedule next step execution
    do_action('datamachine_schedule_next_step', $job_id, $next_flow_step_id, $data);
} else {
    // Pipeline complete
    do_action('datamachine_update_job_status', $job_id, 'completed');
}
Benefits:
- Centralized step navigation logic
- Support for complex step ordering
- Rollback capability via get_previous_flow_step_id()
- Performance optimized via engine_data context
See: StepNavigator Documentation for complete details
Data Storage
Files Repository
Step data is persisted per-job using FilesRepository (FileStorage + FileRetrieval) under the datamachine-files uploads directory.
See FilesRepository for the current directory structure and component responsibilities.
Step Discovery
Step Registration
Steps register via datamachine_step_types filter:
add_filter('datamachine_step_types', function($steps) {
    $steps['my_step'] = [
        'name' => __('My Step'),
        'class' => 'MyStep',
        'position' => 50
    ];
    return $steps;
});
Step Implementation
All steps implement the same payload contract:
class MyStep {
    public function execute(array $payload): array {
        $job_id       = $payload['job_id'];
        $flow_step_id = $payload['flow_step_id'];
        $data         = $payload['data'] ?? [];
        // EngineData value object (not a raw array)
        $engine = $payload['engine'];

        // Values produced by this step (placeholders for illustration)
        $title   = 'Example title';
        $content = 'Example body';

        // Prepend this step's packet so the newest data comes first
        array_unshift($data, [
            'type'      => 'my_step',
            'content'   => ['title' => $title, 'body' => $content],
            'metadata'  => ['source_type' => 'my_source'],
            'timestamp' => time()
        ]);

        return $data;
    }
}
Parameter Passing
Unified Step Payload
The engine delivers a single documented payload array to every step:
$payload = [
    'job_id' => $job_id,
    'flow_step_id' => $flow_step_id,
    'data' => $data,
    'engine' => $engine_data
];
Benefits:
- ✅ Explicit Dependencies: Steps read everything from a single payload without relying on shared globals
- ✅ Consistent Evolvability: New metadata can be appended to the payload without changing method signatures
- ✅ Pure Testing: Steps are testable via simple array fixtures, enabling isolated unit tests
The step implementation pattern remains identical to the example above: extract what you need from $payload, process the data, and return the updated data packet array.
Job Management
Job Status
- pending – Created but not started
- processing – Currently executing
- completed – Successfully finished (items processed)
- completed_no_items – Finished successfully, but no new items were found to process
- agent_skipped – Finished intentionally without processing the current item (supports compound statuses like agent_skipped - {reason})
- failed – An actual execution error occurred
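For example, a compound agent_skipped status can be recorded through JobManager (the updateStatus signature is shown under Job Operations below; the reason suffix here is illustrative):

$job_manager = new \DataMachine\Services\JobManager();
$job_manager->updateStatus($job_id, 'agent_skipped - duplicate_item', 'Item already processed in an earlier run');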
Flow Monitoring & Problem Flows
The engine tracks execution metrics to identify "Problem Flows" that may require administrative attention:
- Metrics: Each flow tracks consecutive_failures and consecutive_no_items.
- Threshold: The problem_flow_threshold setting (default: 3) determines when a flow is flagged.
- Monitoring:
  - REST API: GET /datamachine/v1/flows/problems returns flagged flows.
  - AI Tools: get_problem_flows allows agents to identify and troubleshoot these flows.
- Reset: Metrics are reset upon the next successful completed execution.
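A hedged sketch of checking for problem flows over REST from PHP (the route is documented above; authentication and the exact response shape are assumptions):

$response = wp_remote_get(rest_url('datamachine/v1/flows/problems'));

if (!is_wp_error($response)) {
    $problem_flows = json_decode(wp_remote_retrieve_body($response), true);
    // Inspect each flagged flow's failure / no-items counters and act accordingly.
}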
See: Troubleshooting Problem Flows for detailed guidance.
Job Operations
Create Job:
$job_id = $db_jobs->create_job([
    'pipeline_id' => $pipeline_id,
    'flow_id' => $flow_id
]);
Update Status:
$job_manager = new \DataMachine\Services\JobManager();
$job_manager->updateStatus($job_id, 'completed', 'Pipeline executed successfully');
Fail Job:
$job_manager = new \DataMachine\Services\JobManager();
$job_manager->failJob($job_id, 'step_execution_failure', [
    'flow_step_id' => $flow_step_id,
    'reason' => 'detailed_error_reason'
]);
Error Handling
Exception Management
All step execution is wrapped in try-catch blocks:
try {
    $data = $flow_step->execute($parameters);
    return !empty($data); // Success = non-empty data packet
} catch (Throwable $e) {
    $logs_manager = new \DataMachine\Services\LogsManager();
    $logs_manager->log('error', 'Step execution failed', [
        'exception' => $e->getMessage(),
        'trace' => $e->getTraceAsString()
    ]);

    $job_manager = new \DataMachine\Services\JobManager();
    $job_manager->failJob($job_id, 'step_execution_failure', $context);

    return false;
}
Failure Actions
Job Failure:
- Updates job status to ‘failed’
- Logs detailed error information
- Optionally cleans up job data files
- Stops pipeline execution
Performance Considerations
Action Scheduler Integration
- Asynchronous Processing – Steps run in background via WP cron
- Immediate Scheduling – time() used as the timestamp for next step execution
- Queue Management – WordPress handles scheduling and retry logic
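A sketch of how the step-to-step handoff maps onto an Action Scheduler call. The hook name and time() timestamp come from the sections above; the positional argument order mirrors the documented do_action signature, and the group name is an assumption:

// Queue the next step for immediate background processing.
// Args mirror do_action('datamachine_execute_step', $job_id, $flow_step_id).
as_schedule_single_action(time(), 'datamachine_execute_step', [$job_id, $flow_step_id], 'data-machine'); // group name assumed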
Data Storage Optimization
- Reference-Based Passing – Large data stored in files, not database
- Automatic Cleanup – Completed jobs cleaned from storage
- Flow Isolation – Each flow maintains separate storage directory
Memory Management
- Minimal Data Retention – Only current step data in memory
- Garbage Collection – Automatic cleanup after completion