Core Actions Reference
Comprehensive reference for all WordPress actions used by Data Machine for pipeline execution, data processing, and system operations.
Note: Since v0.4.0, most core operations use the Services Layer architecture (DataMachine\Services) for direct method calls, replacing filter-based actions and yielding roughly a 3x performance improvement. These actions remain primarily for extensibility and backward compatibility.
Service Manager Integration
Direct service method calls are preferred over actions for system operations:
- PipelineManager->create(), delete(), duplicate()
- FlowManager->create(), delete(), duplicate()
- JobManager->updateStatus(), failJob()
- LogsManager->log(), get_logs(), clear_logs()
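The service calls above can be sketched as direct instantiation and invocation. This is an illustrative sketch only: the `DataMachine\Services` namespace is inferred from the note above, and the argument shapes are assumptions, not verified signatures.

```php
// Illustrative sketch — namespaces and argument shapes are assumptions
// based on the Services Layer note above, not verified signatures.
use DataMachine\Services\PipelineManager;
use DataMachine\Services\JobManager;

$pipelines   = new PipelineManager();
$pipeline_id = $pipelines->create( 'My Pipeline' ); // direct call, no hook dispatch
$pipelines->duplicate( $pipeline_id );

$jobs = new JobManager();
$jobs->updateStatus( 'job_123', 'processing' ); // job ID is illustrative
```

Direct calls avoid the overhead of dispatching through the WordPress hook system, which is the basis of the performance improvement noted above.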
Pipeline Execution Actions
datamachine_run_flow_now
Purpose: Entry point for all pipeline execution
Parameters:
- $flow_id (int) – Flow ID to execute
- $context (string, optional) – Execution context (‘manual’, ‘scheduled’, etc.)
Usage:
do_action('datamachine_run_flow_now', $flow_id, 'manual');
Process:
- Retrieves flow data from database
- Creates job record for tracking
- Identifies first step (execution_order = 0)
- Schedules initial step execution
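The same entry point can be wired to a scheduled trigger. In the sketch below, the `my_plugin_hourly_flow` event name and the flow ID are hypothetical; only the `datamachine_run_flow_now` action call is documented above.

```php
// Hypothetical cron wiring — the 'my_plugin_hourly_flow' event name and
// the flow ID are illustrative, not part of Data Machine.
add_action( 'my_plugin_hourly_flow', function () {
    $flow_id = 42; // assumed flow ID
    do_action( 'datamachine_run_flow_now', $flow_id, 'scheduled' );
} );

// Register the recurring event once.
if ( ! wp_next_scheduled( 'my_plugin_hourly_flow' ) ) {
    wp_schedule_event( time(), 'hourly', 'my_plugin_hourly_flow' );
}
```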
datamachine_execute_step
Purpose: Core step execution orchestration
Parameters:
- $job_id (string) – Job identifier
- $flow_step_id (string) – Flow step identifier
- $dataPackets (array|null) – Data packets or storage reference
Usage:
do_action('datamachine_execute_step', $job_id, $flow_step_id, $dataPackets);
Internal Process:
- Retrieves data from storage if reference provided
- Loads flow step configuration
- Instantiates and executes step class
- Schedules next step or completes pipeline
datamachine_schedule_next_step
Purpose: Action Scheduler integration for step transitions
Parameters:
- $job_id (string) – Job identifier
- $flow_step_id (string) – Next step to execute
- $dataPackets (array) – Data packets to pass
Usage:
do_action('datamachine_schedule_next_step', $job_id, $next_flow_step_id, $dataPackets);
Process:
- Stores data packet in files repository
- Creates Action Scheduler task
- Schedules immediate execution
Data Processing Actions
datamachine_mark_item_processed
Purpose: Mark items as processed for deduplication
Parameters:
- $flow_step_id (string) – Flow step identifier
- $source_type (string) – Handler source type
- $item_id (mixed) – Item identifier
- $job_id (string) – Job identifier
Usage:
do_action('datamachine_mark_item_processed', $flow_step_id, 'wordpress_local', $post_id, $job_id);
Database Operation:
- Inserts record into wp_datamachine_processed_items table
- Creates unique constraint on flow_step_id + source_type + item_identifier
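A custom fetch handler might mark each item as it is emitted so later runs of the same flow step skip it. In this sketch, `fetch_remote_items()` and the `my_custom_source` source type are hypothetical placeholders; only the action call is documented above.

```php
// Sketch of a custom fetch handler loop. fetch_remote_items() and the
// 'my_custom_source' source type are hypothetical placeholders.
foreach ( fetch_remote_items() as $item ) {
    // ... transform $item into a data packet ...

    // Record the item so future runs of this flow step deduplicate it.
    do_action(
        'datamachine_mark_item_processed',
        $flow_step_id,
        'my_custom_source',
        $item['id'],
        $job_id
    );
}
```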
Job Management Actions
datamachine_update_job_status
Purpose: Update job execution status
Services Integration: Primarily handled by JobManager::updateStatus() since v0.4.0
Parameters:
- $job_id (string) – Job identifier
- $status (string) – New status (‘pending’, ‘processing’, ‘completed’, ‘failed’, ‘completed_no_items’, ‘agent_skipped’, or a compound value such as ‘agent_skipped - {reason}’)
- $message (string, optional) – Status message
Usage:
// Services Layer (recommended since v0.4.0)
$job_manager = new DataMachine\Services\JobManager();
$job_manager->updateStatus($job_id, 'completed', 'Pipeline executed successfully');
// Action Hook (for extensibility)
do_action('datamachine_update_job_status', $job_id, 'completed', 'Pipeline executed successfully');
datamachine_fail_job
Purpose: Mark job as failed with detailed error information
Services Integration: Primarily handled by JobManager::failJob() since v0.4.0
Parameters:
- $job_id (string) – Job identifier
- $reason (string) – Failure reason category
- $context_data (array) – Additional failure context
Usage:
// Services Layer (recommended since v0.4.0)
$job_manager = new DataMachine\Services\JobManager();
$job_manager->failJob($job_id, 'step_execution_failure', [
'flow_step_id' => $flow_step_id,
'exception_message' => $e->getMessage(),
'reason' => 'detailed_error_reason'
]);
// Action Hook (for extensibility)
do_action('datamachine_fail_job', $job_id, 'step_execution_failure', [
'flow_step_id' => $flow_step_id,
'exception_message' => $e->getMessage(),
'reason' => 'detailed_error_reason'
]);
Process:
- Updates job status to ‘failed’
- Logs detailed error information
- Optionally cleans up job data files
- Stops pipeline execution
Configuration Actions
datamachine_update_system_prompt
Purpose: Update pipeline-level system prompts
Parameters:
- $pipeline_step_id (string) – Pipeline step UUID
- $system_prompt (string) – AI system prompt text
Usage:
do_action('datamachine_update_system_prompt', $pipeline_step_id, $system_prompt);
Storage: Stored in pipeline_config as reusable template
datamachine_update_flow_user_message
Purpose: Update flow-level user messages
Parameters:
- $flow_step_id (string) – Flow step composite ID
- $user_message (string) – AI user message text
Usage:
do_action('datamachine_update_flow_user_message', $flow_step_id, $user_message);
Storage: Stored in flow_config for instance-specific customization
datamachine_save_tool_config
Purpose: Save tool configuration data
Parameters:
- $tool_id (string) – Tool identifier
- $config_data (array) – Configuration data to save
Usage:
do_action('datamachine_save_tool_config', 'google_search', [
'api_key' => $api_key,
'search_engine_id' => $search_engine_id
]);
System Maintenance Actions
datamachine_cleanup_old_files
Purpose: File repository maintenance
Usage:
do_action('datamachine_cleanup_old_files');
Process:
- Removes data packets from completed jobs
- Cleans up orphaned files
- Runs via Action Scheduler
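Since the action runs via Action Scheduler, a recurring schedule can be registered with the standard Action Scheduler API. The daily interval below is an illustrative choice, not a documented default.

```php
// Schedule daily file cleanup. The interval is an example choice;
// as_schedule_recurring_action() is the standard Action Scheduler API.
if ( ! as_next_scheduled_action( 'datamachine_cleanup_old_files' ) ) {
    as_schedule_recurring_action(
        time(),         // first run: now
        DAY_IN_SECONDS, // repeat every 24 hours
        'datamachine_cleanup_old_files',
        [],
        'data-machine'
    );
}
```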
Import/Export Actions
datamachine_import
Purpose: Import pipeline or flow data
Parameters:
- $type (string) – Import type (‘pipelines’, ‘flows’)
- $data (array) – Import data
Usage:
do_action('datamachine_import', 'pipelines', $csv_data);
datamachine_export
Purpose: Export pipeline or flow data
Parameters:
- $type (string) – Export type (‘pipelines’, ‘flows’)
- $ids (array) – IDs to export
Usage:
do_action('datamachine_export', 'pipelines', [$pipeline_id]);
Logging Action
datamachine_log
Purpose: Centralized logging for all system operations
Parameters:
- $level (string) – Log level (‘debug’, ‘info’, ‘warning’, ‘error’)
- $message (string) – Log message
- $context (array) – Additional context data
Usage:
do_action('datamachine_log', 'debug', 'AI Step Directive: Injected system directive', [
'tool_count' => count($tools),
'available_tools' => array_keys($tools),
'directive_length' => strlen($directive)
]);
Log Levels:
- debug – Development and troubleshooting information
- info – General operational information
- warning – Non-critical issues that should be noted
- error – Critical errors that affect functionality
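Because `datamachine_log` is a plain action, external code can also listen to the log stream. The sketch below routes warnings and errors to PHP's error log; the level filter and output format are example choices, while the three parameters come from the action signature above.

```php
// Forward Data Machine log events to PHP's error log. The level filter
// and format are example choices; the parameters match the action above.
add_action( 'datamachine_log', function ( $level, $message, $context = [] ) {
    if ( in_array( $level, [ 'warning', 'error' ], true ) ) {
        error_log( sprintf(
            '[datamachine:%s] %s %s',
            $level,
            $message,
            wp_json_encode( $context )
        ) );
    }
}, 10, 3 );
```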
REST Endpoints
GET /datamachine/v1/status
Purpose: Consolidated status refresh for flows and pipelines
Handler Class: DataMachineApiStatus
Query Parameters:
- flow_id (int|string|array) – One or more flow IDs (supports flow_id[]=1&flow_id[]=2 or a comma-separated string)
- pipeline_id (int|string|array) – One or more pipeline IDs (supports pipeline_id[]=3&pipeline_id[]=4 or a comma-separated string)
Response:
{
"success": true,
"requested": {
"flows": [123],
"pipelines": [456]
},
"flows": {
"123": {
"step_statuses": {
"flow_step_id_1": "green",
"flow_step_id_2": "yellow"
}
}
},
"pipelines": {
"456": {
"step_statuses": {
"pipeline_step_id_1": "green",
"pipeline_step_id_2": "red"
}
}
}
}
Use Cases:
- Flow handler configuration changes
- Pipeline template modifications
- Batch polling for multiple flows/pipelines in UI dashboards
Status Contexts:
- flow_step_status for flow-scoped validation (handler configuration, scheduling, settings)
- pipeline_step_status for pipeline-wide validation (template architecture, AI cascade effects)
Security:
- Requires manage_options capability
- Requires REST nonce (X-WP-Nonce) when called from authenticated admin JavaScript
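From server-side PHP, the endpoint can be exercised without an HTTP round trip using WordPress's internal REST dispatcher. The IDs below are illustrative; the response shape follows the example above.

```php
// Internal dispatch of the status endpoint via core's rest_do_request().
// Must run as a user with manage_options; the IDs are illustrative.
$request = new WP_REST_Request( 'GET', '/datamachine/v1/status' );
$request->set_query_params( [
    'flow_id'     => '123',
    'pipeline_id' => '456',
] );

$response = rest_do_request( $request );
if ( ! $response->is_error() ) {
    $data = $response->get_data(); // e.g. ['success' => true, 'flows' => ...]
}
```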
Action Scheduler Integration
Action Types
Primary Actions:
- datamachine_execute_step – Step execution
- datamachine_cleanup_old_files – Maintenance
- Custom actions for scheduled flows
Queue Management:
- Group: data-machine
- Immediate scheduling: time() timestamp
- WordPress cron integration
Usage Pattern
// Schedule action
$action_id = as_schedule_single_action(
time(), // Immediate execution
'datamachine_execute_step',
[
'job_id' => $job_id,
'flow_step_id' => $flow_step_id,
'data' => $data_reference
],
'data-machine'
);
Error Handling Actions
Exception Handling Pattern
try {
// Step execution
$payload = [
'job_id' => $job_id,
'flow_step_id' => $flow_step_id,
'flow_step_config' => $config,
'data' => $data,
'engine_data' => apply_filters('datamachine_engine_data', [], $job_id)
];
$result = $step->execute($payload);
if (!empty($result)) {
// Success - schedule next step
do_action('datamachine_schedule_next_step', $job_id, $next_step_id, $result);
} else {
// Failure - fail job
do_action('datamachine_fail_job', $job_id, 'empty_result', $context);
}
} catch (Throwable $e) {
// Exception - log and fail
do_action('datamachine_log', 'error', 'Step execution exception', [
'exception' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
do_action('datamachine_fail_job', $job_id, 'exception', $context);
}
Logging Integration
All actions integrate with centralized logging:
do_action('datamachine_log', 'info', 'Flow execution started successfully', [
'flow_id' => $flow_id,
'job_id' => $job_id,
'first_step' => $first_flow_step_id
]);
Security Considerations
Capability Requirements
All actions require manage_options capability:
if (!current_user_can('manage_options')) {
wp_die(__('You do not have sufficient permissions to access this page.'));
}
Data Sanitization
Input data is sanitized before processing:
$pipeline_name = sanitize_text_field($data['pipeline_name'] ?? '');
$config_json = wp_json_encode($config_data);