Deduplication Tracking Endpoints
Implementation: inc/Api/ProcessedItems.php
Base URL: /wp-json/datamachine/v1/processed-items
Overview
Deduplication tracking endpoints manage item tracking records to prevent duplicate processing of content items across flow executions. When a fetch handler processes an item (like an RSS post or Reddit comment), a record is stored to mark it as processed so future flow runs skip it.
Authentication
Requires the manage_options capability. See the Authentication Guide.
Endpoints
GET /processed-items
Retrieve deduplication tracking records with pagination and filtering.
Permission: manage_options capability required
Purpose: Monitor what items have been processed to prevent duplicates, useful for debugging and workflow optimization
Parameters:
- page (integer, optional): Page number for pagination (default: 1)
- per_page (integer, optional): Items per page (default: 20, max: 100)
- flow_id (integer, optional): Filter by specific flow ID
Example Requests:
# Get all processed items (paginated)
curl https://example.com/wp-json/datamachine/v1/processed-items \
  -u username:application_password
# Get processed items for specific flow
curl "https://example.com/wp-json/datamachine/v1/processed-items?flow_id=42" \
  -u username:application_password
# Get specific page (quote the URL so the shell does not interpret &)
curl "https://example.com/wp-json/datamachine/v1/processed-items?page=2&per_page=50" \
  -u username:application_password
Success Response (200 OK):
{
  "success": true,
  "items": [
    {
      "id": 1523,
      "flow_step_id": "step_uuid_42",
      "source_type": "rss",
      "item_identifier": "https://example.com/post-123",
      "job_id": 789,
      "processed_at": "2024-01-02 14:30:00"
    },
    {
      "id": 1522,
      "flow_step_id": "step_uuid_42",
      "source_type": "rss",
      "item_identifier": "https://example.com/post-122",
      "job_id": 788,
      "processed_at": "2024-01-02 14:00:00"
    }
  ],
  "total": 1523,
  "page": 1,
  "per_page": 20
}
Response Fields:
- success (boolean): Request success status
- items (array): Array of deduplication tracking records
- total (integer): Total number of tracked items matching filters
- page (integer): Current page number
- per_page (integer): Number of items per page
Tracked Item Fields:
- id (integer): Unique processed item ID
- flow_step_id (string): Flow step identifier (format: {pipeline_step_id}_{flow_id})
- source_type (string): Handler type (e.g., rss, reddit, wordpress-local)
- item_identifier (string): Unique identifier for the processed item (URL, post ID, etc.)
- job_id (integer): Associated job ID
- processed_at (string): Timestamp when item was processed
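Because per_page is capped at 100, collecting every record for a busy flow means walking pages until the reported total is reached. A minimal sketch, assuming the response shape documented above; the helper names are illustrative and `fetch_page` can be any callable that returns the decoded JSON body for one page (the requests-based factory is one possibility):

```python
import requests
from requests.auth import HTTPBasicAuth

def fetch_all_processed_items(fetch_page):
    """Walk GET /processed-items pages until every record is collected.

    `fetch_page(page)` returns the decoded JSON body for that page,
    i.e. a dict with "items" and "total" keys as documented above.
    """
    items, page = [], 1
    while True:
        data = fetch_page(page)
        items.extend(data["items"])
        # Stop when the server reports nothing further or we have them all.
        if not data["items"] or len(items) >= data["total"]:
            break
        page += 1
    return items

def make_requests_fetcher(base_url, auth, flow_id=None):
    """Build a fetch_page callable backed by the REST endpoint."""
    def fetch_page(page):
        params = {"page": page, "per_page": 100}
        if flow_id is not None:
            params["flow_id"] = flow_id
        resp = requests.get(base_url, params=params, auth=auth)
        resp.raise_for_status()
        return resp.json()
    return fetch_page
```

Keeping the pagination loop separate from the HTTP call makes the stopping logic easy to test without a live site.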
DELETE /processed-items/{id}
Delete a specific deduplication tracking record to allow reprocessing that item.
Permission: manage_options capability required
Purpose: Remove tracking for a specific item to force it to be processed again on next flow execution
Parameters:
- id (integer, required): Processed item ID (in URL path)
Example Request:
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items/1523 \
  -u username:application_password
Success Response (200 OK):
{
  "success": true,
  "message": "Deduplication tracking record deleted successfully.",
  "id": 1523
}
Response Fields:
- success (boolean): Request success status
- message (string): Confirmation message
- id (integer): Deleted tracking record ID
Error Response (404 Not Found):
{
  "code": "processed_item_not_found",
  "message": "Processed item not found.",
  "data": {"status": 404}
}
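Pairing the DELETE call with the 404 case above, a small helper can report the outcome. This is a sketch with illustrative helper names; the classification function is kept pure so it works with any HTTP client:

```python
import requests

def interpret_delete_response(status_code, body):
    """Classify a DELETE /processed-items/{id} response.

    Returns 'deleted', 'not_found', or 'error' based on the
    success and error shapes documented above.
    """
    if status_code == 200 and body.get("success"):
        return "deleted"
    if status_code == 404:
        return "not_found"
    return "error"

def delete_tracking_record(base_url, item_id, auth):
    """Issue the DELETE and classify the outcome."""
    resp = requests.delete(f"{base_url}/{item_id}", auth=auth)
    return interpret_delete_response(resp.status_code, resp.json())
```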
DELETE /processed-items
Clear deduplication tracking records in bulk by pipeline or flow.
Permission: manage_options capability required
Purpose: Reset deduplication tracking to allow items to be processed again on next execution
Parameters:
- clear_type (string, required): Clear scope, either pipeline or flow
- target_id (integer, required): Pipeline ID or Flow ID, depending on clear_type
Example Requests:
# Clear processed items for entire pipeline
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "pipeline", "target_id": 5}'
# Clear processed items for specific flow
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "flow", "target_id": 42}'
Success Response (200 OK):
{
  "success": true,
  "data": null,
  "message": "Deleted 42 deduplication tracking records.",
  "items_deleted": 42
}
Error Response (400 Bad Request):
{
  "code": "invalid_clear_type",
  "message": "Invalid clear type. Must be 'pipeline' or 'flow'.",
  "data": {"status": 400}
}
Deduplication Tracking System
How It Works
- Fetch Handler: Records the item identifier when fetching content from a source
- Database Storage: Stores flow_step_id, source_type, item_identifier, and job_id
- Future Executions: Checks whether an item was previously processed before fetching it
- Skip Duplicates: Prevents reprocessing of the same item across flow runs
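The four steps above can be modeled in a few lines of Python. This is a conceptual sketch, not the plugin's PHP implementation; the set stands in for the wp_datamachine_processed_items table:

```python
def run_fetch_step(flow_step_id, candidates, store):
    """Conceptual dedup loop: skip anything already tracked,
    record everything newly processed.

    `store` is a set of (flow_step_id, item_identifier) tuples
    standing in for the processed-items table.
    """
    processed = []
    for item_identifier in candidates:
        key = (flow_step_id, item_identifier)
        if key in store:
            continue              # previously processed: skip duplicate
        processed.append(item_identifier)
        store.add(key)            # mark as processed for future runs
    return processed
```

Clearing tracking (the bulk DELETE endpoint) corresponds to removing entries from the store, after which the same items are picked up again.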
Item Identifiers by Handler
RSS Handler:
- Identifier: RSS item link URL
- Example: https://example.com/post-123
Reddit Handler:
- Identifier: Reddit post ID
- Example: t3_abc123
WordPress Local Handler:
- Identifier: WordPress post ID
- Example: 456
WordPress API Handler:
- Identifier: Post link URL
- Example: https://external-site.com/post-789
WordPress Media Handler:
- Identifier: Attachment ID
- Example: 789
Google Sheets Handler:
- Identifier: Row index
- Example: 5
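The conventions above can be condensed into a lookup table. The handler slugs beyond rss, reddit, and wordpress-local (documented under source_type) are assumptions inferred from the handler names, so treat this as documentation shorthand rather than a plugin API:

```python
# Identifier type used by each fetch handler, per the list above.
# Slugs other than rss/reddit/wordpress-local are assumed, not confirmed.
ITEM_IDENTIFIER_BY_HANDLER = {
    "rss": "item link URL",              # e.g. https://example.com/post-123
    "reddit": "post ID",                 # e.g. t3_abc123
    "wordpress-local": "post ID",        # e.g. 456
    "wordpress-api": "post link URL",    # e.g. https://external-site.com/post-789
    "wordpress-media": "attachment ID",  # e.g. 789
    "google-sheets": "row index",        # e.g. 5
}
```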
Flow Step ID Format
Processed items are tracked per flow step using a composite ID:
{pipeline_step_id}_{flow_id}
Example: abc-123-def-456_42
This allows:
- Same pipeline step in different flows to maintain independent tracking
- Pipeline-wide clearing when pipeline is deleted
- Flow-specific clearing when flow is deleted
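Composing and splitting the key can be sketched as follows. The helper names are illustrative; splitting uses the last underscore because pipeline step IDs may themselves contain underscores (as in the step_uuid_42 value shown in the response example):

```python
def build_flow_step_id(pipeline_step_id, flow_id):
    """Compose the {pipeline_step_id}_{flow_id} tracking key."""
    return f"{pipeline_step_id}_{flow_id}"

def split_flow_step_id(flow_step_id):
    """Split on the last underscore, since pipeline step IDs may
    themselves contain underscores (e.g. step_uuid_42)."""
    pipeline_step_id, _, flow_id = flow_step_id.rpartition("_")
    return pipeline_step_id, int(flow_id)
```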
Common Workflows
Force Reprocessing of RSS Feed
# 1. Clear deduplication tracking for flow
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "flow", "target_id": 42}'
# 2. Execute flow again - items will be processed since tracking was cleared
curl -X POST https://example.com/wp-json/datamachine/v1/execute \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"flow_id": 42}'
Debug Deduplication Behavior
# Check what items have been tracked as processed (quote the URL so the shell does not interpret &)
curl "https://example.com/wp-json/datamachine/v1/processed-items?flow_id=42&per_page=100" \
  -u username:application_password
Reset Pipeline Tracking
# Clear all deduplication tracking for pipeline
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "pipeline", "target_id": 5}'
Integration Examples
Python Deduplication Management
import requests
from requests.auth import HTTPBasicAuth

url = "https://example.com/wp-json/datamachine/v1/processed-items"
auth = HTTPBasicAuth("username", "application_password")

# Get deduplication tracking records for flow
params = {"flow_id": 42, "per_page": 100}
response = requests.get(url, params=params, auth=auth)

if response.status_code == 200:
    data = response.json()
    print(f"Found {len(data['items'])} tracked items")
    for item in data['items']:
        print(f"Processed: {item['item_identifier']} at {item['processed_at']}")

# Clear deduplication tracking
clear_response = requests.delete(url, json={
    "clear_type": "flow",
    "target_id": 42
}, auth=auth)

if clear_response.status_code == 200:
    print("Deduplication tracking cleared")
JavaScript Item Tracking Management
const axios = require('axios');

const deduplicationAPI = {
  baseURL: 'https://example.com/wp-json/datamachine/v1/processed-items',
  auth: {
    username: 'admin',
    password: 'application_password'
  }
};

// Get tracked items count
async function getTrackedCount(flowId) {
  const response = await axios.get(deduplicationAPI.baseURL, {
    params: { flow_id: flowId, per_page: 1 },
    auth: deduplicationAPI.auth
  });
  return response.data.total;
}

// Clear deduplication tracking by flow
async function clearFlowTracking(flowId) {
  const response = await axios.delete(deduplicationAPI.baseURL, {
    data: {
      clear_type: 'flow',
      target_id: flowId
    },
    auth: deduplicationAPI.auth
  });
  return response.data.success;
}

// Delete specific tracking record
async function deleteTrackingRecord(itemId) {
  const response = await axios.delete(
    `${deduplicationAPI.baseURL}/${itemId}`,
    { auth: deduplicationAPI.auth }
  );
  return response.data.success;
}

// Usage (wrapped in an async IIFE since CommonJS has no top-level await)
(async () => {
  const count = await getTrackedCount(42);
  console.log(`Flow 42 has ${count} tracked items`);
  await clearFlowTracking(42);
  console.log('Flow deduplication tracking cleared');
})();
Use Cases
Reprocess RSS Feed Items
Clear deduplication tracking to force re-import of previously skipped content:
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "flow", "target_id": 42}'
Debug Handler Configuration
After fixing handler configuration, reset deduplication tracking to reprocess items with new settings:
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items \
  -H "Content-Type: application/json" \
  -u username:application_password \
  -d '{"clear_type": "pipeline", "target_id": 5}'
Monitor Workflow Progress
Track how many items have been processed over time by checking deduplication records:
curl "https://example.com/wp-json/datamachine/v1/processed-items?flow_id=42" \
  -u username:application_password
Remove Specific Deduplication Record
Delete tracking for a specific item to allow it to be processed again:
curl -X DELETE https://example.com/wp-json/datamachine/v1/processed-items/1523 \
  -u username:application_password
Abilities API Integration
Three registered abilities provide programmatic access to processed items operations. These are used by REST API endpoints, CLI commands, and Chat tools.
| Ability | Description |
|---|---|
| datamachine/clear-processed-items | Clear processed items by pipeline or flow scope |
| datamachine/check-processed-item | Check if a specific item has been processed for a flow step |
| datamachine/has-processed-history | Check if a flow step has any processing history |
has-processed-history
Distinguishes "no new items found" from "first run with an empty source." The has_processed_items() method on ProcessedItems checks whether any record exists for a given flow_step_id. The engine uses this to select between two completion statuses:
- completed_no_items — The flow step has processed items before but the current run found nothing new. This is normal steady-state behavior.
- completed (empty) — The flow step has never processed anything. This indicates a first run that returned no data, which may signal a configuration issue.
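The engine's choice between the two statuses can be sketched as a small decision function. This is a conceptual model of the logic described above, assuming a boolean history check; it is not the plugin's PHP code:

```python
def completion_status(has_processed_history, new_items_found):
    """Mirror the engine's completion-status selection.

    - items found            -> 'completed'
    - history, nothing new   -> 'completed_no_items' (normal steady state)
    - no history, nothing    -> 'completed' with an empty result
                                (first run with no data; worth investigating)
    """
    if new_items_found:
        return "completed"
    return "completed_no_items" if has_processed_history else "completed"
```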
SkipItemTool
Implementation: inc/Core/Steps/Fetch/Tools/SkipItemTool.php
Since: 0.9.7
The SkipItemTool is a handler tool available during the Fetch step that allows the AI agent to explicitly skip an item that does not meet processing criteria. It acts as a safety net when keyword exclusions or other filters miss items that should not be processed.
Behavior:
- Marks the item as processed via the datamachine_mark_item_processed action so it is not refetched on subsequent runs
- Sets job_status in engine_data to agent_skipped - {reason} via datamachine_merge_engine_data()
- The engine reads the job_status override at completion and applies it as the final job status
Parameters:
- reason (string, required): Explanation of why the item is being skipped
- job_id (integer, required): Current job ID
- engine (object, injected): ExecutionContext engine providing item_id, source_type, and flow_step_id
Tool Result:
[
    'success' => true,
    'message' => 'Item skipped: {reason}',
    'status' => 'agent_skipped - {reason}',
    'item_id' => '{item_identifier}',
    'tool_name' => 'skip_item',
]
ExecutionContext
Implementation: inc/Core/ExecutionContext.php
Since: 0.9.16
ExecutionContext bridges fetch handlers to the database layer by encapsulating execution mode, deduplication, engine data access, file storage, and logging into a single object.
Execution Modes:
- flow — Standard flow-based execution with full pipeline/flow context and deduplication tracking
- direct — Direct execution without database persistence (CLI tools, ephemeral workflows); deduplication is disabled and IDs are set to the sentinel value 'direct'
Factory Methods:
- ExecutionContext::fromFlow($pipeline_id, $flow_id, $flow_step_id, $job_id, $handler_type) — Standard flow execution
- ExecutionContext::direct($handler_type) — Direct execution mode
- ExecutionContext::fromConfig($config, $job_id, $handler_type) — Backward-compatible creation from a handler config array
Key Methods for Processed Items:
- isItemProcessed(string $item_id): bool — Checks deduplication via ProcessedItems::has_item_been_processed(). Returns false in direct mode.
- markItemProcessed(string $item_id): void — Fires the datamachine_mark_item_processed action. No-op in direct mode.
- storeEngineData(array $data): void — Merges data into the engine snapshot for the current job
- getEngine(): EngineData — Lazily loads engine data for the current job
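The mode-dependent behavior of the two deduplication methods can be modeled in Python. This is a conceptual sketch of the contract described above, not the PHP class; a set stands in for the database:

```python
class ExecutionContextSketch:
    """Conceptual model of mode-dependent dedup gating:
    'flow' mode tracks items, 'direct' mode disables tracking."""

    def __init__(self, mode, store=None):
        self.mode = mode                        # 'flow' or 'direct'
        self.store = store if store is not None else set()

    def is_item_processed(self, item_id):
        if self.mode == "direct":
            return False                        # dedup disabled in direct mode
        return item_id in self.store

    def mark_item_processed(self, item_id):
        if self.mode == "direct":
            return                              # no-op in direct mode
        self.store.add(item_id)
```

In direct mode every run sees every item as new, which matches the ephemeral-workflow use case where no persistence exists to track against.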
Related Documentation
- Execute Endpoint – Workflow execution
- Jobs Endpoints – Job monitoring
- Handlers Endpoint – Available handlers
- Authentication – Auth methods
Base URL: /wp-json/datamachine/v1/processed-items
Permission: manage_options capability required
Implementation: inc/Api/ProcessedItems.php
Database Table: wp_datamachine_processed_items
Abilities: DataMachine\Abilities\ProcessedItemsAbilities