Skip to content

File

File and classes that inherit from it. File is a special DataModel that is generated automatically when creating a DataChain from files, like in DataChain.from_storage. File classes include various metadata fields about the underlying file as well as methods to read from the files and otherwise work with the file contents.

ArrowRow

Bases: DataModel

DataModel for reading a row from an Arrow-supported file.

open

open()

Stream row contents from indexed file.

Source code in datachain/lib/file.py
@contextmanager
def open(self):
    """Stream row contents from indexed file.

    Builds a dataset over ``self.file`` (from the local cache copy when the
    file has caching enabled, otherwise through the file's own filesystem),
    takes the single row at ``self.index`` and hands the resulting record
    batch to the ``with`` body.
    """
    if self.file._caching_enabled:
        # Cached mode: make sure a local copy exists, then read it directly.
        self.file.ensure_cached()
        path = self.file.get_local_path()
        ds = dataset(path, **self.kwargs)
    else:
        # Uncached mode: read in place through the file's filesystem object.
        path = self.file.get_path()
        ds = dataset(path, filesystem=self.file.get_fs(), **self.kwargs)

    reader = ds.take([self.index]).to_reader()
    # `@contextmanager` needs a generator function. Returning the reader only
    # worked because contextlib could call `next()` on it; an exception raised
    # inside the `with` body made contextlib call `.throw()` on the returned
    # object, which replaced the real error. Delegating with `yield from`
    # hands out the same batches while letting exceptions propagate normally.
    yield from reader

read

read()

Returns row contents as dict.

Source code in datachain/lib/file.py
def read(self):
    """Return the row contents as a dict (first entry of the opened batch)."""
    with self.open() as batch:
        rows = batch.to_pylist()
        first_row = rows[0]
        return first_row

ExportPlacement module-attribute

# Accepted values for the `placement` argument used when exporting a file.
ExportPlacement = Literal["filename", "etag", "fullpath", "checksum"]

File

File(**kwargs)

Bases: DataModel

DataModel for reading binary files.

Source code in datachain/lib/file.py
def __init__(self, **kwargs):
    """Initialise the model fields from `kwargs` and reset runtime-only state."""
    super().__init__(**kwargs)
    # Runtime state starts out disabled/unset: caching is off and no catalog
    # is attached until something assigns one.
    self._caching_enabled: bool = False
    self._catalog = None

export

export(
    output: str,
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
) -> None

Export file to new location.

Source code in datachain/lib/file.py
def export(
    self,
    output: str,
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
) -> None:
    """Export file to new location.

    Args:
        output: Base location the destination path is built under.
        placement: Naming strategy, forwarded to `get_destination_path`.
        use_cache: When true, switches caching on for this file before it is
            saved. A false value leaves the current caching flag untouched.
    """
    if use_cache:
        self._caching_enabled = use_cache
    dst = self.get_destination_path(output, placement)
    dst_dir = os.path.dirname(dst)
    # A destination without a directory part (e.g. empty `output` combined
    # with a bare file name) gives "", and `os.makedirs("")` raises
    # FileNotFoundError - there is nothing to create in that case.
    if dst_dir:
        os.makedirs(dst_dir, exist_ok=True)

    self.save(dst)

get_destination_path

get_destination_path(
    output: str, placement: ExportPlacement
) -> str

Returns the full destination path of a file for exporting to some output, based on the export placement.

Source code in datachain/lib/file.py
def get_destination_path(self, output: str, placement: ExportPlacement) -> str:
    """
    Build the destination path for exporting this file under `output`.

    The part appended to `output` depends on `placement`:
    - "filename": the unquoted file name;
    - "etag": the etag followed by the file suffix;
    - "fullpath": the unquoted full name, prefixed with the source's network
      location when the source has a scheme other than `file`.

    Raises NotImplementedError for "checksum" and ValueError for any other
    placement value.
    """
    # Reject the placements that cannot produce a path before doing any work.
    if placement == "checksum":
        raise NotImplementedError("Checksum placement not implemented yet")
    if placement not in ("filename", "etag", "fullpath"):
        raise ValueError(f"Unsupported file export placement: {placement}")

    if placement == "filename":
        relative = unquote(self.name)
    elif placement == "etag":
        relative = f"{self.etag}{self.get_file_suffix()}"
    else:  # "fullpath"
        relative = unquote(self.get_full_name())
        parsed_source = urlparse(self.source)
        if parsed_source.scheme and parsed_source.scheme != "file":
            relative = posixpath.join(parsed_source.netloc, relative)
    return posixpath.join(output, relative)  # type: ignore[union-attr]

get_file_ext

get_file_ext()

Returns last part of file name without `.`.

Source code in datachain/lib/file.py
def get_file_ext(self):
    """Return the final suffix of the path with its dots stripped (e.g. `gz`)."""
    suffix = PurePosixPath(self.path).suffix
    return suffix.strip(".")

get_file_stem

get_file_stem()

Returns file name without extension.

Source code in datachain/lib/file.py
def get_file_stem(self):
    """Return the final path component with its last suffix removed."""
    posix_path = PurePosixPath(self.path)
    return posix_path.stem

