# Delta Processing
Delta processing optimizes the creation of new dataset versions by computing and processing only the differences (deltas) between versions. This is especially useful when working with large datasets that change incrementally over time.
## Overview
When working with large datasets that receive regular updates, reprocessing the entire dataset each time is inefficient. The `delta=True` flag in DataChain enables incremental processing, where only new or changed records are processed.
## How Delta Processing Works
When delta processing is enabled:

1. DataChain calculates the difference between the latest version of your source dataset and the version used to create the most recent version of your result dataset.
2. Only the records that are new or modified in the source dataset are processed.
3. The processed records are then merged with the latest version of your result dataset.
This optimization significantly reduces processing time and resource usage for incremental updates.
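Conceptually, the delta computation is an anti-join on the identifying fields combined with a comparison of the change-detection fields. The pure-Python sketch below only illustrates the idea; it is not DataChain's actual implementation, and the row structure is made up:

```python
# Conceptual sketch only -- not DataChain internals.
# Rows are dicts; `on` uniquely identifies a row, `compare` detects changes.
def rows_to_process(new_source, old_source, on, compare):
    previous = {row[on]: row[compare] for row in old_source}
    return [
        row for row in new_source
        if row[on] not in previous            # new record
        or previous[row[on]] != row[compare]  # modified record
    ]

old = [{"path": "a.txt", "mtime": 1}, {"path": "b.txt", "mtime": 1}]
new = [{"path": "a.txt", "mtime": 1}, {"path": "b.txt", "mtime": 2},
       {"path": "c.txt", "mtime": 1}]
# Only b.txt (modified) and c.txt (new) need reprocessing:
print(rows_to_process(new, old, on="path", compare="mtime"))
```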
## Usage
Delta processing can be enabled through the `delta` parameter when using `read_storage()` or `read_dataset()`:
```python
import datachain as dc

# Process only new or changed files
chain = (
    dc.read_storage(
        "data/",
        delta=True,                  # Enable delta processing
        delta_on="file.path",        # Field that uniquely identifies records
        delta_compare="file.mtime",  # Field to check for changes
    )
    .map(result=process_function)
    .save(name="processed_data")
)
```
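On the first run there is no earlier version of `processed_data` to diff against, so the entire source is processed. Subsequent runs process only records whose `file.path` is new or whose `file.mtime` has changed, and merge the results into a new version of `processed_data`.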
## Parameters
- `delta`: Boolean flag to enable delta processing
- `delta_on`: Field(s) that uniquely identify rows in the source dataset
- `delta_result_on`: Field(s) in the resulting dataset that correspond to the `delta_on` fields (used when they have different names)
- `delta_compare`: Field(s) used to check whether a record has been modified
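If a mapper renames or restructures the identifying fields, `delta_result_on` tells DataChain where to find them in the result dataset. The following hypothetical sketch shows the pattern; the `Meta` model, `extract_meta` mapper, and field names are illustrative, not part of DataChain's API:

```python
import datachain as dc

class Meta(dc.DataModel):
    source_path: str
    size: int

def extract_meta(file: dc.File) -> Meta:
    # Hypothetical mapper that stores the path under a different name
    return Meta(source_path=file.path, size=file.size)

chain = (
    dc.read_storage(
        "data/",
        delta=True,
        delta_on="file.path",                # identifier in the source
        delta_result_on="meta.source_path",  # matching field in the result
        delta_compare="file.mtime",
    )
    .map(meta=extract_meta)
    .select("meta")  # the result keeps only the renamed metadata
    .save(name="file_metadata")
)
```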
## Example: Processing New Files Only
```python
import datachain as dc
import time

def process_file(file):
    """Process a file and return results."""
    content = file.read_text()
    # Simulate processing time
    time.sleep(0.1)
    return {
        "content": content,
        "processed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
    }

# Process only new or modified files
chain = (
    dc.read_storage(
        "data/",
        update=True,           # Update the storage index
        delta=True,            # Process only new files
        delta_on="file.path",  # Files are identified by their paths
    )
    .map(result=process_file)
    .save(name="processed_files")
)

print(f"Processed {chain.count()} files")
```
## Combining with Retry Processing
Delta processing can be combined with retry processing to create a workflow that both:
- processes only new or changed records (delta), and
- reprocesses records that are missing from the result dataset or previously produced errors (retry), as in the sketch below.
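A minimal sketch of the combined workflow, assuming a `delta_retry` parameter on `read_storage()` (where a field name means "reprocess records whose value in that field is not `None`", and `True` means "reprocess records missing from the result"); the `process_file` mapper and its `result.error` field are illustrative:

```python
import datachain as dc

chain = (
    dc.read_storage(
        "data/",
        delta=True,                  # process only new or changed files
        delta_on="file.path",
        delta_compare="file.mtime",
        delta_retry="result.error",  # assumed: also reprocess rows whose error field is set
    )
    .map(result=process_file)  # illustrative mapper that records an error field on failure
    .save(name="processed_files")
)
```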