def as_image_file(self) -> "ImageFile":
    """Return this file as an `ImageFile`.

    An instance that already is an `ImageFile` is returned unchanged.
    Otherwise a new `ImageFile` is built from this file's dumped model
    fields and its stream is set up with this file's catalog and caching
    flag.
    """
    if not isinstance(self, ImageFile):
        image = ImageFile(**self.model_dump())
        image._set_stream(self._catalog, caching_enabled=self._caching_enabled)
        return image
    return self
def as_text_file(self) -> "TextFile":
    """Return this file as a `TextFile`.

    An instance that already is a `TextFile` is returned unchanged.
    Otherwise a new `TextFile` is built from this file's dumped model
    fields and its stream is set up with this file's catalog and caching
    flag.
    """
    if not isinstance(self, TextFile):
        text = TextFile(**self.model_dump())
        text._set_stream(self._catalog, caching_enabled=self._caching_enabled)
        return text
    return self
def as_video_file(self) -> "VideoFile":
    """Return this file as a `VideoFile`.

    An instance that already is a `VideoFile` is returned unchanged.
    Otherwise a new `VideoFile` is built from this file's dumped model
    fields and its stream is set up with this file's catalog and caching
    flag.
    """
    if not isinstance(self, VideoFile):
        video = VideoFile(**self.model_dump())
        video._set_stream(self._catalog, caching_enabled=self._caching_enabled)
        return video
    return self
def export(
    self,
    output: Union[str, os.PathLike[str]],
    placement: ExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
    client_config: Optional[dict] = None,
) -> None:
    """Export the file to a new location.

    Args:
        output: Base location the file is exported under.
        placement: Strategy used by `get_destination_path` to derive the
            destination path below `output`.
        use_cache: Stored on the instance as its caching flag before the
            export runs.
        link_type: `"copy"` saves the content to the destination;
            `"symlink"` tries to symlink first.
        client_config: Extra keyword arguments forwarded to the catalog
            when obtaining a client.
    """
    self._caching_enabled = use_cache

    target = self.get_destination_path(output, placement)
    target_dir = os.path.dirname(target)
    fs_client: Client = self._catalog.get_client(
        target_dir, **(client_config or {})
    )
    fs_client.fs.makedirs(target_dir, exist_ok=True)

    if link_type == "symlink":
        # errno values for which symlinking is abandoned in favour of a copy.
        copy_fallback_errnos = (errno.ENOTSUP, errno.EXDEV, errno.ENOSYS)
        try:
            return self._symlink_to(target)
        except OSError as exc:
            if exc.errno not in copy_fallback_errnos:
                raise

    self.save(target, client_config=client_config)
def get_destination_path(
    self, output: Union[str, os.PathLike[str]], placement: ExportPlacement
) -> str:
    """Build the full destination path used when exporting this file.

    The path below `output` depends on `placement`:

    - `"filename"`: the URL-unquoted file name.
    - `"etag"`: the etag followed by the file suffix.
    - `"fullpath"`: the URL-unquoted full name, prefixed with the source's
      network location when the source has a scheme other than `file`.

    Args:
        output: Base location the relative path is joined onto.
        placement: One of the placement strategies listed above.

    Returns:
        `output` joined with the placement-specific relative path.

    Raises:
        NotImplementedError: If `placement` is `"checksum"`.
        ValueError: If `placement` is not a recognised strategy.
    """
    if placement == "checksum":
        raise NotImplementedError("Checksum placement not implemented yet")

    if placement == "filename":
        relative = unquote(self.name)
    elif placement == "etag":
        relative = f"{self.etag}{self.get_file_suffix()}"
    elif placement == "fullpath":
        relative = unquote(self.get_full_name())
        scheme = urlparse(self.source).scheme
        if scheme not in ("", "file"):
            relative = posixpath.join(urlparse(self.source).netloc, relative)
    else:
        raise ValueError(f"Unsupported file export placement: {placement}")

    return posixpath.join(output, relative)  # type: ignore[union-attr]
def get_local_path(self) -> Optional[str]:
    """Look up this file's path in the catalog cache.

    Returns:
        Whatever the catalog cache reports as the path for this file.

    Raises:
        RuntimeError: If no catalog is set on the file.
    """
    catalog = self._catalog
    if catalog is None:
        raise RuntimeError(
            "cannot resolve local file path because catalog is not setup"
        )
    return catalog.cache.get_path(self)
