Fetch Handlers Overview
Fetch handlers retrieve content from various sources and convert it into standardized DataPackets for pipeline processing.
Available Handlers
Local Sources
WordPress Local (wordpress)
- Purpose: Fetch posts/pages from the local WordPress installation
- Authentication: None (uses WordPress database)
- Data Source: Local WP_Query results
- Key Features: Taxonomy filtering, timeframe limits, specific post targeting
WordPress Media (wordpress_media)
- Purpose: Fetch media files from the local WordPress media library, with parent post integration
- Authentication: None (uses WordPress media functions)
- Data Source: Media library attachments with optional parent content
- Key Features: Parent post content inclusion, file type filtering, metadata extraction, clean content generation
Files (files)
- Purpose: Process local and remote files
- Authentication: None
- Data Source: File system or URLs
- Key Features: Flow-isolated storage, multiple file format support
External Sources
Universal Web Scraper (universal_web_scraper) (@since v0.8.0)
- Purpose: High-performance, multi-extractor web content retrieval for events and structured data. See the detailed Universal Web Scraper documentation.
- Authentication: Varies by site (Public API, API Key, or None)
- Data Source: External websites via specialized extractors (AEG/AXS, Squarespace, Wix, JSON-LD, etc.)
- Key Features: Consolidates multiple legacy handlers into a unified architecture, replacing the standalone GoogleCalendar and WordPressEventsAPI handlers. Supports multi-layered extraction with AI fallback and automated pagination.
RSS (rss)
- Purpose: Fetch content from RSS/Atom feeds
- Authentication: None
- Data Source: XML feed parsing
- Key Features: Automatic deduplication, feed validation
Reddit (reddit)
- Purpose: Fetch posts from Reddit subreddits
- Authentication: OAuth2 (client_id, client_secret)
- Data Source: Reddit API
- Key Features: Subreddit filtering, comment retrieval
Google Sheets (googlesheets_fetch)
- Purpose: Extract data from Google Sheets
- Authentication: OAuth2 (client_id, client_secret)
- Data Source: Google Sheets API
- Key Features: Specific cell/range access, structured data extraction
WordPress API (wordpress_api)
- Purpose: Fetch content from external WordPress sites
- Authentication: None (public REST API)
- Data Source: WordPress REST API endpoints
- Key Features: Remote site access, structured data retrieval
Common Interface
Base Class Architecture (@since v0.2.1):
All fetch handlers extend the FetchHandler base class (/inc/Core/Steps/Fetch/Handlers/FetchHandler.php), which provides:
- Deduplication checking via isItemProcessed() and markItemProcessed()
- Engine data storage via storeEngineData()
- Remote file downloading via downloadRemoteFile()
- Timeframe filtering via applyTimeframeFilter()
- Keyword search filtering via applyKeywordSearch()
- Standardized logging via log()
- Response formatting via successResponse() and emptyResponse()
get_fetch_data() Method
All fetch handlers implement the same public interface:
public function get_fetch_data(int|string $pipeline_id, array $handler_config, ?string $job_id = null): array
Internally, the base class creates an ExecutionContext and calls:
abstract protected function executeFetch(array $config, ExecutionContext $context): array
Parameters:
- $pipeline_id – Pipeline ID, or 'direct' for direct execution mode
- $handler_config – Handler-specific configuration (must include flow_id, flow_step_id, pipeline_id)
- $job_id – Job identifier for deduplication tracking and engine data storage
ExecutionContext provides:
- $context->getPipelineId() – Returns int|string (numeric ID or 'direct')
- $context->getFlowId() – Returns int|string (numeric ID or 'direct')
- $context->getJobId() – Returns ?string
- $context->isItemProcessed($id) – Check deduplication (always false in direct mode)
- $context->markItemProcessed($id) – Mark item processed (no-op in direct mode)
- $context->log($level, $message, $extra) – Contextual logging
- $context->storeEngineData($data) – Store data for downstream steps
- $context->getFileContext() – Get file storage context array
- $context->isDirect() – Check if running in direct execution mode
Return: Array of DataPacket objects
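The hand-off from the public method to executeFetch() follows the template-method pattern. The sketch below is a simplified, WordPress-free illustration built on several assumptions. SketchContext, SketchFetchHandler and ListFetchHandler are hypothetical names. Direct mode is assumed to be signalled by a pipeline ID of 'direct'. Processed items are held in a static in-memory array rather than the wp_datamachine_processed_items table. Results use the ['processed_items' => [...]] shape from the examples later in this document.

```php
<?php
// Hypothetical, WordPress-free sketch of the get_fetch_data() -> executeFetch()
// hand-off. SketchContext stands in for ExecutionContext; processed items are
// kept in a static in-memory array instead of the database.
class SketchContext
{
    /** @var array<string, bool> keys of items already handed out */
    private static array $processed = [];

    public function __construct(
        private int|string $pipelineId,
        private string $flowStepId,
        private string $sourceType,
        private ?string $jobId = null
    ) {
    }

    public function getJobId(): ?string
    {
        return $this->jobId;
    }

    public function isDirect(): bool
    {
        // Assumption: direct mode is signalled by the 'direct' pipeline ID.
        return $this->pipelineId === 'direct';
    }

    public function isItemProcessed(string $id): bool
    {
        if ($this->isDirect()) {
            return false; // direct mode never deduplicates
        }
        return isset(self::$processed["{$this->flowStepId}|{$this->sourceType}|{$id}"]);
    }

    public function markItemProcessed(string $id): void
    {
        if (!$this->isDirect()) { // no-op in direct mode
            self::$processed["{$this->flowStepId}|{$this->sourceType}|{$id}"] = true;
        }
    }
}

abstract class SketchFetchHandler
{
    public function __construct(protected string $sourceType)
    {
    }

    // Public entry point: build the context, then delegate to the subclass.
    public function get_fetch_data(int|string $pipeline_id, array $handler_config, ?string $job_id = null): array
    {
        $context = new SketchContext(
            $pipeline_id,
            (string) ($handler_config['flow_step_id'] ?? 'direct'),
            $this->sourceType,
            $job_id
        );
        return $this->executeFetch($handler_config, $context);
    }

    abstract protected function executeFetch(array $config, SketchContext $context): array;
}

// Example subclass that serves items from a fixed list, one per call.
final class ListFetchHandler extends SketchFetchHandler
{
    public function __construct(private array $items)
    {
        parent::__construct('list_demo');
    }

    protected function executeFetch(array $config, SketchContext $context): array
    {
        foreach ($this->items as $id => $title) {
            if ($context->isItemProcessed((string) $id)) {
                continue;
            }
            $context->markItemProcessed((string) $id);
            // Minimal packet for brevity; see the full format below.
            return ['processed_items' => [['data' => ['content_string' => $title]]]];
        }
        return ['processed_items' => []];
    }
}
```

Each call with the same flow_step_id returns the next unclaimed item. A call with pipeline ID 'direct' ignores the processed set entirely.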
Clean DataPacket Format (AI-visible)
[
'data' => [
'content_string' => "Source: Site Name\n\nTitle: Content Title\n\nContent body...",
'file_info' => null // or file information array
],
'metadata' => [
'source_type' => 'handler_name',
'item_identifier_to_log' => 'unique_id',
'original_id' => 'source_id',
'original_title' => 'Content Title',
'original_date_gmt' => '2023-01-01 12:00:00'
// URLs stored separately in engine data
]
]
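As a rough illustration of that shape, the hypothetical build_clean_packet() helper below assembles a packet from an item array. The site_name, title, body, id and date_gmt keys are assumptions. URLs are deliberately left out because they are stored as engine data.

```php
<?php
// Hypothetical helper that assembles the clean, AI-visible packet shown above.
// URLs are intentionally omitted; they travel as engine data instead.
function build_clean_packet(string $sourceType, array $item, ?array $fileInfo = null): array
{
    // Sections are separated by a blank line ("\n\n").
    $content = sprintf(
        "Source: %s\n\nTitle: %s\n\n%s",
        $item['site_name'],
        $item['title'],
        $item['body']
    );

    return [
        'data' => [
            'content_string' => $content,
            'file_info' => $fileInfo,
        ],
        'metadata' => [
            'source_type' => $sourceType,
            // Assumption: the source ID doubles as the log identifier here.
            'item_identifier_to_log' => $item['id'],
            'original_id' => $item['id'],
            'original_title' => $item['title'],
            'original_date_gmt' => $item['date_gmt'],
        ],
    ];
}

$packet = build_clean_packet('rss', [
    'id' => 'post-42',
    'site_name' => 'Example Blog',
    'title' => 'Hello',
    'body' => 'Body text',
    'date_gmt' => '2023-01-01 12:00:00',
]);
```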
Engine Data (Database Storage)
Fetch handlers store engine parameters in the database for centralized access via the datamachine_engine_data filter:
// Stored by fetch handlers via centralized filter (array storage)
if ($job_id) {
apply_filters('datamachine_engine_data', null, $job_id, [
'source_url' => $source_url,
'image_url' => $image_url
]);
}
// Retrieved by handlers via centralized filter
$engine_data = apply_filters('datamachine_engine_data', [], $job_id);
$source_url = $engine_data['source_url'] ?? null;
$image_url = $engine_data['image_url'] ?? null;
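Outside WordPress, the store/retrieve round trip can be modelled with a small in-memory class. EngineDataStore and its merge-on-write behaviour are assumptions made for this sketch. They do not describe how the datamachine_engine_data filter actually persists data.

```php
<?php
// Hypothetical in-memory stand-in for the datamachine_engine_data filter:
// store() merges new keys into a job's existing data, get() reads it back.
final class EngineDataStore
{
    /** @var array<string, array<string, mixed>> engine data keyed by job ID */
    private array $byJob = [];

    public function store(string $jobId, array $data): void
    {
        // Later values overwrite earlier ones; keys not mentioned are kept.
        $this->byJob[$jobId] = array_merge($this->byJob[$jobId] ?? [], $data);
    }

    public function get(string $jobId, array $default = []): array
    {
        return $this->byJob[$jobId] ?? $default;
    }
}

// Fetch step: store URLs for the job.
$store = new EngineDataStore();
$store->store('job-1', ['source_url' => 'https://example.com/post', 'image_url' => null]);

// Downstream step: read them back with null fallbacks.
$engine_data = $store->get('job-1');
$source_url = $engine_data['source_url'] ?? null;
$image_url = $engine_data['image_url'] ?? null;
```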
Deduplication System
Processed Items Tracking
All fetch handlers use the processed items system:
// Check if already processed
$is_processed = apply_filters('datamachine_is_item_processed', false, $flow_step_id, $source_type, $item_id);
if (!$is_processed) {
// Mark as processed
do_action('datamachine_mark_item_processed', $flow_step_id, $source_type, $item_id, $job_id);
// Process item
return [$data_packet];
}
Scope: Per-flow-step deduplication (same item can be processed by different flows)
Persistence: Stored in wp_datamachine_processed_items table
Cleanup: Automatic cleanup with job completion
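The per-flow-step scope follows from the composite key. The hypothetical ProcessedItems class below keeps rows in memory, keyed on flow step, source type and item ID. Because of that key, an RSS item claimed by one flow step is still available to a different flow step.

```php
<?php
// Hypothetical in-memory model of the processed-items table, keyed the same
// way as the filter/action pair above: flow step + source type + item ID.
final class ProcessedItems
{
    /** @var array<string, string|null> composite key => job ID that claimed it */
    private array $rows = [];

    private function key(string $flowStepId, string $sourceType, string $itemId): string
    {
        // A control-character separator avoids collisions such as "a|b" + "c".
        return implode("\x1F", [$flowStepId, $sourceType, $itemId]);
    }

    public function isProcessed(string $flowStepId, string $sourceType, string $itemId): bool
    {
        // array_key_exists, not isset: the stored job ID may be null.
        return array_key_exists($this->key($flowStepId, $sourceType, $itemId), $this->rows);
    }

    public function mark(string $flowStepId, string $sourceType, string $itemId, ?string $jobId): void
    {
        $this->rows[$this->key($flowStepId, $sourceType, $itemId)] = $jobId;
    }
}

$processed = new ProcessedItems();
$processed->mark('flow_a_step', 'rss', 'guid-1', 'job-1');
```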
Source Type Identifiers
- wordpress_local – Local WordPress posts
- wordpress_media – Local media files
- files – File processing
- rss – RSS feed items
- reddit – Reddit posts
- google_sheets – Spreadsheet data
- wordpress_api – External WordPress API content
- universal_web_scraper – Universal web scraper items
Configuration Patterns
Handler Config Structure
$handler_config = [
'flow_step_id' => $flow_step_id, // Added by engine
'handler_specific_key' => [
'setting1' => 'value1',
'setting2' => 'value2'
]
];
Common Settings
Timeframe Filtering:
- all_time – No time restriction
- 24_hours, 72_hours, 7_days, 30_days – Recent content
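One way to implement these settings is to turn each value into a cutoff timestamp. timeframe_cutoff() and filter_by_timeframe() are hypothetical helpers, not the base class's applyTimeframeFilter(). Items are assumed to carry a Unix timestamp under a 'timestamp' key. In this sketch, unrecognised values fall back to no restriction.

```php
<?php
// Hypothetical timeframe filter: maps the setting values listed above to a
// cutoff timestamp and keeps only items dated on or after it.
function timeframe_cutoff(string $timeframe, int $now): ?int
{
    $seconds = [
        '24_hours' => 24 * 3600,
        '72_hours' => 72 * 3600,
        '7_days' => 7 * 86400,
        '30_days' => 30 * 86400,
    ];
    // 'all_time' (and, in this sketch, any unknown value) means no cutoff.
    if (!isset($seconds[$timeframe])) {
        return null;
    }
    return $now - $seconds[$timeframe];
}

function filter_by_timeframe(array $items, string $timeframe, int $now): array
{
    $cutoff = timeframe_cutoff($timeframe, $now);
    if ($cutoff === null) {
        return $items;
    }
    // array_values() reindexes so callers get a plain list back.
    return array_values(array_filter(
        $items,
        fn(array $item): bool => $item['timestamp'] >= $cutoff
    ));
}
```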
Content Selection:
- Specific ID targeting
- Random vs. chronological selection
- Search/keyword filtering
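For keyword filtering, a minimal sketch can treat the search setting as a comma-separated list of terms and keep an item if any term appears in its title or body. matches_keywords() and the any-term rule are assumptions, not the documented behaviour of applyKeywordSearch(). Matching uses strtolower(), so it is case-insensitive only for ASCII text.

```php
<?php
// Hypothetical keyword filter: comma-separated terms, case-insensitive (ASCII),
// and an item matches if ANY term appears in its title or body.
function matches_keywords(array $item, string $search): bool
{
    $terms = array_filter(
        array_map('trim', explode(',', $search)),
        fn(string $term): bool => $term !== ''
    );
    if ($terms === []) {
        return true; // an empty search keeps everything
    }
    $haystack = strtolower(($item['title'] ?? '') . ' ' . ($item['body'] ?? ''));
    foreach ($terms as $term) {
        if (str_contains($haystack, strtolower($term))) {
            return true;
        }
    }
    return false;
}
```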
Authentication:
- OAuth credentials stored separately
- Handler-specific auth configuration
- Automatic token refresh
Error Handling
Configuration Errors
Missing Required Settings:
if (empty($required_setting)) {
do_action('datamachine_log', 'error', 'Handler: Missing required setting', [
'handler' => 'handler_name',
'setting' => 'required_setting'
]);
return ['processed_items' => []];
}
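When a handler has several required settings, it can collect every missing key first and then log a single error that lists them all. missing_settings() is a hypothetical helper. Unlike empty(), it does not treat the string '0' as missing, which matters for numeric settings such as a limit.

```php
<?php
// Hypothetical validation helper: returns the required keys whose values are
// absent, null, '' or []. Note that '0' and 0 count as present.
function missing_settings(array $config, array $required): array
{
    $missing = [];
    foreach ($required as $key) {
        $value = $config[$key] ?? null;
        if ($value === null || $value === '' || $value === []) {
            $missing[] = $key;
        }
    }
    return $missing;
}

// 'feed_url', 'limit' and 'api_key' are illustrative setting names.
$missing = missing_settings(['feed_url' => '', 'limit' => '0'], ['feed_url', 'limit', 'api_key']);
```

A handler would then log $missing once and return the empty result shown above.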
API Errors
External Service Failures:
- Network timeouts logged and handled gracefully
- Authentication failures return empty results
- Rate limiting respected with appropriate delays
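Retry with exponential backoff is one common way to honour rate limits. The hypothetical fetch_with_backoff() below retries on HTTP 429, on 5xx, or on a status of 0, doubling the delay each time. A status of 0 is assumed here to mean a network failure or timeout. The response array shape is also an assumption. The sleep function is injectable so that the delays can be checked without actually waiting.

```php
<?php
// Hypothetical retry helper: calls $request up to $maxAttempts times and waits
// $baseDelay * 2^(attempt - 1) seconds after each retryable failure.
// $request must return ['status' => int, ...]; status 0 = network failure.
function fetch_with_backoff(callable $request, int $maxAttempts = 3, int $baseDelay = 1, ?callable $sleep = null): ?array
{
    $sleep ??= 'sleep';
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $response = $request();
        $status = $response['status'] ?? 0;
        if ($status >= 200 && $status < 300) {
            return $response;
        }
        $retryable = $status === 429 || $status === 0 || $status >= 500;
        if (!$retryable || $attempt === $maxAttempts) {
            break; // give up: client error, or attempts exhausted
        }
        $sleep($baseDelay * (2 ** ($attempt - 1)));
    }
    return null; // caller logs the failure and returns an empty result
}
```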
Data Processing Errors
Malformed Content:
- Invalid data structures handled gracefully
- Missing required fields use fallback values
- Logging provides detailed error context
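The fallback behaviour can be sketched as a normalizer that returns null for unusable input and substitutes defaults for missing or malformed fields. normalize_item(), the 'Untitled' default and the expected 'Y-m-d H:i:s' date format are all assumptions for illustration.

```php
<?php
// Hypothetical normalizer: turns a raw source item into a predictable shape,
// returning null when the input is not an array at all.
function normalize_item(mixed $raw, string $fallbackId): ?array
{
    if (!is_array($raw)) {
        return null; // not a usable structure; caller logs and skips it
    }
    $title = is_string($raw['title'] ?? null) ? trim($raw['title']) : '';
    $body = is_string($raw['body'] ?? null) ? $raw['body'] : '';
    $id = $raw['id'] ?? null;
    $date = $raw['date_gmt'] ?? null;

    // Accept the date only if it round-trips through the expected format.
    $parsed = is_string($date) ? DateTime::createFromFormat('Y-m-d H:i:s', $date) : false;
    $validDate = $parsed !== false && $parsed->format('Y-m-d H:i:s') === $date;

    return [
        'id' => is_scalar($id) && (string) $id !== '' ? (string) $id : $fallbackId,
        'title' => $title !== '' ? $title : 'Untitled',
        'body' => $body,
        'date_gmt' => $validDate ? $date : null,
    ];
}
```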
Performance Considerations
Single Item Execution Model
All fetch handlers follow the system-wide Single Item Execution Model:
- Finds first eligible unprocessed item.
- Marks as processed immediately to prevent concurrency issues.
- Returns exactly one DataPacket (or an empty array if no new content exists).
- Ensures job-level isolation and reliability.
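The four steps above fit in a short loop. fetch_single_item() is a hypothetical helper that uses an in-memory array as the processed-items store. It returns the ['processed_items' => [...]] shape used in this document's examples.

```php
<?php
// Hypothetical single-item fetch: walk candidates in order, skip processed ones,
// mark the first eligible item BEFORE building its packet, return at most one.
function fetch_single_item(array $candidates, array &$processed, string $flowStepId, string $sourceType, callable $toPacket): array
{
    foreach ($candidates as $item) {
        $key = $flowStepId . '|' . $sourceType . '|' . $item['id'];
        if (isset($processed[$key])) {
            continue;
        }
        $processed[$key] = true; // marked first, mirroring the model above
        return ['processed_items' => [$toPacket($item)]];
    }
    return ['processed_items' => []]; // no new content
}
```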
Memory Optimization
- Minimal data structures in memory
- Stream processing for large files
- Immediate garbage collection after processing
API Efficiency
- Minimal API calls per execution
- Efficient queries with proper filtering
- Connection pooling where available
Integration with AI Steps
Pipeline Context Integration
Fetch handlers provide essential metadata that AI steps use for content processing and tool execution:
Source URL Storage: The WordPress Local, WordPress API, and WordPress Media handlers store source_url in the database via the centralized datamachine_engine_data filter. This lets both publish handlers (for link attribution) and update handlers (for post identification) access the URL.
Content Structure: All handlers structure content in a consistent format, which AI steps process through the modular AI directive system.
Metadata Preservation: Handler metadata (original titles, dates) flows through the pipeline to AI tools via ToolParameters, while URLs are accessed via the engine data filter.
Tool-First Architecture Support
Fetch handlers integrate with the tool-first AI architecture by keeping URLs in centralized engine data storage, separate from the AI-visible content:
// Fetch stores engine data via centralized filter (separate from AI data, array storage)
if ($job_id) {
apply_filters('datamachine_engine_data', null, $job_id, [
'source_url' => $source_url,
'image_url' => $image_url
]);
}
// AI step processes clean content without URL pollution
// Update tools access source_url via centralized datamachine_engine_data filter
// Publishing tools receive clean content via ToolParameters
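The separation can be sketched as a function that splits one fetched item into the AI-visible packet and the engine data array. split_item() and its input keys (id, source_type, title, body, url, image) are hypothetical.

```php
<?php
// Hypothetical split of one fetched item into the two channels described above:
// an AI-visible packet with no URLs, and engine data carrying the URLs.
function split_item(array $item): array
{
    $packet = [
        'data' => [
            'content_string' => "Title: {$item['title']}\n\n{$item['body']}",
            'file_info' => null,
        ],
        'metadata' => [
            'source_type' => $item['source_type'],
            'original_id' => $item['id'],
            'original_title' => $item['title'],
        ],
    ];
    $engineData = [
        'source_url' => $item['url'] ?? null,
        'image_url' => $item['image'] ?? null,
    ];
    return [$packet, $engineData];
}
```

In a real handler, the second element would be passed to the engine data filter and only the first would be returned to the pipeline.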
Integration Examples
Basic Fetch Handler Usage
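$wordpress_handler = new WordPress();
$result = $wordpress_handler->get_fetch_data(
    $pipeline_id,
    [
        // Required keys (see Parameters above)
        'flow_id' => $flow_id,
        'flow_step_id' => $flow_step_id,
        'pipeline_id' => $pipeline_id,
        // Handler-specific settings
        'wordpress_posts' => ['post_type' => 'post']
    ],
    $job_id
);
// Returns: ['processed_items' => [...]]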
// Engine data stored separately in database via centralized datamachine_engine_data filter
With Deduplication
foreach ($potential_items as $item) {
    $is_processed = apply_filters('datamachine_is_item_processed', false,
        $flow_step_id, 'my_source', $item['id']);
    if (!$is_processed) {
        // Mark first so the item is never handed out twice
        do_action('datamachine_mark_item_processed', $flow_step_id,
            'my_source', $item['id'], $job_id);
        return ['processed_items' => [$this->create_data_packet($item)]];
    }
}
return ['processed_items' => []]; // No unprocessed items
Extension Development
Custom Fetch Handler
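class CustomFetchHandler {
    public function get_fetch_data(int|string $pipeline_id, array $handler_config, ?string $job_id = null): array {
        // Extract configuration
        $config = $handler_config['custom_source'] ?? [];
        $flow_step_id = $handler_config['flow_step_id'] ?? null;
        // Fetch candidate items from the source
        $items = $this->fetch_from_source($config);
        // Return the first unprocessed item (Single Item Execution Model)
        foreach ($items as $item) {
            if ($flow_step_id) {
                $is_processed = apply_filters('datamachine_is_item_processed', false,
                    $flow_step_id, 'custom_source', $item['id']);
                if ($is_processed) continue;
                // Mark immediately so the item is not picked up again
                do_action('datamachine_mark_item_processed', $flow_step_id,
                    'custom_source', $item['id'], $job_id);
            }
            // Store engine data via centralized filter (array storage)
            if ($job_id) {
                apply_filters('datamachine_engine_data', null, $job_id, [
                    'source_url' => $item['url'],
                    'image_url' => $item['image'] ?? ''
                ]);
            }
            return ['processed_items' => [$this->create_data_packet($item)]];
        }
        return ['processed_items' => []];
    }
    // Placeholder: replace with real source access (API request, feed parsing, etc.)
    private function fetch_from_source(array $config): array {
        return [];
    }
    // Placeholder: build the clean DataPacket format described above (no URLs);
    // the 'content' key is a hypothetical field of the source item
    private function create_data_packet(array $item): array {
        return [
            'data' => ['content_string' => $item['content'] ?? '', 'file_info' => null],
            'metadata' => ['source_type' => 'custom_source', 'original_id' => $item['id']]
        ];
    }
}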