get_file_suffix

get_file_suffix()

Returns last part of file name with `.`.

Source code in datachain/lib/file.py
def get_file_suffix(self):
    """Return the final suffix of the path, leading `.` included (e.g. `.gz`)."""
    posix_path = PurePosixPath(self.path)
    return posix_path.suffix

get_fs

get_fs()

Returns fsspec filesystem for the file.

Source code in datachain/lib/file.py
def get_fs(self):
    """Return the `fsspec` filesystem of the catalog client for this file's source."""
    client = self._catalog.get_client(self.source)
    return client.fs

get_full_name

get_full_name()

Returns name with parent directories.

Source code in datachain/lib/file.py
def get_full_name(self):
    """Return the name including parent directories, i.e. the `path` field."""
    full_name = self.path
    return full_name

get_local_path

get_local_path() -> Optional[str]

Return path to a file in a local cache.

Returns None if file is not cached. Raises an exception if cache is not set up.

Source code in datachain/lib/file.py
def get_local_path(self) -> Optional[str]:
    """Return the path of this file inside the local cache.

    The lookup is delegated to the catalog's cache, which yields None when
    the file is not cached.

    Raises:
        RuntimeError: If no catalog is attached to this file.
    """
    catalog = self._catalog
    if catalog is not None:
        return catalog.cache.get_path(self)
    raise RuntimeError(
        "cannot resolve local file path because catalog is not setup"
    )

get_path

get_path() -> str

Returns file path.

Source code in datachain/lib/file.py
def get_path(self) -> str:
    """Return the file's URI with percent-escapes decoded.

    When the source uses the `file` scheme, the URI's path component is
    converted to a local filesystem path instead.
    """
    decoded_uri = unquote(self.get_uri())
    if urlparse(self.source).scheme != "file":
        return decoded_uri
    return url2pathname(urlparse(decoded_uri).path)

get_uri

get_uri()

Returns file URI.

Source code in datachain/lib/file.py
def get_uri(self):
    """Return the file URI: the source and the full name joined by `/`."""
    source = self.source
    full_name = self.get_full_name()
    return f"{source}/{full_name}"

open

open(mode: Literal['rb', 'r'] = 'rb') -> Iterator[Any]

Open the file and return a file object.

Source code in datachain/lib/file.py
@contextmanager
def open(self, mode: Literal["rb", "r"] = "rb") -> Iterator[Any]:
    """Open the file and yield a file object.

    Files with a `location` are resolved through `VFileRegistry`; everything
    else is opened via the catalog client for the file's source. In the
    latter case `mode="r"` wraps the stream in a text wrapper.
    """
    if self.location:
        # NOTE(review): `mode` is not applied on this branch - the resolved
        # object is yielded as-is.
        with VFileRegistry.resolve(self, self.location) as vfile:  # type: ignore[arg-type]
            yield vfile
        return

    if self._caching_enabled:
        self.ensure_cached()
    client: Client = self._catalog.get_client(self.source)
    with client.open_object(
        self, use_cache=self._caching_enabled, cb=self._download_cb
    ) as raw:
        if mode == "r":
            yield io.TextIOWrapper(raw)
        else:
            yield raw

read

read(length: int = -1)

Returns file contents.

Source code in datachain/lib/file.py
def read(self, length: int = -1):
    """Open the file and return the result of reading `length` from it.

    `length` is forwarded unchanged to the stream's `read`; the default is -1.
    """
    with self.open() as source:
        data = source.read(length)
        return data

read_bytes

read_bytes()

Returns file contents as bytes.

Source code in datachain/lib/file.py
def read_bytes(self):
    """Return the file contents as bytes by delegating to `read()`."""
    contents = self.read()
    return contents

read_text

read_text()

Returns file contents as text.

Source code in datachain/lib/file.py
def read_text(self):
    """Return the whole file as text, opening it with `mode="r"`."""
    with self.open(mode="r") as text_stream:
        text = text_stream.read()
        return text

resolve

resolve() -> Self

Resolve a File object by checking its existence and updating its metadata.

Returns:

  • File ( Self ) –

    The resolved File object with updated metadata.

Source code in datachain/lib/file.py
def resolve(self) -> "Self":
    """
    Look the file up in its storage and return a copy with refreshed metadata.

    The returned object keeps this file's `path`, `source` and `location`.
    When the lookup fails with a file system error, a warning is logged and
    the copy carries empty/zero metadata instead.

    Returns:
        File: A new object of the same type with updated metadata.

    Raises:
        RuntimeError: If no catalog is set, or if the catalog has no client
            for the file's source.
    """
    catalog = self._catalog
    if catalog is None:
        raise RuntimeError("Cannot resolve file: catalog is not set")

    try:
        client = catalog.get_client(self.source)
    except NotImplementedError as e:
        raise RuntimeError(
            f"Unsupported protocol for file source: {self.source}"
        ) from e

    # Fields carried over unchanged on both the success and fallback paths.
    identity = {
        "path": self.path,
        "source": self.source,
        "location": self.location,
    }

    try:
        info = client.fs.info(client.get_full_path(self.path))
        fresh = client.info_to_file(info, self.source)
        return type(self)(
            size=fresh.size,
            etag=fresh.etag,
            version=fresh.version,
            is_latest=fresh.is_latest,
            last_modified=fresh.last_modified,
            **identity,
        )
    except OSError as e:
        # FileNotFoundError and PermissionError are OSError subclasses, so
        # this single clause covers them as well.
        logger.warning("File system error when resolving %s: %s", self.path, str(e))

    return type(self)(
        size=0,
        etag="",
        version="",
        is_latest=True,
        last_modified=TIME_ZERO,
        **identity,
    )

