Source code for pyxet.file_system

import json
import sys
from urllib.parse import urlparse
from enum import IntEnum

import fsspec
import os

from .commit_transaction import MultiCommitTransaction
from .file_interface import XetFile
from .url_parsing import parse_url, XetPathInfo

if 'SPHINX_BUILD' not in os.environ:
    from .rpyxet import rpyxet
    _manager = rpyxet.PyRepoManager()

[docs] def login(user, token, email=None, host=None): """ Sets the active login credentials used to authenticate against Xethub. """ _manager.override_login_config(user, token, email, host)
[docs] def open(file_url, mode="rb", **kwargs): """ Open the file at the specific Xet file URL of the form `xet://<repo_user>/<repo_name>/<branch>/<path-to-file>`:: f = pyxet.open('xet://XetHub/Flickr30k/main/results.csv') """ fs = XetFS() return fs._open(file_url, mode=mode, **kwargs)
class XetFSOpenFlags(IntEnum): FILE_FLAG_NO_BUFFERING = 0x20000000
[docs] class XetFS(fsspec.spec.AbstractFileSystem): protocol = "xet" # This allows pandas, etc. to implement "xet://" cachable = True sep = "/" async_impl = False root_marker = "/"
[docs] def __init__(self, domain=None, **storage_options): """ Opens the repository at `repo_url` as an fsspec file system handle, providing read-only operations such as ls, glob, and open. User and token are needed for private repositories and they can be set with `pyxet.login`. Examples:: import pyxet fs = pyxet.XetFS() # List files. fs.ls('XetHub/Flickr30k/main') # Read the first 5 lines of a file b = fs.open('XetHub/Flickr30k/main/results.csv').read() the Xet repository endpoint can be set with the 'domain' argument or the XET_ENDPOINT environment variable. The default domain is xethub.com if unspecified """ import os if 'XET_ENDPOINT' in os.environ: domain = os.environ['XET_ENDPOINT'] if domain is None: # Read it from the config domain = 'xethub.com' self.domain = domain self.intrans = False self._transaction = None # Init the base class. super().__init__()
@classmethod def _strip_protocol(cls, path): """Turn path from fully-qualified to file-system-specific May require FS-specific handling, e.g., for relative paths or links. """ if isinstance(path, list): return [cls._strip_protocol(p) for p in path] if path.startswith('xet://'): protostripped = path[5:] elif path.startswith('https://'): protostripped = path[8:] else: protostripped = path return protostripped.lstrip('/')
[docs] def get_username(self): """ Returns the inferred username for the domain """ return _manager.get_inferred_username(self.domain)
def unstrip_protocol(self, name): """Format FS-specific path to generic, including protocol""" return 'xet://' + name.lstrip('/') @staticmethod def _get_kwargs_from_urls(path): """If kwargs can be encoded in the paths, extract them here This should happen before instantiation of the class; incoming paths then should be amended to strip the options in methods. Examples may look like an sftp path "sftp://user@host:/my/path", where the user and host should become kwargs and later get stripped. """ return {}
[docs] def isdir(self, path): """Is this entry directory-like?""" return self.isdir_or_branch(path)
[docs] def isdir_or_branch(self, path): """Is this entry directory-like?""" try: t = self.info(path)["type"] return t == "directory" or t == "branch" except OSError: return False
[docs] def branch_info(self, url): """ Returns information about a branch `user/repo/branch` or `xet://user/repo/branch` """ # try to parse this as a URL # and if not try to parse it as a path if isinstance(url, XetPathInfo): url_path = url else: url_path = parse_url(url, self.domain) if url_path.branch == '': raise ValueError("Incomplete path: Expecting xet://user/repo/branch") parse = urlparse(url_path.remote) path = parse.path components = list(filter(None, path.lstrip('/').rstrip('/').split('/'))) if len(components) < 2: raise ValueError("Incomplete path: Expecting xet://user/repo/branch") prefix = '/'.join(components[:2]) + '/' + url_path.branch attr = _manager.stat(url_path.remote, url_path.branch, "") if attr is None: raise FileNotFoundError( f"Branch or repo not found {url}, remote = {url_path.remote}, branch = {url_path.branch}") return {"name": prefix + '/' + url_path.path, "size": attr.size, "type": attr.ftype}
def branch_exists(self, url): try: self.branch_info(url) return True except Exception as e: return False
[docs] def info(self, url): """ Returns information about a path `user/repo/branch/[path]` or `xet://user/repo/branch/[path]` """ url_path = parse_url(url, self.domain) if url_path.branch == '': raise ValueError("Incomplete path: Expecting xet://user/repo/branch/[path]") parse = urlparse(url_path.remote) path = parse.path components = path.lstrip('/').rstrip('/').split('/') if len(components) < 2: raise ValueError("URL not in recognized format.") prefix = '/'.join(components[:2]) + '/' + url_path.branch attr = _manager.stat(url_path.remote, url_path.branch, url_path.path) if attr is None: raise FileNotFoundError(f"File not found {url}") return {"name": prefix + '/' + url_path.path, "size": attr.size, "type": attr.ftype, "last_modified": None if len(attr.last_modified) == 0 else attr.last_modified}
def make_repo(self, dest_path, private=False, **kwargs): dest = parse_url(dest_path, self.domain) if dest.path != '': raise ValueError("Expecting xet://user/repo for destination") if self.is_repo(dest_path): raise ValueError(f"{dest_path} already exists") domain = self.domain domain_split = domain.split('://') scheme = 'https' if len(domain_split) == 2: scheme = domain_split[0] domain = domain_split[1] # dest_path is of the form xet://user/repo split = dest_path.split('://')[-1].split('/') assert (len(split) == 2) owner, repo = split[0], split[1] query = json.dumps({'name': repo, 'owner': owner, 'private': private}) ret = json.loads(bytes(_manager.api_query(f"{scheme}://{domain}", "", "post", query))) return ret def fork_repo(self, origin_path, dest_path, **kwargs): origin = parse_url(origin_path, self.domain) dest = parse_url(dest_path, self.domain) if origin.path != '': raise ValueError("Expecting xet://user/repo for fork origin") if dest.path != '': raise ValueError("Expecting xet://user/repo for destination") if not self.is_repo(origin_path): raise ValueError(f"{origin_path} is not a repo") if self.is_repo(dest_path): raise ValueError(f"{dest_path} already exists") user_and_repo = list(filter(None, dest_path.split('://')[-1].split('/'))) if len(user_and_repo) != 2: raise ValueError("Expecting xet://user/repo for destination") # validate username user, new_repo = user_and_repo[0], user_and_repo[1] real_username = self.get_username() if user != real_username: raise ValueError(f"Cannot only create repository at xet://{real_username} and not xet://{user}") query = json.dumps({'name': new_repo}) ret = json.loads(bytes(_manager.api_query(origin.remote, "forks", "post", query))) return ret def duplicate_repo(self, origin_path, dest_path, **kwargs): origin = parse_url(origin_path, self.domain) dest = parse_url(dest_path, self.domain) if origin.path != '': raise ValueError("Expecting xet://user/repo for fork origin") if dest.path != '': raise ValueError("Expecting xet://user/repo for destination") if not self.is_repo(origin_path): raise ValueError(f"{origin_path} is not a repo") if self.is_repo(dest_path): raise ValueError(f"{dest_path} already exists") user_and_repo = list(filter(None, dest_path.split('://')[-1].split('/'))) if len(user_and_repo) != 2: raise ValueError("Expecting xet://user/repo for destination") # validate username user, new_repo = user_and_repo[0], user_and_repo[1] real_username = self.get_username() if user != real_username: raise ValueError(f"Cannot only create repository at xet://{real_username} and not xet://{user}") ret = json.loads(bytes(_manager.api_query(origin.remote, "duplicate", "post", ""))) if 'full_name' not in ret: raise RuntimeError("Duplication failed") src_name = 'xet://' + ret['full_name'] if src_name == dest_path: return ret return self.rename_repo(src_name, dest_path) def rename_repo(self, origin_path, dest_path, **kwargs): origin = parse_url(origin_path, self.domain) dest = parse_url(dest_path, self.domain) if origin.path != '': raise ValueError("Expecting xet://user/repo for fork origin") if dest.path != '': raise ValueError("Expecting xet://user/repo for destination") if not self.is_repo(origin_path): raise ValueError(f"{origin_path} is not a repo") if self.is_repo(dest_path): raise ValueError(f"{dest_path} already exists") user_and_repo = list(filter(None, origin_path.split('://')[-1].split('/'))) if len(user_and_repo) != 2: raise ValueError("Expecting xet://user/repo for destination") old_user = user_and_repo[0] user_and_repo = list(filter(None, dest_path.split('://')[-1].split('/'))) if len(user_and_repo) != 2: raise ValueError("Expecting xet://user/repo for destination") new_user, new_repo = user_and_repo[0], user_and_repo[1] if old_user != new_user: raise ValueError("Username must be the same between source and destination") query = json.dumps({'name': new_repo}) ret = json.loads(bytes(_manager.api_query(origin.remote, "", "patch", query))) return ret def set_repo_attr(self, origin_path, attrkey, attrvalue, **kwargs): origin = parse_url(origin_path, self.domain) if origin.path != '': raise ValueError("Expecting xet://user/repo for fork origin") if not self.is_repo(origin_path): raise ValueError(f"{origin_path} is not a repo") query = json.dumps({attrkey: attrvalue}) ret = json.loads(bytes(_manager.api_query(origin.remote, "", "patch", query))) return ret
[docs] def list_repos(self, raw=False, **kwargs): """ Lists the repos available for a path of the form `user` or `xet://user` """ domain = self.domain domain_split = domain.split('://') scheme = 'https' if len(domain_split) == 2: scheme = domain_split[0] domain = domain_split[1] res = json.loads(bytes(_manager.api_query(f"{scheme}://{domain}", "", "get", ""))) if raw: return res else: return [{'name': f['full_name'], 'permissions': f['permissions']} for f in res]
[docs] def list_branches(self, path, raw=False, **kwargs): """ Lists the branches for a path of the form `user/repo` or `xet://user/repo` """ url_path = parse_url(path, self.domain) if url_path.remote == '': raise ValueError("Incomplete path: Expecting xet://user/repo") if url_path.branch != '': raise ValueError('Too many path components for a repo') res = json.loads(bytes(_manager.api_query(url_path.remote, "branches", "get", ""))) if raw: return res else: return [{'name': r['name'], 'type': 'branch'} for r in res]
[docs] def update_size(self, path, bucket_size): """ Calls Xetea to update the size of a synchronized S3 bucket for the repo. """ url_path = parse_url(path, self.domain) if url_path.remote == '': raise ValueError("Incomplete path: Expecting xet://user/repo/branch") if url_path.branch == '': raise ValueError("No branch in path") body = json.dumps({ 'size': bucket_size, 'branch': url_path.branch }) _manager.api_query(url_path.remote, "remote_size", "post", body)
[docs] def ls(self, path, detail=True, **kwargs): """List objects at path. This should include subdirectories and files at that location. The difference between a file and a directory must be clear when details are requested. The specific keys, or perhaps a FileInfo class, or similar, is TBD, but must be consistent across implementations. Must include: - full path to the entry (without protocol) - size of the entry, in bytes. If the value cannot be determined, will be ``None``. - type of entry, "file", "directory" or other Additional information may be present, appropriate to the file-system, e.g., generation, checksum, etc. May use refresh=True|False to allow use of self._ls_from_cache to common where listing may be expensive. Parameters: path: str detail: bool if True, gives a list of dictionaries, where each is the same as the result of ``info(path)``. If False, gives a list of paths (str). kwargs: may have additional backend-specific options, such as version information Returns: List of strings if detail is False, or list of directory information dicts if detail is True. These dicts would have: name (full path in the FS), size (in bytes), type (file, directory, or something else) and other FS-specific keys. """ # list user names if path == '': # if there are no paths. We list all the unique users # list_repos return username/repo so we split the name and # unique the 1st component names = set([f['name'].split('/')[0] for f in self.list_repos()]) return [{'name': n, 'type': 'user'} for n in names] path = path.rstrip('/') if len(path.split('/')) == 1: # if there exactly 1 component in the path it has to be [username] # list_repos return username/repo so we split the name and # and match every repo which username == path names = [f['name'] for f in self.list_repos() if f['name'].split('/')[0] == path] return [{'name': n, 'type': 'repo'} for n in names] url_path = parse_url(path, self.domain) if url_path.branch == '': branches = self.list_branches(path) return [{'name': path + '/' + n['name'], 'type': 'branch'} for n in branches] parse = urlparse(url_path.remote) path = parse.path components = path.lstrip('/').rstrip('/').split('/') if len(components) < 2: raise ValueError('Incomplete path. must be of the form user/repo/[branch]/[path]') prefix = '/'.join(components[:2]) + '/' + url_path.branch files, file_info = _manager.listdir(url_path.remote, url_path.branch, url_path.path) if detail: return [{"name": prefix + '/' + fname, "size": finfo.size, "type": finfo.ftype} for fname, finfo in zip(files, file_info)] else: return files
def _open( self, path, mode="rb", **kwargs, ): """ Return raw bytes-mode file-like from the file-system. Reads can be performed from any where, but writes must be performed within the context of a transaction which must be scoped to within a single repository branch. """ url_path = parse_url(path, self.domain) transaction = getattr(self, "_transaction", None) if transaction is None and not mode.startswith('r'): raise RuntimeError( "Write access to files is only allowed within a commit transaction.") if not mode.startswith('r'): # this is a write if self._transaction is None: # with no transaction raise RuntimeError("Write only allowed in the context of a commit transaction." "Use `with fs.transaction(repo_and_branch, [commit_message]):` to enable write access.") if mode.startswith('r'): repo_handle = _manager.get_repo(url_path.remote) branch = url_path.branch if "flags" in kwargs: handle = repo_handle.open_for_read_with_flags(branch, url_path.path, kwargs["flags"]) else: handle = repo_handle.open_for_read(branch, url_path.path) return XetFile(handle) elif mode.startswith('w'): return self._transaction.open_for_write(url_path) else: raise ValueError("Mode '%s' not supported.", mode)
[docs] def set_commit_message(self, message): """ Sets the commit message on the active transaction """ if self._transaction is None: raise RuntimeError("No active transaction") self._transaction.set_commit_message(message)
[docs] def rm(self, path, *args, **kwargs): """ Delete a file. Deletions must be performed within the context of a transaction which must be scoped to within a single repository branch. """ transaction = self._transaction if transaction is None: raise RuntimeError( "Write access to files is only allowed within a commit transaction.") if len(args) > 0: print(f"rm arguments {args} ignored", file=sys.stderr) if len(kwargs) > 0: print(f"rm arguments {kwargs} ignored", file=sys.stderr) path = parse_url(path, self.domain) if len(path.path) == 0 and len(path.branch) > 0: raise ValueError("Cannot delete branches with 'rm'") if len(path.path) == 0 and len(path.branch) == 0: raise ValueError("Cannot delete repositories with 'rm'") transaction.rm(path)
[docs] def is_repo(self, path): """ Returns true if the path is a repo """ url_path = parse_url(path, self.domain) if len(url_path.branch) != 0: raise ValueError("Too many path components to be a repo") try: self.list_branches(path) return True except Exception as e: return False
[docs] def make_branch(self, repo, src_branch_name, target_branch_name): """ Creates a branch in a repo """ if not self.is_repo(repo): raise ValueError(f"{repo} is not a repository") has_src_branch = self.branch_exists(repo + "/" + src_branch_name) if has_src_branch is False: raise ValueError(f"Cannot copy branch as source branch does not exist: {src_branch_name}") has_dest_branch = self.branch_exists(repo + "/" + target_branch_name) if has_dest_branch: raise ValueError(f"Cannot copy branch as destination branch already exists: {target_branch_name}") query = {"new_branch_name": target_branch_name, "old_branch_name": src_branch_name} query = json.dumps(query) url_path = parse_url(repo, self.domain) _manager.api_query(url_path.remote, "branches", "post", query)
def find_ref(self, repo, ref_name): if not self.is_repo(repo): raise ValueError(f"{repo} is not a repository") url_path = parse_url(repo, self.domain) res = _manager.api_query(url_path.remote, f"git/refs/{ref_name}", "get", "") return json.loads(bytes(res))
[docs] def delete_branch(self, repo, branch_name): """ deletes a branch in a repo """ if not self.is_repo(repo): raise ValueError(f"{repo} is not a repository") has_branch = self.branch_exists(repo + "/" + branch_name) if has_branch is False: raise ValueError(f"Cannot delete branch as branch does not exist: {branch_name}") if branch_name == 'main': raise ValueError("Cannot delete main branch") url_path = parse_url(repo, self.domain) _manager.api_query(url_path.remote, f"branches/{branch_name}", "delete", "")
[docs] def cp_file(self, path1, path2, *args, **kwargs): """ Copies a file or directory from a xet path to another xet path. Copies must be performed within the context of a transaction and are allowed to span branches """ transaction = self._transaction if len(args) > 0: print(f"cp arguments {args} ignored", file=sys.stderr) if len(kwargs) > 0: print(f"cp arguments {kwargs} ignored", file=sys.stderr) if transaction is None: raise RuntimeError( "Write access to files is only allowed within a commit transaction.") parsed_path1 = parse_url(path1, self.domain) parsed_path2 = parse_url(path2, self.domain) if parsed_path1.remote != parsed_path2.remote: raise ValueError("Can only copy between paths in the same repository") if len(parsed_path1.branch) == 0: raise ValueError(f"Branch not specified in copy source {path1}") if len(parsed_path2.branch) == 0: raise ValueError(f"Branch not specified in copy dest {path2}") if len(parsed_path1.path) == 0 and len(parsed_path2.path) == 0: query = {"new_branch_name": parsed_path2.branch, "old_branch_name": parsed_path1.branch} query = json.dumps(query) _manager.api_query(parsed_path1.remote, "branches", "post", query) return transaction.copy(parsed_path1, parsed_path2)
[docs] def mv(self, path1, path2, *args, **kwargs): """ Moves a file or directory from a xet path to another xet path. Moves must be performed within the context of a transaction and must be within the same branch """ transaction = self._transaction if len(args) > 0: print(f"move arguments {args} ignored", file=sys.stderr) if len(kwargs) > 0: print(f"move arguments {kwargs} ignored", file=sys.stderr) if transaction is None: raise RuntimeError( "Write access to files is only allowed within a commit transaction.") parsed_path1 = parse_url(path1, self.domain) parsed_path2 = parse_url(path2, self.domain) if parsed_path1.branch != parsed_path2.branch: raise ValueError("Moves can only happen within a branch") transaction.mv(parsed_path1, parsed_path2)
[docs] def move(self, path1, path2, *args, **kwargs): return self.mv(path1, path2, *args, **kwargs)
[docs] def add_deduplication_hints(self, path_urls): """ Fetches and downloads all of the metadata needed for binary deduplication against all the paths given by `paths`. Once fetched, new data will be deduplicated against any binary content given by `paths`. """ if isinstance(path_urls, str): path_urls = [path_urls] url_paths = [parse_url(url, self.domain, partial_remote=True) for url in path_urls] self._add_deduplication_hints_by_url(url_paths)
def _add_deduplication_hints_by_url(self, url_paths, min_dedup_byte_threshhold=None): if min_dedup_byte_threshhold is None: # TODO: set this to a more resonable value. min_dedup_byte_threshhold = 0 paths_by_remotes = {} for url in url_paths: paths_by_remotes.setdefault(url.remote, []).append(url) for (remote, urls) in paths_by_remotes.items(): repo_handle = _manager.get_repo(remote) repo_handle.fetch_hinted_shards_for_dedup([(url.branch, url.path) for url in urls], min_dedup_byte_threshhold) @property def transaction(self): """ Begin a transaction context for a given repository and branch. The entire transaction is committed atomically at the end of the transaction. All writes must be performed into this branch:: with fs.transaction as tr: tr.set_commit_message("message") file = fs.open('user/repo/main/hello.txt','w') file.write('hello world') file.close() The transaction object is an instance of the :class:`MultiCommitTransaction` """ if self._transaction is None: self._transaction = MultiCommitTransaction(self) return self._transaction def _create_transaction_handler(self, repo_info, commit_message): """ Internal method used by CommitTransaction to get a transaction handler object from the repository handle """ repo_handle = _manager.get_repo(repo_info.remote) return repo_handle.begin_write_transaction(repo_info.branch, commit_message)
[docs] def start_transaction(self, commit_message=None): """ Begin a write transaction for a repository and branch. The entire transaction is committed atomically at the end of the transaction. All writes must be performed into this branch repo_and_branch is of the form `user/repo/branch` or `xet://user/repo/branch`:: fs.start_transaction('my commit message') file = fs.open('user/repo/main/hello.txt','w') file.write('hello world') file.close() fs.end_transaction() The transaction object is an instance of the :class:`MultiCommitTransaction` """ tr = self.transaction tr.start() tr.set_commit_message(commit_message) return tr
[docs] def cancel_transaction(self): """Cancels any active transactions. non-context version""" self._transaction.complete(False)
[docs] def end_transaction(self): """Finish write transaction, non-context version. See :func:`start_transaction`""" self._transaction.complete()
[docs] def mkdir(path, *args, **kwargs): """Noop. Empty directories cannot be created""" pass
[docs] def mkdirs(path, *args, **kwargs): """Noop. Empty directories cannot be created""" pass
[docs] def makedir(path, *args, **kwargs): """Noop. Empty directories cannot be created""" pass
[docs] def makedirs(path, *args, **kwargs): """Noop. Empty directories cannot be created""" pass
fsspec.register_implementation("xet", XetFS, clobber=True)