@contextmanager
def open(self, mode: Literal["rb", "r"] = "rb") -> Iterator[Any]:
    """Open the file and yield a file object.

    Files with a `location` are opened through `VFileRegistry` and the
    stream is yielded as-is; `mode` is not consulted on that path.
    Other files are opened through the client for their source, after
    being cached first when caching is enabled.

    Args:
        mode: `"rb"` yields the underlying stream; `"r"` yields it wrapped
            in an `io.TextIOWrapper`.
    """
    if self.location:
        with VFileRegistry.open(self, self.location) as vstream:  # type: ignore[arg-type]
            yield vstream
        return

    use_cache = self._caching_enabled
    if use_cache:
        self.ensure_cached()

    client: Client = self._catalog.get_client(self.source)
    with client.open_object(
        self, use_cache=self._caching_enabled, cb=self._download_cb
    ) as raw:
        if mode == "r":
            yield io.TextIOWrapper(raw)
        else:
            yield raw
def read_text(self):
    """Return the file contents as text.

    Raises:
        VFileError: If the file has a `location`, i.e. it is a virtual
            file, for which reading text is not supported.
    """
    if not self.location:
        with self.open(mode="r") as text_stream:
            return text_stream.read()

    raise VFileError(
        "Reading text from virtual file is not supported",
        self.source,
        self.path,
    )
resolve
resolve() -> Self
Resolve a File object by checking its existence and updating its metadata.
def resolve(self) -> "Self":
    """Resolve a File object by checking its existence and updating its metadata.

    The file is looked up through the client for its source. When the
    lookup succeeds, a new object of the same type is returned carrying the
    size, etag, version, latest flag and modification time reported by the
    client. When the lookup fails with an `OSError` (which includes
    `FileNotFoundError` and `PermissionError`), a warning is logged and a
    placeholder of the same type is returned with size 0, empty etag and
    version, `is_latest=True` and `last_modified=TIME_ZERO`.

    Returns:
        File: The resolved File object with updated metadata.

    Raises:
        RuntimeError: If no catalog is set, or if the catalog reports the
            source protocol as not implemented.
    """
    if self._catalog is None:
        raise RuntimeError("Cannot resolve file: catalog is not set")

    try:
        client = self._catalog.get_client(self.source)
    except NotImplementedError as e:
        raise RuntimeError(
            f"Unsupported protocol for file source: {self.source}"
        ) from e

    try:
        info = client.fs.info(client.get_full_path(self.path))
        converted_info = client.info_to_file(info, self.path)
        return type(self)(
            path=self.path,
            source=self.source,
            size=converted_info.size,
            etag=converted_info.etag,
            version=converted_info.version,
            is_latest=converted_info.is_latest,
            last_modified=converted_info.last_modified,
            location=self.location,
        )
    # FileNotFoundError and PermissionError are subclasses of OSError, so
    # catching OSError alone covers all three.
    except OSError as e:
        logger.warning("File system error when resolving %s: %s", self.path, str(e))
        return type(self)(
            path=self.path,
            source=self.source,
            size=0,
            etag="",
            version="",
            is_latest=True,
            last_modified=TIME_ZERO,
            location=self.location,
        )
def save(self, destination: str, client_config: Optional[dict] = None):
    """Write the file's content to `destination`.

    Args:
        destination: Where to write the content. It is passed through
            `stringify_path` first. If the resolved client's prefix is
            `file://` and the destination does not already start with
            that prefix, it is converted to an absolute `file://` URI.
        client_config: Extra keyword arguments forwarded to the catalog
            when obtaining a client.
    """
    target = stringify_path(destination)
    options = client_config or {}
    client: Client = self._catalog.get_client(target, **options)

    is_bare_local_path = client.PREFIX == "file://" and not target.startswith(
        client.PREFIX
    )
    if is_bare_local_path:
        target = Path(target).absolute().as_uri()

    client.upload(self.read(), target)
def __init__(self, message: str, source: str, path: str):
    """Store the error details and build the exception text.

    Args:
        message: Description of what went wrong.
        source: Source of the file the error refers to.
        path: Path of the file within that source.
    """
    self.message, self.source, self.path = message, source, path
    text = f"Error in file '{source}/{path}': {message}"
    super().__init__(text)
TarVFile
Bases: VFile
Virtual file model for files extracted from tar archives.
@classmethod
def open(cls, file: "File", location: list[dict]):
    """Stream a file from a tar archive based on its location in the archive.

    Args:
        file: The file to read; its source, path and name are used for
            error reporting and for the returned slice.
        location: Location entries. The first entry must provide `offset`
            and `size`.

    Returns:
        A `FileSlice` over the opened parent archive, built from the
        offset, size and file name.

    Raises:
        VFileError: If `offset` or `size` is missing from the first
            location entry.
    """
    archive = cls.parent(file, location)
    entry = location[0]

    offset = entry.get("offset")
    if offset is None:
        raise VFileError("'offset' is not specified", file.source, file.path)

    size = entry.get("size")
    if size is None:
        raise VFileError("'size' is not specified", file.source, file.path)

    client = file._catalog.get_client(archive.source)
    archive_stream = client.open_object(archive, use_cache=file._caching_enabled)
    return FileSlice(archive_stream, offset, size, file.name)