Python Operations

Python operations run your Python functions on each record in a chain. Use them when you need file content, ML models, or LLM calls. For everything else, use data operations.

Three Types

Mapper: map()

Runs a Python function once per record. The return value is added to the same record as a new signal.

import datachain as dc
from pydantic import BaseModel

class Analysis(BaseModel):
    sentiment: str
    confidence: float
    topics: list[str]

def analyze(file: dc.File, client) -> Analysis:
    # client is injected once per worker via .setup() (see below)
    resp = client.messages.create(model="claude-sonnet-4-20250514", ...)
    return Analysis.model_validate_json(resp.content[0].text)
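
Wiring the mapper into a chain is a single call; client is provided by .setup(), covered below (create_anthropic_client is a hypothetical factory):

chain = (
    dc.read_storage("s3://docs/")
    .setup(client=lambda: create_anthropic_client())  # hypothetical factory
    .map(analysis=analyze)
)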

Generator: gen()

Expands one record into multiple records. One video produces thousands of frames, one PDF yields many chunks.

import datachain as dc
from pydantic import BaseModel
from typing import Iterator

class Chunk(BaseModel):
    text: str
    embeddings: list[float]

def split_pdf(file: dc.File) -> Iterator[Chunk]:
    # extract_pages() and embed() are user-defined helpers
    for page in extract_pages(file):
        yield Chunk(text=page.text, embeddings=embed(page.text))

chain = dc.read_storage("s3://docs/*.pdf").gen(chunk=split_pdf).save("chunks")
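
Each yielded Chunk becomes its own record, with fields nested under the chunk prefix; a quick way to inspect the saved dataset:

dc.read_dataset("chunks").select("chunk.text", "chunk.embeddings").show(3)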

Aggregator: agg()

Groups records by a key, then yields one or more output records per group.

import datachain as dc
from pydantic import BaseModel
from typing import Iterator

class Dialog(BaseModel):
    text: str

def group_messages(text: list[str], sender: list[str]) -> Iterator[Dialog]:
    # each parameter receives the list of values for one group
    yield Dialog(text="\n".join(text))

chain = dc.read_csv("s3://logs.csv").agg(dialog=group_messages, partition_by="session_id")
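
Each session_id group collapses into one record carrying the dialog signal, inspectable with the same dot notation used in the select examples later on:

chain.select("dialog.text").show(3)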

Column Naming

The new column name is the keyword assigned in the call:

chain = chain.map(embedding=clip_image_embedding)  # clip_image_embedding: a user-defined mapper

Here, embedding becomes the new column name.
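
When the function returns a Pydantic model, the keyword becomes a prefix and each model field is addressable as a nested column; a sketch reusing analyze from above (client setup omitted):

chain = chain.map(analysis=analyze)
chain.select("analysis.sentiment", "analysis.confidence").show(5)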

Auto-Inference

DataChain infers parameters from the function signature and output types from return type hints. The input column is always file regardless of modality.

Always annotate function return types. A missing return type annotation defaults to str, so a function that returns anything else fails at runtime. This is the single most common source of production failures.

# GOOD: explicit return type
def classify(file: dc.File) -> str:
    return "positive"

# BAD: no return type -- defaults to str, crashes with non-string returns
def classify(file):
    return 0.95  # runtime error
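
The fix is to annotate the actual return type:

def classify(file: dc.File) -> float:
    return 0.95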

Use params= only for nested column binding:

chain.map(ext=lambda path: path.rsplit(".", 1)[-1], params=["file.path"])
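
Without params=, naming the parameter after the column binds the whole object; the same extension extraction through the File object's path field:

chain.map(ext=lambda file: file.path.rsplit(".", 1)[-1])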

Setup for Expensive Resources

When a Python function needs an expensive resource (model, tokenizer, API client), .setup() initializes it once per worker:

from transformers import Pipeline, pipeline
import datachain as dc

def caption_image(file: dc.File, pipeline: Pipeline) -> str:
    image = file.read().convert("RGB")
    return pipeline(image)[0]["generated_text"]

chain = (
    dc.read_storage("gs://bucket/images/", type="image", anon=True)
    .limit(5)
    .settings(cache=True)
    .setup(pipeline=lambda: pipeline("image-to-text", model="Salesforce/blip-image-captioning-large"))
    .map(caption=caption_image)
)

The keyword argument name in .setup() must match the parameter name in the function. The lambda runs once per worker at startup.

Works the same for LLM API clients:

import google.generativeai as genai
import datachain as dc

def classify(file: dc.File, model: genai.GenerativeModel) -> str:
    text = file.read_text()
    response = model.generate_content(f"Classify this document: {text}")
    return response.text

chain = (
    dc.read_storage("s3://docs/")
    .setup(model=lambda: genai.GenerativeModel("gemini-2.0-flash"))
    .settings(parallel=4)
    .map(category=classify)
)

Class-Based Lifecycle

When .setup() is not enough (you need teardown(), shared mutable state across records, or multi-resource initialization), use the Mapper, Generator, and Aggregator base classes:

from datachain.lib.udf import Mapper  # internal module name

class ImageEncoder(Mapper):
    def setup(self):
        self.model = load_model("ViT-B-32")  # load_model is a user-defined helper

    def process(self, file: dc.File) -> list[float]:  # annotate, per the rule above
        return self.model.encode(file.read())

    def teardown(self):
        del self.model
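
Applying the class follows the same keyword pattern; a minimal sketch, assuming map() accepts the UDF class in place of a plain function (check the DataChain reference for the exact signature):

chain = dc.read_storage("s3://bucket/images/").map(emb=ImageEncoder)  # assumption: class passed like a function UDF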

Use class-based operations sparingly; .setup() covers most cases.

Execution and Scale

By default, execution is single-threaded. Add .settings(parallel=True) to run expensive Python functions in multiple processes (or parallel=n for an explicit process count):

import datachain as dc

chain = (
    dc.read_storage("s3://mybucket/dir1")
    .settings(parallel=8, workers=60)  # 8 processes on each of 60 workers
    .map(emb=image_to_embedding)
    .save("img_emb")
)

One Signal Per Operation

Each map(), gen(), or agg() call produces exactly one output signal. Group multiple outputs in a Pydantic model:

from pydantic import BaseModel
import datachain as dc

class Result(BaseModel):
    label: str
    confidence: float
    tokens: int

# GOOD: one Pydantic model
chain.map(result=classify)  # classify() returns Result

# BAD: multiple outputs per call
# chain.map(label=..., confidence=..., tokens=...)  # not supported
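
Bundling outputs loses nothing: each field of the model stays individually queryable through the result prefix:

chain.map(result=classify).select("result.label", "result.confidence").show(5)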

Complete Examples

Image Captioning

Read images from storage, load a HuggingFace model once per worker, caption each image, save as a versioned dataset:

from transformers import pipeline
import datachain as dc

chain = (
    dc.read_storage("s3://bucket/images/", type="image")
    .settings(parallel=8, cache=True)
    .setup(pipe=lambda: pipeline("image-to-text", model="Salesforce/blip-image-captioning-large"))
    .map(caption=lambda file, pipe: pipe(file.read().convert("RGB"))[0]["generated_text"])
    .save("image_captions")
)

LLM Classification

Classify dialog transcripts using an LLM API with parallel workers:

import os
from mistralai import Mistral
import datachain as dc

PROMPT = "Was this dialog successful? Answer in a single word: Success or Failure."

def eval_dialogue(file: dc.File) -> bool:
    # client created per record for brevity; .setup() would share one per worker
    client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
    response = client.chat.complete(
        model="open-mixtral-8x22b",
        messages=[{"role": "system", "content": PROMPT},
                  {"role": "user", "content": file.read()}])
    result = response.choices[0].message.content
    return result.lower().startswith("success")

chain = (
    dc.read_storage("gs://datachain-demo/chatbot-KiT/", column="file", anon=True)
    .settings(parallel=4)
    .map(is_success=eval_dialogue)
    .save("mistral_evaluations")
)

Serializing Full LLM Responses

Save the entire LLM response object as a typed column. Nested fields become queryable at warehouse speed:

import os
from mistralai import Mistral
from mistralai.models import ChatCompletionResponse
import datachain as dc

# PROMPT is reused from the previous example

def eval_dialog(file: dc.File) -> ChatCompletionResponse:
    client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
    return client.chat.complete(
        model="open-mixtral-8x22b",
        messages=[{"role": "system", "content": PROMPT},
                  {"role": "user", "content": file.read()}])

chain = (
    dc.read_storage("gs://datachain-demo/chatbot-KiT/", column="file", anon=True)
    .settings(parallel=4, cache=True)
    .map(response=eval_dialog)
    .map(status=lambda response: response.choices[0].message.content.lower()[:7])
    .save("llm_responses")
)

# Query nested fields without deserialization
dc.read_dataset("llm_responses").select("file.path", "status", "response.usage").show(5)

Structured Output Validation

Use Pydantic to validate LLM outputs before they enter the dataset:

import datachain as dc
from pydantic import BaseModel

class Analysis(BaseModel):
    sentiment: str
    confidence: float
    topics: list[str]

def analyze(file: dc.File, client) -> Analysis:
    resp = client.messages.create(model="claude-sonnet-4-20250514", ...)
    return Analysis.model_validate_json(resp.content[0].text)

chain = (
    dc.read_storage("s3://docs/")
    .setup(client=lambda: create_anthropic_client())  # user-defined factory, not shown
    .settings(parallel=4)
    .map(analysis=analyze)
    .save("document_analyses")
)