import json
import sys
from urllib.parse import urlparse
from enum import IntEnum
import fsspec
import os
from .commit_transaction import MultiCommitTransaction
from .file_interface import XetFile
from .url_parsing import parse_url, XetPathInfo, normalize_endpoint, set_default_endpoint, get_default_endpoint
if 'SPHINX_BUILD' not in os.environ:
from .rpyxet import rpyxet
__repo_managers = {}
__login_credentials = {}
def _repo_manager(endpoint):
global __repo_managers
global __login_credentials
endpoint = normalize_endpoint(endpoint)
try:
return __repo_managers[endpoint]
except KeyError:
pass
repo = rpyxet.PyRepoManager(endpoint)
__login_credentials
if endpoint in __login_credentials:
repo.override_login_config(*__login_credentials[endpoint])
elif None in __login_credentials:
repo.override_login_config(*__login_credentials[None])
__repo_managers[endpoint] = repo
return repo
[docs]
def login(user, token, email=None, host=None):
"""
Sets the active login credentials used to authenticate against Xethub.
"""
global __login_credentials
if host is not None:
host = normalize_endpoint(host)
set_default_endpoint(host)
__login_credentials[host] = (user, token, email, host)
if host is None:
for repo in __repo_managers.values():
repo.override_login_config(user, token, email)
else:
if host in __repo_managers:
__repo_managers[host].override_login_config(user, token, email, host)
[docs]
def open(file_url, mode="rb", **kwargs):
"""
Open the file at the specific Xet file URL
of the form `xet://<endpoint>:<user>/<repo>/<branch>/<path-to-file>`.
For example::
f = pyxet.open('xet://xethub.com:XetHub/Flickr30k/main/results.csv')
"""
url_info = parse_url(file_url, expect_branch=True)
fs = XetFS(endpoint = url_info.endpoint)
return fs._open(url_info.name(), mode=mode, **kwargs)
class XetFSOpenFlags(IntEnum):
FILE_FLAG_NO_BUFFERING = 0x20000000
[docs]
class XetFS(fsspec.spec.AbstractFileSystem):
protocol = "xet" # This allows pandas, etc. to implement "xet://"
cachable = True
sep = "/"
async_impl = False
root_marker = "/"
[docs]
def from_url(url):
"""
Initializes the proper information from a URL.
"""
url_info = parse_url(url, expect_repo=None)
return XetFS(endpoint = url_info.endpoint)
[docs]
def __init__(self, endpoint=None, **storage_options):
"""
Opens the repository at `repo_url` as an fsspec file system handle,
providing read-only operations such as ls, glob, and open.
User and token are needed for private repositories and they
can be set with `pyxet.login`.
Examples::
import pyxet
fs = pyxet.XetFS('xethub.com')
# List files.
fs.ls('XetHub/Flickr30k/main')
# Read the first 5 lines of a file
b = fs.open('XetHub/Flickr30k/main/results.csv').read()
the Xet repository endpoint can be set with the 'endpoint' argument
or the XET_ENDPOINT environment variable. The default endpoint is
xethub.com if unspecified
"""
# If the endpoint is None, then it goes to the default xethub.com with a warning
# later on.
if endpoint is None:
self.endpoint = get_default_endpoint()
else:
self.endpoint = endpoint
self.intrans = False
self._transaction = None
# Init the base class.
super().__init__()
@classmethod
def _strip_protocol(cls, path):
"""Turn path from fully-qualified to file-system-specific
May require FS-specific handling, e.g., for relative paths or links.
"""
if isinstance(path, list):
return [cls._strip_protocol(p) for p in path]
if path.startswith('xet://'):
protostripped = path[5:]
elif path.startswith('https://'):
protostripped = path[8:]
else:
protostripped = path
return protostripped.lstrip('/')
[docs]
def get_username(self):
"""
Returns the inferred username for the endpoint
"""
return _repo_manager(self.endpoint).get_inferred_username(self.endpoint)
def unstrip_protocol(self, name):
"""Format FS-specific path to generic, including protocol"""
return 'xet://' + name.lstrip('/')
[docs]
def __repr__(self):
return f"XetFS(endpoint = {self.endpoint})"
@staticmethod
def _get_kwargs_from_urls(path):
"""If kwargs can be encoded in the paths, extract them here
This should happen before instantiation of the class; incoming paths
then should be amended to strip the options in methods.
Examples may look like an sftp path "sftp://user@host:/my/path", where
the user and host should become kwargs and later get stripped.
"""
url_path = parse_url(path)
return {"endpoint" : url_path.endpoint}
[docs]
def isdir(self, path):
"""Is this entry directory-like?"""
return self.isdir_or_branch(path)
[docs]
def isdir_or_branch(self, path):
"""Is this entry directory-like?"""
try:
t = self.info(path)["type"]
return t == "directory" or t == "branch"
except OSError:
return False
[docs]
def branch_info(self, url):
"""
Returns information about a branch `user/repo/branch`
or `xet://[endpoint:]<user>/<repo>/<branch>`
"""
# try to parse this as a URL
# and if not try to parse it as a path
if isinstance(url, XetPathInfo):
url_path = url
else:
url_path = parse_url(url, self.endpoint, expect_branch = True)
try:
attr = self._manager.stat(url_path.remote(), url_path.branch, "")
except Exception as e:
print(f"Error accessing repo {url}: {e}")
raise
if attr is None:
raise FileNotFoundError(
f"Branch or repo not found, remote = {url_path.remote()}, branch = {url_path.branch}")
return {"name": url_path.name(),
"size": attr.size,
"type": attr.ftype}
def branch_exists(self, url):
try:
self.branch_info(url)
return True
except Exception as e:
return False
[docs]
def info(self, url):
"""
Returns information about a path `user/repo/branch/[path]`
or `xet://[endpoint:]<user>/<repo>/<branch>/[path]`
"""
url_path = parse_url(url, self.endpoint, expect_branch = True)
try:
attr = self._manager.stat(url_path.remote(), url_path.branch, url_path.path)
except Exception as e:
print(f"{e}")
sys.exit(1)
if attr is None:
raise FileNotFoundError(f"File not found {url}")
return {"name": url_path.name(),
"size": attr.size,
"type": attr.ftype,
"last_modified": None if len(attr.last_modified) == 0 else attr.last_modified}
def make_repo(self, dest_path, private=False, **kwargs):
dest = parse_url(dest_path, self.endpoint, expect_branch = False, expect_repo=True)
if self.is_repo(dest_path):
raise ValueError(f"{dest_path} already exists")
query = json.dumps({'name': dest.repo, 'owner': dest.user, 'private': private})
ret = json.loads(bytes(self._manager.api_query(dest.endpoint_url(), "", "post", query)))
return ret
def fork_repo(self, origin_path, dest_path, **kwargs):
origin = parse_url(origin_path, self.endpoint, expect_branch = False)
dest = parse_url(dest_path, self.endpoint, expect_branch = False)
if not self.is_repo(origin_path):
raise ValueError(f"{origin_path} is not a repo")
if self.is_repo(dest_path):
raise ValueError(f"{dest_path} already exists")
if origin.endpoint != dest.endpoint:
raise ValueError("Cannot fork repos between endpoints.")
auth_user = self.get_username()
if dest.user != auth_user:
raise ValueError(f"Can only fork a repo into your account ({dest.user} != {auth_user})")
query = json.dumps({'name': dest.repo})
ret = json.loads(bytes(self._manager.api_query(origin.remote(), "forks", "post", query)))
return ret
def duplicate_repo(self, origin_path, dest_path, **kwargs):
origin = parse_url(origin_path, self.endpoint, expect_branch = False)
dest = parse_url(dest_path, self.endpoint, expect_branch = False)
if not self.is_repo(origin.remote()):
raise ValueError(f"{origin_path} is not a repo")
if self.is_repo(dest.remote()):
raise ValueError(f"{dest_path} already exists")
if origin.endpoint != dest.endpoint:
raise ValueError("Cannot fork repos between different endpoints.")
auth_user = self.get_username()
if dest.user != auth_user:
raise ValueError(f"Can only duplicate a repo into your account ({dest.user} != {auth_user})")
ret = json.loads(bytes(self._manager.api_query(origin.remote(), "duplicate", "post", "")))
if 'full_name' not in ret:
raise RuntimeError("Duplication failed")
ret_name = 'xet://' + ret['full_name']
ret_info = parse_url(ret_name, self.endpoint, expect_branch=False)
if ret_info == dest:
return ret
return self.rename_repo(ret_name, dest_path)
def rename_repo(self, origin_path, dest_path, **kwargs):
origin = parse_url(origin_path, self.endpoint, expect_branch = False)
dest = parse_url(dest_path, self.endpoint, expect_branch = False)
if not self.is_repo(origin.remote()):
raise ValueError(f"{origin_path} is not a repo")
if self.is_repo(dest.remote()):
raise ValueError(f"{dest_path} already exists")
if origin.user != dest.user:
raise ValueError("Username must be the same between source and destination")
query = json.dumps({'name': dest.repo})
ret = json.loads(bytes(self._manager.api_query(origin.remote(), "", "patch", query)))
return ret
def set_repo_attr(self, origin_path, attrkey, attrvalue, **kwargs):
origin = parse_url(origin_path, self.endpoint, expect_branch=False)
if not self.is_repo(origin_path):
raise ValueError(f"{origin_path} is not a repo")
query = json.dumps({attrkey: attrvalue})
ret = json.loads(bytes(self._manager.api_query(origin.remote(), "", "patch", query)))
return ret
[docs]
def list_repos(self, url, raw=False, **kwargs):
"""
Lists the repos available for a path of the form `user` or `xet://[endpoint:]<user>`
"""
remote = parse_url(url, self.endpoint, expect_branch=False, expect_repo=False)
res = json.loads(bytes(self._manager.api_query(remote.remote(endpoint_only=True), "", "get", "")))
if raw:
return res
else:
return [{'name': f['full_name'],
'permissions': f['permissions']} for f in res]
[docs]
def list_branches(self, path, raw=False, **kwargs):
"""
Lists the branches for a path of the form `user/repo` or `xet://[endpoint:]<user>/<repo>`
"""
url_path = parse_url(path, self.endpoint, expect_branch=False)
res = json.loads(bytes(self._manager.api_query(url_path.remote(), "branches", "get", "")))
if raw:
return res
else:
return [{'name': r['name'], 'type': 'branch'} for r in res]
[docs]
def update_size(self, path, bucket_size):
"""
Calls Xetea to update the size of a synchronized S3 bucket for the repo.
"""
url_path = parse_url(path, self.endpoint, expect_branch=True)
body = json.dumps({
'size': bucket_size,
'branch': url_path.branch
})
self._manager.api_query(url_path.remote(), "remote_size", "post", body)
[docs]
def ls(self, path : str, detail=True, **kwargs):
"""List objects at path.
This should include subdirectories and files at that location. The
difference between a file and a directory must be clear when details
are requested.
The specific keys, or perhaps a FileInfo class, or similar, is TBD,
but must be consistent across implementations.
Must include:
- full path to the entry (without protocol)
- size of the entry, in bytes. If the value cannot be determined, will
be ``None``.
- type of entry, "file", "directory" or other
Additional information
may be present, appropriate to the file-system, e.g., generation,
checksum, etc.
May use refresh=True|False to allow use of self._ls_from_cache to
common where listing may be expensive.
Parameters:
path: str
detail: bool
if True, gives a list of dictionaries, where each is the same as
the result of ``info(path)``. If False, gives a list of paths
(str).
kwargs: may have additional backend-specific options, such as version
information
Returns:
List of strings if detail is False, or list of directory information
dicts if detail is True. These dicts would have: name (full path in the FS),
size (in bytes), type (file, directory, or something else) and other FS-specific keys.
"""
url_path = parse_url(path, self.endpoint, expect_branch=None, expect_repo=None)
if url_path.repo == "":
names = [f['name'] for f in self.list_repos(path)]
# To list all repos accessible by the current authenticated user, use
# xet://endpoint:/
if url_path.user == "":
return [{'name': url_path.endpoint + ":" + n, 'type': 'repo'} for n in names]
else:
return [{'name': url_path.endpoint + ":" + n, 'type': 'repo'} for n in names if n.startswith(url_path.user)]
elif url_path.branch == "":
branches = self.list_branches(url_path.remote())
return [{'name': url_path.base_path() + n['name'], 'type': 'branch'} for n in branches]
else:
files, file_info = self._manager.listdir(url_path.remote(),
url_path.branch,
url_path.path)
# Note that we cannot actually standardize the paths in the listed files.
# If we do, glob will not work as it calls this and matches names against the query.
if detail:
ret = [{"name": url_path.base_path() + "/" + fname,
"size": finfo.size,
"type": finfo.ftype}
for fname, finfo in zip(files, file_info)]
else:
ret = [url_path.base_path() + "/" + fname for fname in files]
return ret
def _open(
self,
path,
mode="rb",
**kwargs,
):
"""
Return raw bytes-mode file-like from the file-system.
Reads can be performed from any where, but writes must be performed
within the context of a transaction which must be scoped to within a
single repository branch.
"""
url_path = parse_url(path, self.endpoint)
transaction = getattr(self, "_transaction", None)
if transaction is None and not mode.startswith('r'):
raise RuntimeError(
"Write access to files is only allowed within a commit transaction.")
if not mode.startswith('r'):
# this is a write
if self._transaction is None:
# with no transaction
raise RuntimeError("Write only allowed in the context of a commit transaction."
"Use `with fs.transaction(repo_and_branch, [commit_message]):` to enable write access.")
if mode.startswith('r'):
repo_handle = self._manager.get_repo(url_path.remote())
branch = url_path.branch
if "flags" in kwargs:
handle = repo_handle.open_for_read_with_flags(branch, url_path.path, kwargs["flags"])
else:
handle = repo_handle.open_for_read(branch, url_path.path)
return XetFile(handle)
elif mode.startswith('w'):
return self._transaction.open_for_write(url_path)
else:
raise ValueError("Mode '%s' not supported.", mode)
[docs]
def set_commit_message(self, message):
"""
Sets the commit message on the active transaction
"""
if self._transaction is None:
raise RuntimeError("No active transaction")
self._transaction.set_commit_message(message)
[docs]
def rm(self, path, *args, **kwargs):
"""
Delete a file.
Deletions must be performed within the context of a transaction
which must be scoped to within a single repository branch.
"""
transaction = self._transaction
if transaction is None:
raise RuntimeError(
"Write access to files is only allowed within a commit transaction.")
if len(args) > 0:
print(f"rm arguments {args} ignored", file=sys.stderr)
if len(kwargs) > 0:
print(f"rm arguments {kwargs} ignored", file=sys.stderr)
path = parse_url(path, self.endpoint, expect_repo = None)
if len(path.path) == 0 and len(path.branch) > 0:
raise ValueError("Cannot delete branches with 'rm'")
if len(path.path) == 0 and len(path.branch) == 0:
raise ValueError("Cannot delete repositories with 'rm'")
transaction.rm(path)
[docs]
def is_repo(self, path):
"""
Returns true if the path is a repo
"""
url_path = parse_url(path, self.endpoint, expect_branch=False)
try:
self.list_branches(url_path.remote())
return True
except Exception as e:
return False
[docs]
def make_branch(self, repo, src_branch_name, target_branch_name):
"""
Creates a branch in a repo
"""
if not self.is_repo(repo):
raise ValueError(f"{repo} is not a repository")
has_src_branch = self.branch_exists(repo + "/" + src_branch_name)
if has_src_branch is False:
raise ValueError(f"Cannot copy branch as source branch does not exist: {src_branch_name}")
has_dest_branch = self.branch_exists(repo + "/" + target_branch_name)
if has_dest_branch:
raise ValueError(f"Cannot copy branch as destination branch already exists: {target_branch_name}")
query = {"new_branch_name": target_branch_name,
"old_branch_name": src_branch_name}
query = json.dumps(query)
url_path = parse_url(repo, self.endpoint)
self._manager.api_query(url_path.remote(), "branches", "post", query)
def find_ref(self, repo, ref_name):
if not self.is_repo(repo):
raise ValueError(f"{repo} is not a repository")
url_path = parse_url(repo, self.endpoint)
res = self._manager.api_query(url_path.remote(), f"git/refs/{ref_name}", "get", "")
return json.loads(bytes(res))
[docs]
def delete_branch(self, repo, branch_name):
"""
deletes a branch in a repo
"""
if not self.is_repo(repo):
raise ValueError(f"{repo} is not a repository")
has_branch = self.branch_exists(repo + "/" + branch_name)
if has_branch is False:
raise ValueError(f"Cannot delete branch as branch does not exist: {branch_name}")
if branch_name == 'main':
raise ValueError("Cannot delete main branch")
url_path = parse_url(repo, self.endpoint)
self._manager.api_query(url_path.remote(), f"branches/{branch_name}", "delete", "")
[docs]
def cp_file(self, path1, path2, *args, **kwargs):
"""
Copies a file or directory from a xet path to another xet path.
Copies must be performed within the context of a transaction
and are allowed to span branches
"""
transaction = self._transaction
if len(args) > 0:
print(f"cp arguments {args} ignored", file=sys.stderr)
if len(kwargs) > 0:
print(f"cp arguments {kwargs} ignored", file=sys.stderr)
if transaction is None:
raise RuntimeError(
"Write access to files is only allowed within a commit transaction.")
parsed_path1 = parse_url(path1, self.endpoint)
parsed_path2 = parse_url(path2, self.endpoint)
if parsed_path1.remote() != parsed_path2.remote():
raise ValueError("Can only copy between paths in the same repository")
if len(parsed_path1.branch) == 0:
raise ValueError(f"Branch not specified in copy source {path1}")
if len(parsed_path2.branch) == 0:
raise ValueError(f"Branch not specified in copy dest {path2}")
if len(parsed_path1.path) == 0 and len(parsed_path2.path) == 0:
query = {"new_branch_name": parsed_path2.branch,
"old_branch_name": parsed_path1.branch}
query = json.dumps(query)
self._manager.api_query(parsed_path1.remote(), "branches", "post", query)
return
transaction.copy(parsed_path1, parsed_path2)
[docs]
def mv(self, path1, path2, *args, **kwargs):
"""
Moves a file or directory from a xet path to another xet path.
Moves must be performed within the context of a transaction
and must be within the same branch
"""
transaction = self._transaction
if len(args) > 0:
print(f"move arguments {args} ignored", file=sys.stderr)
if len(kwargs) > 0:
print(f"move arguments {kwargs} ignored", file=sys.stderr)
if transaction is None:
raise RuntimeError(
"Write access to files is only allowed within a commit transaction.")
parsed_path1 = parse_url(path1, self.endpoint, expect_branch=True)
parsed_path2 = parse_url(path2, self.endpoint, expect_branch=True)
if parsed_path1.remote() != parsed_path2.remote():
raise ValueError("Can only copy between paths in the same repository")
if parsed_path1.branch != parsed_path2.branch:
raise ValueError("Moves can only happen within a branch")
transaction.mv(parsed_path1, parsed_path2)
[docs]
def move(self, path1, path2, *args, **kwargs):
return self.mv(path1, path2, *args, **kwargs)
[docs]
def add_deduplication_hints(self, path_urls):
"""
Fetches and downloads all of the metadata needed for binary deduplication against
all the paths given by `paths`. Once fetched, new data will be deduplicated against
any binary content given by `paths`.
"""
if isinstance(path_urls, str):
path_urls = [path_urls]
url_paths = [parse_url(url, self.endpoint) for url in path_urls]
self._add_deduplication_hints_by_url(url_paths)
def _add_deduplication_hints_by_url(self, url_paths, min_dedup_byte_threshhold=None):
if min_dedup_byte_threshhold is None:
# TODO: set this to a more resonable value.
min_dedup_byte_threshhold = 0
paths_by_remotes = {}
for url in url_paths:
paths_by_remotes.setdefault(url.remote(), []).append(url)
for (remote, urls) in paths_by_remotes.items():
repo_handle = self._manager.get_repo(remote)
repo_handle.fetch_hinted_shards_for_dedup([(url.branch, url.path) for url in urls],
min_dedup_byte_threshhold)
@property
def _manager(self):
"""
The repo manager associated with this repo.
"""
return _repo_manager(self.endpoint)
@property
def transaction(self):
"""
Begin a transaction context for a given repository and branch.
The entire transaction is committed atomically at the end of the
transaction. All writes must be performed into this branch::
with fs.transaction as tr:
tr.set_commit_message("message")
file = fs.open('<user>/<repo>/main/hello.txt','w')
file.write('hello world')
file.close()
The transaction object is an instance of the :class:`MultiCommitTransaction`
"""
if self._transaction is None:
self._transaction = MultiCommitTransaction(self)
return self._transaction
def _create_transaction_handler(self, repo_info, commit_message):
"""
Internal method used by CommitTransaction to get
a transaction handler object from the repository handle
"""
repo_handle = self._manager.get_repo(repo_info.remote())
return repo_handle.begin_write_transaction(repo_info.branch, commit_message)
[docs]
def start_transaction(self, commit_message=None):
"""
Begin a write transaction for a repository and branch.
The entire transaction is committed atomically at the end of the
transaction. All writes must be performed into this branch
repo_and_branch is of the form
`<user>/<repo>/<branch>` or `xet://[endpoint:]<user>/<repo>/<branch>`::
fs.start_transaction('my commit message')
file = fs.open('user/repo/main/hello.txt','w')
file.write('hello world')
file.close()
fs.end_transaction()
The transaction object is an instance of the :class:`MultiCommitTransaction`
"""
tr = self.transaction
tr.start()
tr.set_commit_message(commit_message)
return tr
[docs]
def cancel_transaction(self):
"""Cancels any active transactions. non-context version"""
self._transaction.complete(False)
[docs]
def end_transaction(self):
"""Finish write transaction, non-context version. See :func:`start_transaction`"""
self._transaction.complete()
[docs]
def mkdir(path, *args, **kwargs):
"""Noop. Empty directories cannot be created"""
pass
[docs]
def mkdirs(path, *args, **kwargs):
"""Noop. Empty directories cannot be created"""
pass
[docs]
def makedir(path, *args, **kwargs):
"""Noop. Empty directories cannot be created"""
pass
[docs]
def makedirs(path, *args, **kwargs):
"""Noop. Empty directories cannot be created"""
pass
fsspec.register_implementation("xet", XetFS, clobber=True)