Module spin_sdk.http.poll_loop

Defines a custom asyncio event loop backed by wasi:io/poll#poll.

This also includes helper classes and functions for working with wasi:http.

As of WASI Preview 2, there is not yet a standard for first-class, composable asynchronous functions and streams. We expect that little or none of this boilerplate will be needed once those features arrive in Preview 3.

Functions

async def register(loop: PollLoop,
pollable: Pollable)
async def send(request: OutgoingRequest) ‑> IncomingResponse

Send the specified request and wait asynchronously for the response.

Classes

class PollLoop

Custom asyncio event loop backed by wasi:io/poll#poll.

Expand source code
class PollLoop(asyncio.AbstractEventLoop):
    """Custom `asyncio` event loop backed by `wasi:io/poll#poll`."""
    
    def __init__(self):
        self.wakers = []
        self.running = False
        self.handles = []
        self.exception = None

    def get_debug(self):
        return False

    def run_until_complete(self, future):
        future = asyncio.ensure_future(future, loop=self)

        self.running = True
        asyncio.events._set_running_loop(self)
        while self.running and not future.done():
            handles = self.handles
            self.handles = []
            for handle in handles:
                if not handle._cancelled:
                    handle._run()
                
            if self.wakers:
                [pollables, wakers] = list(map(list, zip(*self.wakers)))
                
                new_wakers = []
                ready = [False] * len(pollables)
                for index in poll.poll(pollables):
                    ready[index] = True
                
                for (ready, pollable), waker in zip(zip(ready, pollables), wakers):
                    if ready:
                        pollable.__exit__(None, None, None)
                        waker.set_result(None)
                    else:
                        new_wakers.append((pollable, waker))

                self.wakers = new_wakers

            if self.exception is not None:
                raise self.exception
            
        return future.result()

    def is_running(self):
        return self.running

    def is_closed(self):
        return not self.running

    def stop(self):
        self.running = False

    def close(self):
        self.running = False

    def shutdown_asyncgens(self):
        pass

    def call_exception_handler(self, context):
        self.exception = context.get('exception', None)

    def call_soon(self, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        self.handles.append(handle)
        return handle

    def create_task(self, coroutine):
        return asyncio.Task(coroutine, loop=self)

    def create_future(self):
        return asyncio.Future(loop=self)

    # The remaining methods should be irrelevant for our purposes and thus unimplemented

    def run_forever(self):
        raise NotImplementedError

    async def shutdown_default_executor(self):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        raise NotImplementedError

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        raise NotImplementedError

    async def connect_read_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError

Ancestors

  • asyncio.events.AbstractEventLoop

Methods

def add_reader(self, fd, callback, *args)
def add_signal_handler(self, sig, callback, *args)
def add_writer(self, fd, callback, *args)
def call_at(self, when, callback, *args, context=None)
def call_exception_handler(self, context)
def call_later(self, delay, callback, *args, context=None)
def call_soon(self, callback, *args, context=None)
def call_soon_threadsafe(self, callback, *args, context=None)
def close(self)

Close the loop.

The loop should not be running.

This is idempotent and irreversible.

No other methods should be called after this one.

async def connect_accepted_socket(self,
protocol_factory,
sock,
*,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)

Handle an accepted connection.

This is used by servers that accept connections outside of asyncio, but use asyncio to handle connections.

This method is a coroutine. When completed, the coroutine returns a (transport, protocol) pair.

async def connect_read_pipe(self, protocol_factory, pipe)

Register read pipe in event loop. Set the pipe to non-blocking mode.

protocol_factory should instantiate object with Protocol interface. pipe is a file-like object. Return pair (transport, protocol), where transport supports the ReadTransport interface.

async def connect_write_pipe(self, protocol_factory, pipe)

Register write pipe in event loop.

protocol_factory should instantiate object with BaseProtocol interface. Pipe is file-like object already switched to nonblocking. Return pair (transport, protocol), where transport support WriteTransport interface.

async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
happy_eyeballs_delay=None,
interleave=None)
async def create_datagram_endpoint(self,
protocol_factory,
local_addr=None,
remote_addr=None,
*,
family=0,
proto=0,
flags=0,
reuse_address=None,
reuse_port=None,
allow_broadcast=None,
sock=None)

A coroutine which creates a datagram endpoint.

This method will try to establish the endpoint in the background. When successful, the coroutine returns a (transport, protocol) pair.

protocol_factory must be a callable returning a protocol instance.

socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on host (or family if specified), socket type SOCK_DGRAM.

reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified it will automatically be set to True on UNIX.

reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows and some UNIX's. If the :py:data:~socket.SO_REUSEPORT constant is not defined then this capability is unsupported.

allow_broadcast tells the kernel to allow this endpoint to send messages to the broadcast address.

sock can optionally be specified in order to use a preexisting socket object.

def create_future(self)
async def create_server(self,
protocol_factory,
host=None,
port=None,
*,
family=0,
flags=1,
sock=None,
backlog=100,
ssl=None,
reuse_address=None,
reuse_port=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)

A coroutine which creates a TCP server bound to host and port.

The return value is a Server object which can be used to stop the service.

If host is an empty string or None all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6). The host parameter can also be a sequence (e.g. list) of hosts to bind to.

family can be set to either AF_INET or AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined from host (defaults to AF_UNSPEC).

flags is a bitmask for getaddrinfo().

sock can optionally be specified in order to use a preexisting socket object.

backlog is the maximum number of queued connections passed to listen() (defaults to 100).

ssl can be set to an SSLContext to enable SSL over the accepted connections.

reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX.

reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows.

ssl_handshake_timeout is the time in seconds that an SSL server will wait for completion of the SSL handshake before aborting the connection. Default is 60s.