save

save(destination: str)

Writes its content to destination.

Source code in datachain/lib/file.py
def save(self, destination: str):
    """Write the file's contents, as returned by `read()`, to `destination`."""
    # The destination is opened before the contents are read, as before.
    with open(destination, mode="wb") as sink:
        payload = self.read()
        sink.write(payload)

FileError

FileError(file: File, message: str)

Bases: DataChainError

Source code in datachain/lib/file.py
def __init__(self, file: "File", message: str):
    """Build the error text from the file's URI followed by `message`."""
    uri = file.get_uri()
    super().__init__(f"Error in file {uri}: {message}")

ImageFile

ImageFile(**kwargs)

Bases: File

DataModel for reading image files.

Source code in datachain/lib/file.py
def __init__(self, **kwargs):
    """Initialise the model fields from `kwargs` and reset runtime-only state."""
    super().__init__(**kwargs)
    # No catalog attached and caching switched off until configured later.
    self._caching_enabled: bool = False
    self._catalog = None

read

read()

Returns PIL.Image.Image object.

Source code in datachain/lib/file.py
def read(self):
    """Return a `PIL.Image.Image` opened from the file's raw contents."""
    raw_bytes = super().read()
    buffer = BytesIO(raw_bytes)
    return Image.open(buffer)

save

save(destination: str)

Writes its content to destination.

Source code in datachain/lib/file.py
def save(self, destination: str):
    """Write the image to `destination` using the `save` of the object from `read()`."""
    image = self.read()
    image.save(destination)

TarVFile

Bases: VFile

Virtual file model for files extracted from tar archives.

open classmethod

open(file: File, location: list[dict])

Stream file from tar archive based on location in archive.

Source code in datachain/lib/file.py
@classmethod
def open(cls, file: "File", location: list[dict]):
    """Stream file from tar archive based on location in archive.

    Args:
        file: The virtual file being opened; its catalog, caching flag and
            name are used to open the parent archive and label the slice.
        location: List holding a single dict with the keys `offset`, `size`
            and `parent` (keyword arguments for the parent archive `File`).

    Returns:
        A `FileSlice` over the opened parent archive, built from `offset`,
        `size` and the file's name.

    Raises:
        VFileError: If more than one location is given, or if `offset`,
            `size` or `parent` is missing.
    """
    # These checks used to build the error objects without raising them, so
    # invalid locations slipped through and failed later in less obvious ways.
    if len(location) > 1:
        raise VFileError(file, "multiple 'location's are not supported yet")

    loc = location[0]

    if (offset := loc.get("offset", None)) is None:
        raise VFileError(file, "'offset' is not specified")

    if (size := loc.get("size", None)) is None:
        raise VFileError(file, "'size' is not specified")

    if (parent := loc.get("parent", None)) is None:
        raise VFileError(file, "'parent' is not specified")

    # The parent archive is itself a File sharing the virtual file's catalog.
    tar_file = File(**parent)
    tar_file._set_stream(file._catalog)

    client = file._catalog.get_client(tar_file.source)
    fd = client.open_object(tar_file, use_cache=file._caching_enabled)
    return FileSlice(fd, offset, size, file.name)

TextFile

TextFile(**kwargs)

Bases: File

DataModel for reading text files.

Source code in datachain/lib/file.py
def __init__(self, **kwargs):
    """Initialise the model fields from `kwargs` and reset runtime-only state."""
    super().__init__(**kwargs)
    # Start with caching disabled and without an attached catalog.
    self._caching_enabled: bool = False
    self._catalog = None

open

open(mode: Literal['rb', 'r'] = 'r')

Open the file and return a file object (default to text mode).

Source code in datachain/lib/file.py
@contextmanager
def open(self, mode: Literal["rb", "r"] = "r"):
    """Open the file via the parent class, defaulting to text mode (`"r"`)."""
    # Same as the parent's `open`; only the default `mode` differs.
    with super().open(mode=mode) as handle:
        yield handle

read_text

read_text()

Returns file contents as text.

Source code in datachain/lib/file.py
def read_text(self):
    """Return the whole file as text, using this class's default `open()` mode."""
    with self.open() as text_stream:
        text = text_stream.read()
        return text

save

save(destination: str)

Writes its content to destination.

Source code in datachain/lib/file.py
def save(self, destination: str):
    """Write the file's text, as returned by `read_text()`, to `destination`."""
    # The destination is opened before the text is read, as before.
    with open(destination, mode="w") as sink:
        text = self.read_text()
        sink.write(text)