Source code for appyter.extras.catalog_integration.uploads
from dataclasses import dataclass, asdict
from appyter.ext.urllib import join_url, parent_url
[docs]@dataclass(frozen=True,)
class FileInfo:
file: str
filename: str
metadata: dict = None
[docs]async def add_file(data: FileInfo, auth=None, config=None):
import aiohttp
async with aiohttp.ClientSession(
headers={
'Authorization': f"Bearer {auth}",
} if auth else {},
raise_for_status=True,
) as session:
async with session.post(
join_url(
parent_url(config['PUBLIC_URL']),
'postgrest/rpc/add_file',
),
json=asdict(data),
) as res:
return await res.json()
[docs]async def list_files(auth=None, config=None):
if not auth: raise PermissionError
import aiohttp
async with aiohttp.ClientSession(
headers={
'Authorization': f"Bearer {auth}",
},
raise_for_status=True,
) as session:
async with session.get(
join_url(
parent_url(config['PUBLIC_URL']),
'postgrest/user_file',
),
) as res:
return await res.json()