Source code for appyter.ext.drs
import re
from appyter.ext.urllib import url_filename
from appyter.render.flask_app.constants import get_input_fs
from appyter.ext.fsspec.core import url_to_fs_ex
from appyter.ext.contextlib import ensure_context
from appyter.render.flask_app.upload import organize_file_content
from appyter.context import get_env
[docs]def ensure_drs(url):
''' Ensures a url is accessible via DRS
'''
if url.startswith('storage://input/'):
return url_filename(url)
#
fs, fo = url_to_fs_ex(url)
try:
# Some providers, namely SBFS, provide DRS endpoints already
return fs.get_drs(fo)
except AttributeError:
pass
#
with ensure_context(fs) as fs:
storage_id = organize_file_content(get_input_fs(), fs, fo)
#
env = get_env()
drs_id = re.sub(r"^storage://input/", re.sub(r'^https?://', 'drs://', env['PUBLIC_URL'] + '/'), storage_id)
return drs_id