Skip to content

File

File is a special DataModel, which is automatically generated when a DataChain is created from files, such as in DataChain.from_storage:

from datachain import DataChain

# Create a DataChain from the files in a storage location; the resulting
# chain carries an automatically generated `file` column of type `File`.
dc = DataChain.from_storage("gs://datachain-demo/dogs-and-cats")
# Print the chain's schema (shown under "Output" below).
dc.print_schema()

Output:

file: File@v1
    source: str
    path: str
    size: int
    version: str
    etag: str
    is_latest: bool
    last_modified: datetime
    location: Union[dict, list[dict], NoneType]

File classes include various metadata fields describing the underlying file, along with methods to read and manipulate file contents.

File

File(**kwargs)

Bases: DataModel

DataModel for reading binary files.

Attributes:

  • source (str) –

    The source of the file (e.g., 's3://bucket-name/').

  • path (str) –

    The path to the file (e.g., 'path/to/file.txt').

  • size (int) –

    The size of the file in bytes. Defaults to 0.

  • version (str) –

    The version of the file. Defaults to an empty string.

  • etag (str) –

    The ETag of the file. Defaults to an empty string.

  • is_latest (bool) –

    Whether the file is the latest version. Defaults to True.

  • last_modified (datetime) –

    The last modified timestamp of the file. Defaults to Unix epoch (1970-01-01T00:00:00).

  • location (dict | list[dict]) –

    The location of the file. Defaults to None.

Source code in datachain/lib/file.py
def __init__(self, **kwargs):
    """Initialise the model from ``kwargs`` and reset private runtime state."""
    # The parent initialiser consumes the keyword arguments first.
    super().__init__(**kwargs)
    # Runtime-only state: caching starts disabled and no catalog is attached.
    self._caching_enabled: bool = False
    self._catalog = None

export

export(
    output: str,
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
) -> None

Export file to new location.

Source code in datachain/lib/file.py
def export(
    self,
    output: str,
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
) -> None:
    """Export file to new location.

    Args:
        output: Base output location; the final destination is computed by
            ``get_destination_path(output, placement)``.
        placement: Strategy used to derive the destination path below
            ``output``.
        use_cache: Value stored as this object's caching flag, which is
            consulted when the file contents are opened for reading.
        link_type: ``"copy"`` writes the contents to the destination.
            ``"symlink"`` tries to link instead and falls back to copying
            when the OS reports ENOTSUP, EXDEV or ENOSYS.

    Raises:
        OSError: If symlinking fails with any other errno.
    """
    # Apply the flag in both directions. It used to be assigned only when
    # truthy, so ``use_cache=False`` could never switch caching back off.
    self._caching_enabled = use_cache
    dst = self.get_destination_path(output, placement)
    dst_dir = os.path.dirname(dst)
    client: Client = self._catalog.get_client(dst_dir)
    # Make sure the destination directory exists before linking or writing.
    client.fs.makedirs(dst_dir, exist_ok=True)

    if link_type == "symlink":
        try:
            return self._symlink_to(dst)
        except OSError as exc:
            # Only "linking is not possible here" errors trigger the copy
            # fallback; anything else is a genuine failure.
            if exc.errno not in (errno.ENOTSUP, errno.EXDEV, errno.ENOSYS):
                raise

    self.save(dst)

get_destination_path

get_destination_path(
    output: str, placement: ExportPlacement
) -> str

Returns the full destination path of a file for exporting to some output, based on the export placement.

Source code in datachain/lib/file.py
def get_destination_path(self, output: str, placement: ExportPlacement) -> str:
    """
    Build the full destination path for exporting this file under `output`.

    The part below `output` depends on `placement`:
      * "filename": the URL-decoded file name;
      * "etag": the ETag followed by the file suffix;
      * "fullpath": the URL-decoded full name, prefixed with the source's
        network location when the source has a scheme other than "file".

    Raises NotImplementedError for "checksum" and ValueError for any other
    placement value.
    """
    if placement == "checksum":
        raise NotImplementedError("Checksum placement not implemented yet")

    if placement == "filename":
        relative = unquote(self.name)
    elif placement == "etag":
        relative = f"{self.etag}{self.get_file_suffix()}"
    elif placement == "fullpath":
        relative = unquote(self.get_full_name())
        parsed_source = urlparse(self.source)
        is_remote = parsed_source.scheme and parsed_source.scheme != "file"
        if is_remote:
            # Keep the source's network location as the top-level directory.
            relative = posixpath.join(parsed_source.netloc, relative)
    else:
        raise ValueError(f"Unsupported file export placement: {placement}")

    return posixpath.join(output, relative)  # type: ignore[union-attr]

