Source code for appyter.execspec.implementations.dispatch

from appyter.execspec.spec import AbstractExecutor
from appyter.ext.asyncio.try_n_times import async_try_n_times
from appyter.ext.dict import dict_merge

[docs]class DispatchExecutor(AbstractExecutor): ''' Submit executions to be run by an appyter orchestration dispatcher This executor only supports submit and not wait_for/run as it submits jobs to a queue which manages the execution. usage: dispatch::http://appyter-orchestrator:5000?params.executor=docker::maayanlab/appyter-example:latest ''' protocol = 'dispatch' async def __aenter__(self): import aiohttp self.session = aiohttp.ClientSession( headers=dict( {'Content-Type': 'application/json'}, **self.executor_options.get('headers', {}), ) ) self.client = await self.session.__aenter__() return await super().__aenter__() async def __aexit__(self, type, value, traceback): await self.session.__aexit__(type, value, traceback) await super().__aexit__(type, value, traceback) async def _submit(self, job): async with self.client.post( self.url, json=dict_merge( self.executor_options.get('params',{}), **dict(job, storage=job['storage']), ), ) as resp: queue_size = await resp.json() return queue_size async def _run(self, **job): yield dict(type='status', data=f"Submitting appyter for execution..") queue_size = await async_try_n_times(3, self._submit, job) yield dict(type='status', data=f"Queued successfully, you are at position {queue_size}, your execution will begin when resources are available..") # TODO: room drop hotfix until we have queue tracking import asyncio await asyncio.sleep(60)