ssl_shutdown_timeout is the time in seconds that an SSL server will wait for completion of the SSL shutdown procedure before aborting the connection. Default is 30s.

start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

def create_task(self, coroutine)
async def create_unix_connection(self,
protocol_factory,
path=None,
*,
ssl=None,
sock=None,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)
async def create_unix_server(self,
protocol_factory,
path=None,
*,
sock=None,
backlog=100,
ssl=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
start_serving=True)

A coroutine which creates a UNIX Domain Socket server.

The return value is a Server object, which can be used to stop the service.

path is a str, representing a file system path to bind the server socket to.

sock can optionally be specified in order to use a preexisting socket object.

backlog is the maximum number of queued connections passed to listen() (defaults to 100).

ssl can be set to an SSLContext to enable SSL over the accepted connections.

ssl_handshake_timeout is the time in seconds that an SSL server will wait for the SSL handshake to complete (defaults to 60s).

ssl_shutdown_timeout is the time in seconds that an SSL server will wait for the SSL shutdown to finish (defaults to 30s).

start_serving set to True (default) causes the created server to start accepting connections immediately. When set to False, the user should await Server.start_serving() or Server.serve_forever() to make the server to start accepting connections.

def default_exception_handler(self, context)
def get_debug(self)
def get_exception_handler(self)
def get_task_factory(self)
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0)
async def getnameinfo(self, sockaddr, flags=0)
def is_closed(self)

Returns True if the event loop was closed.

def is_running(self)

Return whether the event loop is currently running.

def remove_reader(self, fd)
def remove_signal_handler(self, sig)
def remove_writer(self, fd)
def run_forever(self)

Run the event loop until stop() is called.

def run_in_executor(self, executor, func, *args)
def run_until_complete(self, future)

Run the event loop until a Future is done.

Return the Future's result, or raise its exception.

async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True)

Send a file through a transport.

Return an amount of sent bytes.

def set_debug(self, enabled)
def set_default_executor(self, executor)
def set_exception_handler(self, handler)
def set_task_factory(self, factory)
def shutdown_asyncgens(self)

Shutdown all active asynchronous generators.

async def shutdown_default_executor(self)

Schedule the shutdown of the default executor.

async def sock_accept(self, sock)
async def sock_connect(self, sock, address)
async def sock_recv(self, sock, nbytes)
async def sock_recv_into(self, sock, buf)
async def sock_recvfrom(self, sock, bufsize)
async def sock_recvfrom_into(self, sock, buf, nbytes=0)
async def sock_sendall(self, sock, data)
async def sock_sendfile(self, sock, file, offset=0, count=None, *, fallback=None)
async def sock_sendto(self, sock, data, address)
async def start_tls(self,
transport,
protocol,
sslcontext,
*,
server_side=False,
server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None)

Upgrade a transport to TLS.

Return a new transport that protocol should start using immediately.

def stop(self)

Stop the event loop as soon as reasonable.

Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled.

async def subprocess_exec(self, protocol_factory, *args, stdin=-1, stdout=-1, stderr=-1, **kwargs)
async def subprocess_shell(self, protocol_factory, cmd, *, stdin=-1, stdout=-1, stderr=-1, **kwargs)
def time(self)
class Sink (body: OutgoingBody)

Writer abstraction over wasi-http/types#outgoing-body.

Expand source code
class Sink:
    """Writer abstraction over `wasi-http/types#outgoing-body`."""
    def __init__(self, body: OutgoingBody):
        self.body = body
        self.stream = body.write()

    async def send(self, chunk: bytes):
        """Write the specified bytes to the sink.

        This may need to yield according to the backpressure requirements of the sink.
        """
        offset = 0
        flushing = False
        while True:
            count = self.stream.check_write()
            if count == 0:
                await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
            elif offset == len(chunk):
                if flushing:
                    return
                else:
                    self.stream.flush()
                    flushing = True
            else:
                count = min(count, len(chunk) - offset)
                self.stream.write(chunk[offset:offset+count])
                offset += count

    def close(self):
        """Close the stream, indicating no further data will be written."""

        self.stream.__exit__(None, None, None)
        self.stream = None
        OutgoingBody.finish(self.body, None)
        self.body = None

Methods

def close(self)

Close the stream, indicating no further data will be written.

async def send(self, chunk: bytes)

Write the specified bytes to the sink.

This may need to yield according to the backpressure requirements of the sink.

class Stream (body: IncomingBody)

Reader abstraction over wasi:http/types#incoming-body.

Expand source code
class Stream:
    """Reader abstraction over `wasi:http/types#incoming-body`."""
    def __init__(self, body: IncomingBody):
        self.body: Optional[IncomingBody] = body
        self.stream: Optional[InputStream] = body.stream()

    async def next(self) -> Optional[bytes]:
        """Wait for the next chunk of data to arrive on the stream.

        This will return `None` when the end of the stream has been reached.
        """
        while True:
            try:
                if self.stream is None:
                    return None
                else:
                    buffer = self.stream.read(READ_SIZE)
                    if len(buffer) == 0:
                        await register(cast(PollLoop, asyncio.get_event_loop()), self.stream.subscribe())
                    else:
                        return buffer
            except Err as e:
                if isinstance(e.value, StreamError_Closed):
                    if self.stream is not None:
                        self.stream.__exit__(None, None, None)
                        self.stream = None
                    if self.body is not None:
                        IncomingBody.finish(self.body)
                        self.body = None
                else:
                    raise e

Methods

async def next(self) ‑> bytes | None

Wait for the next chunk of data to arrive on the stream.

This will return None when the end of the stream has been reached.