get_file_ext

get_file_ext()

Returns last part of file name without `.`.

Source code in datachain/lib/file.py
def get_file_ext(self):
    """Return the file's final suffix with the `.` stripped off."""
    suffix = PurePosixPath(self.path).suffix
    return suffix.strip(".")

get_file_stem

get_file_stem()

Returns file name without extension.

Source code in datachain/lib/file.py
def get_file_stem(self):
    """Return the final path component with its last suffix removed."""
    posix_path = PurePosixPath(self.path)
    return posix_path.stem

get_file_suffix

get_file_suffix()

Returns last part of file name with `.`.

Source code in datachain/lib/file.py
def get_file_suffix(self):
    """Return the file's final suffix, including the leading `.`."""
    posix_path = PurePosixPath(self.path)
    return posix_path.suffix

get_fs

get_fs()

Returns fsspec filesystem for the file.

Source code in datachain/lib/file.py
def get_fs(self):
    """Return the `fsspec` filesystem of the client serving this file's source."""
    client = self._catalog.get_client(self.source)
    return client.fs

get_full_name

get_full_name()

Returns name with parent directories.

Source code in datachain/lib/file.py
def get_full_name(self):
    """Return the file name including its parent directories (the `path` field)."""
    full_name = self.path
    return full_name

get_local_path

get_local_path() -> Optional[str]

Return path to a file in a local cache.

Returns None if file is not cached. Raises an exception if cache is not set up.

Source code in datachain/lib/file.py
def get_local_path(self) -> Optional[str]:
    """Look up this file's path in the local cache.

    Returns:
        Whatever the catalog's cache reports for this file; None means the
        file is not cached.

    Raises:
        RuntimeError: If no catalog is attached to this file.
    """
    catalog = self._catalog
    if catalog is not None:
        return catalog.cache.get_path(self)
    raise RuntimeError(
        "cannot resolve local file path because catalog is not setup"
    )

get_path

get_path() -> str

Returns file path.

Source code in datachain/lib/file.py
def get_path(self) -> str:
    """Return the URL-decoded file path.

    For any source other than a ``file`` URL this is the decoded URI itself.
    For ``file`` sources the URL's path component is converted to a local
    path with ``url2pathname``.
    """
    decoded_uri = unquote(self.get_uri())
    if urlparse(self.source).scheme != "file":
        return decoded_uri
    return url2pathname(urlparse(decoded_uri).path)

get_uri

get_uri()

Returns file URI.

Source code in datachain/lib/file.py
def get_uri(self):
    """Return the file URI: the source and the full name joined by `/`."""
    source = self.source
    full_name = self.get_full_name()
    return f"{source}/{full_name}"

open

open(mode: Literal['rb', 'r'] = 'rb') -> Iterator[Any]

Open the file and return a file object.

Source code in datachain/lib/file.py
@contextmanager
def open(self, mode: Literal["rb", "r"] = "rb") -> Iterator[Any]:
    """Open the file and return a file object.

    Args:
        mode: ``"rb"`` yields the underlying binary stream; ``"r"`` yields it
            wrapped in an ``io.TextIOWrapper`` (default encoding).

    Yields:
        A readable file object. Files with a ``location`` are resolved
        through ``VFileRegistry``; all others are opened through the client
        for ``source``, going via the cache when caching is enabled.
    """
    if self.location:
        with VFileRegistry.resolve(self, self.location) as f:  # type: ignore[arg-type]
            # Honour text mode here too; previously ``mode`` was ignored for
            # virtual files and a binary stream was always returned.
            yield io.TextIOWrapper(f) if mode == "r" else f

    else:
        if self._caching_enabled:
            self.ensure_cached()
        client: Client = self._catalog.get_client(self.source)
        with client.open_object(
            self, use_cache=self._caching_enabled, cb=self._download_cb
        ) as f:
            yield io.TextIOWrapper(f) if mode == "r" else f

read

read(length: int = -1)

Returns file contents.

Source code in datachain/lib/file.py
def read(self, length: int = -1):
    """Open the file and return up to `length` units of its contents.

    The default of -1 is passed straight to the stream's `read`.
    """
    with self.open() as fh:
        return fh.read(length)

read_bytes

read_bytes()

Returns file contents as bytes.

Source code in datachain/lib/file.py
def read_bytes(self):
    """Return the file contents as bytes by delegating to `read()`."""
    contents = self.read()
    return contents

