Module spin_sdk.http.poll_loop
Defines a custom asyncio event loop backed by wasi:io/poll#poll.
This also includes helper classes and functions for working with wasi:http.
As of WASI Preview 2, there is not yet a standard for first-class, composable asynchronous functions and streams. We expect that little or none of this boilerplate will be needed once those features arrive in Preview 3.
Functions
async def register(loop: PollLoop, pollable: Pollable)
async def send(request: OutgoingRequest) ‑> IncomingResponse
-
Send the specified request and wait asynchronously for the response.
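A coroutine that awaits send must be driven by a PollLoop. The sketch below builds a GET request and runs it to completion; the import path for the wit-generated wasi:http types (spin_sdk.wit.imports.types) and the request construction details are assumptions and may differ between SDK versions.

# Hypothetical usage sketch; binding import paths and OutgoingRequest
# construction details are assumptions, not the SDK's documented API.
import asyncio
from spin_sdk.http.poll_loop import PollLoop, send
from spin_sdk.wit.imports.types import (  # assumed binding module
    Fields, OutgoingRequest, Method_Get, Scheme_Https,
)

async def fetch_status() -> int:
    request = OutgoingRequest(Fields.from_list([]))
    request.set_method(Method_Get())
    request.set_scheme(Scheme_Https())
    request.set_authority("example.com")
    request.set_path_with_query("/")
    response = await send(request)   # suspends until wasi:io/poll reports the response ready
    return response.status()

loop = PollLoop()
asyncio.set_event_loop(loop)
print(loop.run_until_complete(fetch_status()))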
Classes
class PollLoop
-
Custom asyncio event loop backed by wasi:io/poll#poll.
class PollLoop(asyncio.AbstractEventLoop):
    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`."""

    def __init__(self):
        self.wakers = []
        self.running = False
        self.handles = []
        self.exception = None

    def get_debug(self):
        return False

    def run_until_complete(self, future):
        future = asyncio.ensure_future(future, loop=self)

        self.running = True
        asyncio.events._set_running_loop(self)
        while self.running and not future.done():
            handles = self.handles
            self.handles = []
            for handle in handles:
                if not handle._cancelled:
                    handle._run()

            if self.wakers:
                [pollables, wakers] = list(map(list, zip(*self.wakers)))

                new_wakers = []
                ready = [False] * len(pollables)
                for index in poll.poll(pollables):
                    ready[index] = True

                for (ready, pollable), waker in zip(zip(ready, pollables), wakers):
                    if ready:
                        pollable.__exit__(None, None, None)
                        waker.set_result(None)
                    else:
                        new_wakers.append((pollable, waker))

                self.wakers = new_wakers

            if self.exception is not None:
                raise self.exception

        return future.result()

    def is_running(self):
        return self.running

    def is_closed(self):
        return not self.running

    def stop(self):
        self.running = False

    def close(self):
        self.running = False

    def shutdown_asyncgens(self):
        pass

    def call_exception_handler(self, context):
        self.exception = context.get('exception', None)

    def call_soon(self, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        self.handles.append(handle)
        return handle

    def create_task(self, coroutine):
        return asyncio.Task(coroutine, loop=self)

    def create_future(self):
        return asyncio.Future(loop=self)

    # The remaining methods should be irrelevant for our purposes and thus unimplemented

    def run_forever(self):
        raise NotImplementedError

    async def shutdown_default_executor(self):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        raise NotImplementedError

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True):
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False, server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        raise NotImplementedError

    async def connect_read_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, **kwargs):
        raise NotImplementedError

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None):
        raise NotImplementedError

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
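The heart of the loop is the wakers list: register (defined in this module) appends a (pollable, future) pair and awaits the future, and each iteration of run_until_complete hands all pending pollables to wasi:io/poll#poll, resolving the futures whose pollables came back ready. A minimal sketch of using register directly, where pollable stands for any wit-generated Pollable handle (for example the result of an input stream's subscribe()):

# Sketch only: `pollable` is any wit-generated Pollable, e.g. returned by
# InputStream.subscribe(); how you obtain one is outside this module's scope.
import asyncio
from typing import cast
from spin_sdk.http.poll_loop import PollLoop, register

async def wait_until_ready(pollable) -> None:
    loop = cast(PollLoop, asyncio.get_event_loop())
    # Parks this coroutine; run_until_complete resolves the underlying future
    # once wasi:io/poll#poll reports the pollable as ready.
    await register(loop, pollable)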
Ancestors
- asyncio.events.AbstractEventLoop
Methods
def add_reader(self, fd, callback, *args)
def add_signal_handler(self, sig, callback, *args)
def add_writer(self, fd, callback, *args)
def call_at(self, when, callback, *args, context=None)
def call_exception_handler(self, context)
def call_later(self, delay, callback, *args, context=None)
def call_soon(self, callback, *args, context=None)
def call_soon_threadsafe(self, callback, *args, context=None)
def close(self)
-
Close the loop.
The loop should not be running.
This is idempotent and irreversible.
No other methods should be called after this one.
async def connect_accepted_socket(self,
protocol_factory,
sock,
*,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)
-
Handle an accepted connection.
This is used by servers that accept connections outside of asyncio, but use asyncio to handle connections.
This method is a coroutine. When completed, the coroutine returns a (transport, protocol) pair.
async def connect_read_pipe(self, protocol_factory, pipe)
-
Register read pipe in event loop. Set the pipe to non-blocking mode.
protocol_factory should instantiate object with Protocol interface. pipe is a file-like object. Return pair (transport, protocol), where transport supports the ReadTransport interface.
async def connect_write_pipe(self, protocol_factory, pipe)
-
Register write pipe in event loop.
protocol_factory should instantiate object with BaseProtocol interface. pipe is a file-like object already switched to non-blocking mode. Return pair (transport, protocol), where transport supports the WriteTransport interface.
async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
happy_eyeballs_delay=None,
interleave=None)
async def create_datagram_endpoint(self,
protocol_factory,
local_addr=None,
remote_addr=None,
*,
family=0,
proto=0,
flags=0,
reuse_address=None,
reuse_port=None,
allow_broadcast=None,
sock=None)
-
A coroutine which creates a datagram endpoint.
This method will try to establish the endpoint in the background. When successful, the coroutine returns a (transport, protocol) pair.
protocol_factory must be a callable returning a protocol instance.
The socket family will be AF_INET, AF_INET6 or AF_UNIX, depending on host (or family if specified); the socket type will be SOCK_DGRAM.
reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified it will automatically be set to True on UNIX.
reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows and some UNIXes. If the socket.SO_REUSEPORT constant is not defined then this capability is unsupported.
allow_broadcast tells the kernel to allow this endpoint to send messages to the broadcast address.
sock can optionally be specified in order to use a preexisting socket object.
def create_future(self)
async def create_server(self,
protocol_factory,
host=None,
port=None,
*,
family=0,
flags=1,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)
-
A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop the service.
If host is an empty string or None all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6). The host parameter can also be a sequence (e.g. list) of hosts to bind to.
family can be set to either AF_INET or AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined from host (defaults to AF_UNSPEC).
flags is a bitmask for getaddrinfo().
sock can optionally be specified in order to use a preexisting socket object.
backlog is the maximum number of queued connections passed to listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the accepted connections.
reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX.
reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows.
ssl_handshake_timeout is the time in seconds that an SSL server will wait for completion of the SSL handshake before aborting the connection. Default is 60s.
ssl_shutdown_timeout is the time in seconds that an SSL server will wait for completion of the SSL shutdown procedure before aborting the connection. Default is 30s.
start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server start accepting connections.
def create_task(self, coroutine)
async def create_unix_connection(self,
protocol_factory,
path=None,
*,
ssl=None,
sock=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)
async def create_unix_server(self,
protocol_factory,
path=None,
*,
sock=None,
backlog=100,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)
-
A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop the service.
path is a str, representing a file system path to bind the server socket to.
sock can optionally be specified in order to use a preexisting socket object.
backlog is the maximum number of queued connections passed to listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the accepted connections.
ssl_handshake_timeout is the time in seconds that an SSL server will wait for the SSL handshake to complete (defaults to 60s).
ssl_shutdown_timeout is the time in seconds that an SSL server will wait for the SSL shutdown to finish (defaults to 30s).
start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server start accepting connections.
def default_exception_handler(self, context)
def get_debug(self)
def get_exception_handler(self)
def get_task_factory(self)
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0)
async def getnameinfo(self, sockaddr, flags=0)
def is_closed(self)
-
Returns True if the event loop was closed.
def is_running(self)
-
Return whether the event loop is currently running.
def remove_reader(self, fd)
def remove_signal_handler(self, sig)
def remove_writer(self, fd)
def run_forever(self)
-
Run the event loop until stop() is called.
def run_in_executor(self, executor, func, *args)
def run_until_complete(self, future)
-
Run the event loop until a Future is done.
Return the Future's result, or raise its exception.
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True)
-
Send a file through a transport.
Return the number of bytes sent.
def set_debug(self, enabled)
def set_default_executor(self, executor)
def set_exception_handler(self, handler)
def set_task_factory(self, factory)
def shutdown_asyncgens(self)
-
Shutdown all active asynchronous generators.
async def shutdown_default_executor(self)
-
Schedule the shutdown of the default executor.
async def sock_accept(self, sock)
async def sock_connect(self, sock, address)
async def sock_recv(self, sock, nbytes)
async def sock_recv_into(self, sock, buf)
async def sock_recvfrom(self, sock, bufsize)
async def sock_recvfrom_into(self, sock, buf, nbytes=0)
async def sock_sendall(self, sock, data)
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None)
async def sock_sendto(self, sock, data, address)
async def start_tls(self,
transport,
protocol,
sslcontext,
*,
server_side=False,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)
-
Upgrade a transport to TLS.
Return a new transport that protocol should start using immediately.
def stop(self)
-
Stop the event loop as soon as reasonable.
Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled.
async def subprocess_exec(self, protocol_factory, *args, stdin=-1, stdout=-1, stderr=-1, **kwargs)
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=-1, stdout=-1, stderr=-1, **kwargs)
def time(self)
class Sink (body: OutgoingBody)
-
Writer abstraction over wasi-http/types#outgoing-body.
class Sink:
    """Writer abstraction over `wasi-http/types#outgoing-body`."""

    def __init__(self, body: OutgoingBody):
        self.body = body
        self.stream = body.write()

    async def send(self, chunk: bytes):
        """Write the specified bytes to the sink.

        This may need to yield according to the backpressure requirements of the sink.
        """
        offset = 0
        flushing = False
        while True:
            count = self.stream.check_write()
            if count == 0:
                await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
            elif offset == len(chunk):
                if flushing:
                    return
                else:
                    self.stream.flush()
                    flushing = True
            else:
                count = min(count, len(chunk) - offset)
                self.stream.write(chunk[offset:offset+count])
                offset += count

    def close(self):
        """Close the stream, indicating no further data will be written."""
        self.stream.__exit__(None, None, None)
        self.stream = None
        OutgoingBody.finish(self.body, None)
        self.body = None
Methods
def close(self)
-
Close the stream, indicating no further data will be written.
async def send(self, chunk: bytes)
-
Write the specified bytes to the sink.
This may need to yield according to the backpressure requirements of the sink.
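As a sketch, given any wit-generated OutgoingBody handle (for example obtained from an outgoing request's or response's body accessor; the exact accessor is an assumption and depends on the bindings in use), Sink lets a coroutine write it incrementally:

# Sketch only: `outgoing_body` is any wit-generated OutgoingBody handle;
# how you obtain it is outside the scope of this module.
from spin_sdk.http.poll_loop import Sink

async def write_all(outgoing_body, chunks: list[bytes]) -> None:
    sink = Sink(outgoing_body)
    for chunk in chunks:
        # send() yields to the event loop whenever the underlying
        # output-stream reports no write capacity (backpressure).
        await sink.send(chunk)
    # Drops the stream and finishes the body: no further data will be written.
    sink.close()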
class Stream (body: IncomingBody)
-
Reader abstraction over wasi:http/types#incoming-body.
class Stream:
    """Reader abstraction over `wasi:http/types#incoming-body`."""

    def __init__(self, body: IncomingBody):
        self.body: Optional[IncomingBody] = body
        self.stream: Optional[InputStream] = body.stream()

    async def next(self) -> Optional[bytes]:
        """Wait for the next chunk of data to arrive on the stream.

        This will return `None` when the end of the stream has been reached.
        """
        while True:
            try:
                if self.stream is None:
                    return None
                else:
                    buffer = self.stream.read(READ_SIZE)
                    if len(buffer) == 0:
                        await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
                    else:
                        return buffer
            except Err as e:
                if isinstance(e.value, StreamError_Closed):
                    if self.stream is not None:
                        self.stream.__exit__(None, None, None)
                        self.stream = None
                    if self.body is not None:
                        IncomingBody.finish(self.body)
                        self.body = None
                else:
                    raise e
Methods
async def next(self) ‑> bytes | None
-
Wait for the next chunk of data to arrive on the stream.
This will return None when the end of the stream has been reached.
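For example, the body of an IncomingResponse returned by send can be drained chunk by chunk; the consume() accessor used below is the wasi:http binding for obtaining the response's IncomingBody and is an assumption about the generated bindings.

# Hypothetical sketch of reading a response body with Stream; `response`
# is an IncomingResponse as returned by `send`, and `consume()` is the
# assumed wasi:http accessor for its IncomingBody.
from spin_sdk.http.poll_loop import Stream

async def read_body(response) -> bytes:
    stream = Stream(response.consume())
    body = b""
    while True:
        chunk = await stream.next()
        if chunk is None:   # end of stream reached
            return body
        body += chunk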