read_text

read_text()

Returns file contents as text.

Source code in datachain/lib/file.py
def read_text(self):
    """Return the file contents as text by opening the file in "r" mode."""
    with self.open(mode="r") as text_stream:
        return text_stream.read()

resolve

resolve() -> Self

Resolve a File object by checking its existence and updating its metadata.

Returns:

  • File ( Self ) –

    The resolved File object with updated metadata.

Source code in datachain/lib/file.py
def resolve(self) -> "Self":
    """
    Look the file up on its filesystem and return a copy with fresh metadata.

    The returned object keeps this file's `path`, `source` and `location`.
    When the lookup succeeds, size, etag, version, is_latest and
    last_modified come from the filesystem info. When it fails with an
    OS-level error, a warning is logged and those fields are reset to
    their defaults.

    Returns:
        File: A new object of the same type with updated metadata.

    Raises:
        RuntimeError: If no catalog is set, or the source protocol is
            unsupported.
    """
    catalog = self._catalog
    if catalog is None:
        raise RuntimeError("Cannot resolve file: catalog is not set")

    try:
        client = catalog.get_client(self.source)
    except NotImplementedError as e:
        raise RuntimeError(
            f"Unsupported protocol for file source: {self.source}"
        ) from e

    try:
        info = client.fs.info(client.get_full_path(self.path))
        found = client.info_to_file(info, self.path)
        metadata = {
            "size": found.size,
            "etag": found.etag,
            "version": found.version,
            "is_latest": found.is_latest,
            "last_modified": found.last_modified,
        }
    except OSError as e:
        # FileNotFoundError and PermissionError are OSError subclasses, so
        # this single clause covers them as well.
        logger.warning("File system error when resolving %s: %s", self.path, str(e))
        metadata = {
            "size": 0,
            "etag": "",
            "version": "",
            "is_latest": True,
            "last_modified": TIME_ZERO,
        }

    return type(self)(
        path=self.path,
        source=self.source,
        location=self.location,
        **metadata,
    )

save

save(destination: str)

Writes its content to destination.

Source code in datachain/lib/file.py
def save(self, destination: str):
    """Write this file's content to `destination`.

    The whole content is read via `read()` and handed to the upload call of
    the client resolved for the destination.
    """
    target = str(stringify_path(destination))
    client: Client = self._catalog.get_client(target)
    client.upload(self.read(), target)

FileError

FileError(file: File, message: str)

Bases: DataChainError

Source code in datachain/lib/file.py
def __init__(self, file: "File", message: str):
    """Build the error message from the file's URI and `message`."""
    uri = file.get_uri()
    super().__init__(f"Error in file {uri}: {message}")

TarVFile

Bases: VFile

Virtual file model for files extracted from tar archives.

open classmethod

open(file: File, location: list[dict])

Stream file from tar archive based on location in archive.

Source code in datachain/lib/file.py
@classmethod
def open(cls, file: "File", location: list[dict]):
    """Stream file from tar archive based on location in archive.

    Args:
        file: The virtual file being opened. Its catalog and caching flag
            are reused to open the parent archive, and its name is passed
            on to the returned slice.
        location: A list holding exactly one dict with the keys
            ``"offset"``, ``"size"`` and ``"parent"``. ``"parent"`` holds
            the keyword arguments used to build the archive's ``File``.

    Returns:
        A ``FileSlice`` over the opened archive object, built from the
        offset, size and file name.

    Raises:
        VFileError: If ``location`` is empty, has more than one entry, or
            any of the required keys is missing or None.
    """
    if not location:
        # Without this guard an empty list falls through to ``location[0]``
        # and surfaces as a bare IndexError.
        raise VFileError(file, "'location' is not specified")

    if len(location) > 1:
        raise VFileError(file, "multiple 'location's are not supported yet")

    loc = location[0]

    if (offset := loc.get("offset", None)) is None:
        raise VFileError(file, "'offset' is not specified")

    if (size := loc.get("size", None)) is None:
        raise VFileError(file, "'size' is not specified")

    if (parent := loc.get("parent", None)) is None:
        raise VFileError(file, "'parent' is not specified")

    # Rebuild the archive as a File and attach the same catalog so it can
    # be opened through the regular client machinery.
    tar_file = File(**parent)
    tar_file._set_stream(file._catalog)

    client = file._catalog.get_client(tar_file.source)
    fd = client.open_object(tar_file, use_cache=file._caching_enabled)
    return FileSlice(fd, offset, size, file.name)