ASGI from scratch – Let's build an ASGI web framework

Category: IT Technology · Published: 4 years ago

Intro

The first time I used ASGI (Asynchronous Server Gateway Interface) was through Channels 1.0, when the ASGI spec was still a draft. It was my first interview project, and it helped me get my current job. It felt magical at the time how easy it was to add WebSocket functionality to my Django app while Channels seamlessly handled authentication and other Django-related things for me.

The ASGI specification is at version 3 at the time of writing, and both ASGI and Channels have become part of the Django Software Foundation. Compared to the draft version, it has matured a lot, with added lifecycle calls, a better application format, and more. Most excitingly, a healthy and fast-growing community is forming, and we are seeing more and more ASGI servers running in production environments. At my company, we are serving a few million requests per day through ASGI running on Daphne. Netflix’s Dispatch is based on FastAPI, a popular ASGI web application framework, and apparently Microsoft is using it too.

I would humbly advise anyone building web services in Python to learn about ASGI. The best way to learn something is to build things with it, so in this blog post I’ll walk through the steps to build a micro web application framework that speaks ASGI. I hope it helps explain how ASGI works.

Setting the goal

Before writing the first line of code, we need to have a basic understanding of what ASGI is and what we are building towards.

How ASGI works

Here’s a simple diagram showing how ASGI works at a high level.

graph TD
	A[Client] -->|HTTP, WebSocket, ...| B(ASGI Server)
	B --> |scope, send, receive| C(ASGI application)

To put it simply: a client (a browser, for example) establishes a connection to the ASGI server with a certain type of request (HTTP or WebSocket). The ASGI server then calls the ASGI application with information about the connection, encapsulated in a Python dictionary called scope, and two callbacks, named send and receive, that the application can use to exchange messages with the client through the server.

Here’s an example HTTP request scope

{
    "type": "http",
    "http_version": "1.1",
    "server": ("127.0.0.1", 8000),
    "client": ("127.0.0.1", 60457),
    "scheme": "http",
    "method": "GET",
    "root_path": "",
    "path": "/hello/a",
    "raw_path": b"/hello/a",
    "query_string": b"",
    "headers": [
        (b"host", b"localhost:8000"),
        (b"connection", b"keep-alive"),
        (
            b"user-agent",
            b"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36",
        ),
        (
            b"accept",
            b"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        ),
        (b"accept-encoding", b"gzip, deflate, br"),
        (b"accept-language", b"en-US,en;q=0.9"),
        (
            b"cookie",
            b'csrftoken=dDA2IAPrvgPc7hkyBSyctxDk78KmhHAzUqR0LUpjXI3Xgki0QrGEWazE3RGZuLGl',
        ),
    ],
}

You might notice that scope is not too different from a WSGI environ. In fact, the ASGI interface is very similar to the WSGI interface. But instead of receiving an environ and a start_response callable to send headers, and using the return value of the WSGI application as the response body, an ASGI application interfaces with the connection itself and can asynchronously receive and send messages multiple times during the connection’s lifecycle, until the connection is closed. This makes for a nice interface for both WebSocket and HTTP.

It’s also totally possible to wrap a WSGI application inside an ASGI application: prepare a WSGI environ and start_response based on scope, receive, and send, then call the WSGI application, and it will work. If you delegate that call to a thread pool or something similar, the blocking WSGI application no longer stalls the event loop. This is roughly how Channels wraps around Django.
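
To make the wrapping idea concrete, here is a rough sketch rather than how Channels actually does it. The name wsgi_to_asgi is made up for this illustration. The sketch buffers the whole request and response body in memory, only handles HTTP, and simplifies the mapping of root_path and path onto SCRIPT_NAME and PATH_INFO.

```python
import asyncio
import io
import sys


def wsgi_to_asgi(wsgi_app):
    """Wrap a WSGI callable as an ASGI app (HTTP only, bodies buffered in memory)."""

    async def asgi_app(scope, receive, send):
        if scope["type"] != "http":
            raise NotImplementedError("this sketch only handles HTTP")

        # Read the whole request body first; WSGI expects a file-like object.
        body = b""
        while True:
            message = await receive()
            if message["type"] != "http.request":
                break  # e.g. http.disconnect
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break

        server = scope.get("server") or ("localhost", 80)
        environ = {
            "REQUEST_METHOD": scope["method"],
            "SCRIPT_NAME": scope.get("root_path", ""),
            "PATH_INFO": scope["path"],
            "QUERY_STRING": scope.get("query_string", b"").decode("ascii"),
            "SERVER_NAME": server[0],
            "SERVER_PORT": str(server[1]),
            "SERVER_PROTOCOL": "HTTP/" + scope.get("http_version", "1.1"),
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": scope.get("scheme", "http"),
            "wsgi.input": io.BytesIO(body),
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": True,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
        }
        for name, value in scope.get("headers", []):
            key = name.decode("latin-1").upper().replace("-", "_")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = "HTTP_" + key
            environ[key] = value.decode("latin-1")

        def run_wsgi():
            # Runs in a worker thread, so a blocking WSGI app does not stall the loop.
            response = {}

            def start_response(status, headers, exc_info=None):
                response["status"] = int(status.split(" ", 1)[0])
                response["headers"] = [
                    (k.lower().encode("latin-1"), v.encode("latin-1"))
                    for k, v in headers
                ]

            result = wsgi_app(environ, start_response)
            try:
                response["body"] = b"".join(result)
            finally:
                if hasattr(result, "close"):
                    result.close()
            return response

        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, run_wsgi)
        await send(
            {
                "type": "http.response.start",
                "status": response["status"],
                "headers": response["headers"],
            }
        )
        await send({"type": "http.response.body", "body": response["body"]})

    return asgi_app
```

Wrapping an existing WSGI callable is then a single call, such as app = wsgi_to_asgi(my_wsgi_app), and the result can be handed to any ASGI server. A production-grade adapter would also stream bodies instead of buffering them.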

Define ASGI framework

When I say ASGI framework, I mean a framework that makes building ASGI applications easier; it does not include the ASGI server part. I’m mentioning this because some of the earlier Python asynchronous web frameworks have their own server implementation that also takes over tasks such as parsing HTTP requests and handling network connections. We are not doing those in an ASGI web framework. ASGI is a spiritual successor to WSGI, where web servers such as Gunicorn and uWSGI are separated from web frameworks such as Flask and Django, and ASGI keeps this separation.

So, what does an ASGI application look like?

ASGI Hello World

A simple ASGI hello world application can be written as:

async def application(scope, receive, send):
    name = scope["path"].split("/", 1)[-1] or "world"
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"text/plain"],],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": f"Hello, {name}!".encode(),
            "more_body": False,
        }
    )

http.response.start starts an HTTP response by sending the status code and response headers. In this example, it responds with the 200 OK status code and has content-type set to text/plain in the headers. http.response.body sends the response body; the more_body key tells the server whether more body chunks will follow. An ASGI server might use this to know whether a connection should be closed, or to decide automatically between a content-length header and chunked encoding.
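
To see what more_body looks like in practice, here is a small sketch that sends a body in several chunks and plays the server’s role by hand. The names streaming_app and collect are made up for this illustration; a real server would write the chunks to the socket instead of collecting them in a list.

```python
import asyncio


async def streaming_app(scope, receive, send):
    """Send the response body in three chunks instead of one."""
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"text/plain")],
        }
    )
    for i in range(3):
        # more_body=True tells the server that further chunks will follow.
        await send(
            {
                "type": "http.response.body",
                "body": f"chunk {i}\n".encode(),
                "more_body": True,
            }
        )
    # An empty final message with more_body=False ends the response.
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def collect(app, scope):
    """Stand in for the server: call the app and record everything it sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


messages = asyncio.run(
    collect(streaming_app, {"type": "http", "method": "GET", "path": "/"})
)
body = b"".join(m["body"] for m in messages if m["type"] == "http.response.body")
print(body.decode())
```

The same collect helper is a handy way to unit test any ASGI application without starting a server.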

We can run the application with uvicorn :

uvicorn asgi-hello:application

You should now be able to visit http://localhost:8000/ and get Hello, world! Visiting http://localhost:8000/tom would get you Hello, tom!

By the way, uvicorn is pretty fast: a simple benchmark with wrk -d10s http://localhost:8000/hi on a 2018 lowest-spec MacBook Air yields Requests/sec: 27857.87.

Although this approach works for a simple hello world example, it’s not exactly convenient for writing a more complex application. First, it doesn’t do routing: if you want to respond differently for different paths, you’ll probably end up with a huge if ... elif ... else chain. Second, having to write every ASGI message by hand as a Python dict is quite arduous. Third, in a complex application it gets harder to track the state of the connection: has the response started, has it ended, should I start the response here, and so on.

Goal

With the new framework, I hope to be able to write an ASGI application like this:

import asyncio
from aaf import aaf # Another ASGI framework
from aaf.routing import Router
from aaf.response import HttpResponse

router = Router()

@router.route('/')
@router.route('/<name>')
async def hello(connection, name='world'):
	return HttpResponse(f"Hello, {name}")


@router.route('/count')
@router.route('/count/<int:number>')
async def count(connection, number=10):
	for i in range(number):
		await connection.send(f'count {i}\n', finish=False)
		await asyncio.sleep(1)
	await connection.send('', finish=True)


@router.route('/echo')
async def echo(connection):
	body = await connection.body()
	await connection.send(body, finish=True)


app = aaf([router])

I hope this snippet of what I want the framework to look like is self-explanatory. Here are the key things I want to achieve:

  1. It should be able to handle HTTP response declaratively and imperatively.
  2. It should support Flask style routing with parameter parsing.

Building the framework

Connection class

The Connection class will represent an ASGI HTTP or WebSocket connection. It encapsulates the three basic elements in ASGI, namely scope, send and receive, and exposes some convenient methods and properties so that users don’t need to verbosely write out all the ASGI messages and parse everything, such as cookies and headers, from scope. But it should still let users access the original scope, send and receive when they want to, so that the composability of ASGI applications is maintained. For example, it should allow a user to delegate certain connections to another ASGI application by calling another_asgi_app(connection.scope, connection.asgi_receive, connection.asgi_send).
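
The composability point can be sketched with plain ASGI callables, independent of the Connection class. Everything here (text_app, delegate, request) is a hypothetical helper written for this illustration: because the raw scope, receive and send are passed along untouched, one application can hand a connection to another.

```python
import asyncio


def text_app(text):
    """Hypothetical helper: an ASGI app that always answers with `text`."""

    async def app(scope, receive, send):
        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [(b"content-type", b"text/plain")],
            }
        )
        await send({"type": "http.response.body", "body": text.encode()})

    return app


def delegate(prefix, sub_app, fallback):
    """Hand paths under `prefix` to `sub_app`, everything else to `fallback`."""

    async def app(scope, receive, send):
        path = scope.get("path", "")
        under_prefix = path == prefix or path.startswith(prefix + "/")
        target = sub_app if under_prefix else fallback
        # ASGI argument order is (scope, receive, send).
        await target(scope, receive, send)

    return app


async def request(app, path):
    """Call `app` like a server would and return the response body."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": path}, receive, send)
    return b"".join(m.get("body", b"") for m in sent)


site = delegate("/admin", text_app("admin area"), text_app("main site"))
print(asyncio.run(request(site, "/admin/users")))
print(asyncio.run(request(site, "/hello")))
```

A wrapper class that hid the original callables would make this kind of delegation impossible, which is why they should stay reachable.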

Here’s a simple implementation of the Connection class.

from enum import Enum
from functools import cached_property
from http.cookies import SimpleCookie
from typing import Any, Awaitable, Callable, Optional, Union
from urllib.parse import parse_qsl, unquote_plus

from werkzeug.datastructures import Headers, MultiDict

CoroutineFunction = Callable[[Any], Awaitable]


class ConnectionType(Enum):
    HTTP = "HTTP"
    WebSocket = "WebSocket"


class Connection:
    def __init__(
        self, scope: dict, *, send: CoroutineFunction, receive: CoroutineFunction
    ):
        self.scope = scope
        self.asgi_send = send
        self.asgi_receive = receive

        self.started = False
        self.finished = False
        self.resp_headers = Headers()
        self.resp_cookies: SimpleCookie = SimpleCookie()
        self.resp_status_code: Optional[int] = None

        self.http_body = b""
        self.http_has_more_body = True
        self.http_received_body_length = 0

    @cached_property
    def req_headers(self) -> Headers:
        headers = Headers()
        for (k, v) in self.scope["headers"]:
            headers.add(k.decode("ascii"), v.decode("ascii"))
        return headers

    @cached_property
    def req_cookies(self) -> SimpleCookie:
        cookie = SimpleCookie()
        cookie.load(self.req_headers.get("cookie", {}))
        return cookie

    @cached_property
    def type(self) -> ConnectionType:
        return (
            ConnectionType.WebSocket
            if self.scope.get("type") == "websocket"
            else ConnectionType.HTTP
        )

    @cached_property
    def method(self) -> str:
        return self.scope["method"]

    @cached_property
    def path(self) -> str:
        return self.scope["path"]

    @cached_property
    def query(self) -> MultiDict:
        # parse_qsl already percent-decodes keys and values, so no extra unquoting.
        return MultiDict(parse_qsl(self.scope["query_string"].decode()))

    async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
        if self.finished:
            raise ValueError("No message can be sent when connection closed")
        if self.type == ConnectionType.HTTP:
            if isinstance(data, str):
                data = data.encode()
            await self._http_send(data, finish=finish)
        else:
            raise NotImplementedError()

    async def _http_send(self, data: bytes = b"", *, finish: bool = False):
        if not self.started:
            if finish:
                self.put_resp_header("content-length", str(len(data)))
            await self.start_resp()
        await self.asgi_send(
            {"type": "http.response.body", "body": data or b"", "more_body": True}
        )
        if finish:
            await self.finish()

    async def finish(self, close_code: Optional[int] = 1000):
        if self.type == ConnectionType.HTTP:
            if self.finished:
                raise ValueError("Connection already finished")
            if not self.started:
                self.resp_status_code = 204
                await self.start_resp()
            await self.asgi_send(
                {"type": "http.response.body", "body": b"", "more_body": False}
            )
        else:
            raise NotImplementedError()
            # await self.asgi_send({"type": "websocket.close", "code": close_code})
        self.finished = True

    async def start_resp(self):
        if self.started:
            raise ValueError("resp already started")
        if not self.resp_status_code:
            self.resp_status_code = 200
        headers = [
            [k.encode("ascii"), v.encode("ascii")] for k, v in self.resp_headers.items()
        ]
        for value in self.resp_cookies.values():
            headers.append([b"Set-Cookie", value.OutputString().encode("ascii")])
        await self.asgi_send(
            {
                "type": "http.response.start",
                "status": self.resp_status_code,
                "headers": headers,
            }
        )
        self.started = True

    async def body_iter(self):
        if not self.type == ConnectionType.HTTP:
            raise ValueError("connection type is not HTTP")
        if self.http_received_body_length > 0 and self.http_has_more_body:
            raise ValueError("body iter is already started and is not finished")
        if self.http_received_body_length > 0 and not self.http_has_more_body:
            yield self.http_body
        req_body_length = (
            int(self.req_headers.get("content-length", "0"))
            if not self.req_headers.get("transfer-encoding") == "chunked"
            else None
        )
        while self.http_has_more_body:
            if req_body_length and self.http_received_body_length > req_body_length:
                raise ValueError("body is longer than declared")
            message = await self.asgi_receive()
            message_type = message.get("type")
            if message.get("type") == "http.disconnect":
                raise ValueError("Disconnected")
            if message_type != "http.request":
                continue
            chunk = message.get("body", b"")
            if not isinstance(chunk, bytes):
                raise ValueError("Chunk is not bytes")
            self.http_body += chunk
            self.http_has_more_body = message.get("more_body", False) or False
            self.http_received_body_length += len(chunk)
            yield chunk

    async def body(self):
        return b"".join([chunks async for chunks in self.body_iter()])

    def put_resp_header(self, key, value):
        self.resp_headers.add(key, value)

    def put_resp_cookie(self, key, value):
        self.resp_cookies[key] = value

I hope this code is easy to read. All it does is provide an interface that makes it easier to access information about the request, read the request body, make sure the body is not longer than declared, and send the response back to the client. It also provides some safeguards to ensure that messages of the right type are sent in the right state. For example, it ensures that http.response.start messages are always sent before http.response.body, and that no more messages are sent after the connection is closed. Most of the heavy lifting in parsing headers, cookies, and the query string is done by werkzeug and other Python built-in superheroes.

Let’s make a simple ASGI application with the Connection class:

# example1.py
from aaf.connection import Connection
async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    name = conn.query.get("name")
    await conn.send("Hello, " + (name or "world"), finish=True)

We can run this example by executing uvicorn example1:app. Requesting /?name=foo should correctly return Hello, foo.

We now have an ASGI application that’s shorter in code and does more (accessing HTTP query). That’s one step closer to what we want!

HTTP Responses

Fine control over when to send what is usually not required in an HTTP request-response cycle. Returning a response object that knows how to set headers and send the body is more convenient and familiar. To do that, we can write a simple HttpResponse helper class.

A JsonResponse is also added while we are at it.

# response.py
import json
from typing import Union, Optional, Mapping, Any

from .connection import Connection


class HttpResponse:
    def __init__(
        self,
        body: Optional[Union[bytes, str]] = b"",
        connection: Optional[Connection] = None,
        *,
        status_code: int = 200,
        headers: Optional[Mapping[str, str]] = None
    ):
        self.body = body
        self.connection = connection
        self.status_code = status_code
        self.headers = headers

    def __await__(self):
        if not self.connection:
            raise ValueError("No connection")
        self.connection.resp_status_code = self.status_code
        if self.headers:
            for k, v in self.headers.items():
                self.connection.put_resp_header(k, v)
        return self.connection.send(self.body, finish=True).__await__()


class JsonResponse(HttpResponse):
    def __init__(
        self, data: Any, connection: Optional[Connection] = None, *args, **kwargs
    ):
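        body = json.dumps(data)
        # Copy the caller's headers (if any) so their mapping is not mutated,
        # then pass the merged headers on to HttpResponse through kwargs.
        headers = dict(kwargs.get("headers") or {})
        headers["content-type"] = "application/json"
        kwargs["headers"] = headers
        super().__init__(body, connection, *args, **kwargs)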

There’s not much going on in the HttpResponse class. All it does is provide a familiar interface that lets us pass in a response body, optional headers, and an optional status code, and then call the underlying methods of the Connection class for us. The JsonResponse class additionally sets the content-type header.

Let’s write another ASGI application to test it:

async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    await JsonResponse(conn.query.to_dict(flat=False), conn)

This application should return all query parameters in the form of a JSON object when visited.

Great, the application is even shorter!

You might have noticed that this is not exactly how it’s used in the Goal section, where we can just return the response object instead of awaiting it. This is because this example is a plain ASGI application, while the one in the Goal section runs in the context of a Router, which does the awaiting for us. The connection argument is allowed to be None in the constructor for the same reason.
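
The trick that makes a response object awaitable is the __await__ method. Here is a stripped-down sketch of the same pattern; the Greeting class and its sink attribute are stand-ins invented for this example, with sink playing the role of the connection.

```python
import asyncio


class Greeting:
    """Hypothetical awaitable object, mirroring the shape of HttpResponse."""

    def __init__(self, name, sink=None):
        self.name = name
        self.sink = sink  # plays the role of `connection`; may be attached later

    async def _deliver(self):
        self.sink.append(f"Hello, {self.name}")
        return len(self.sink)

    def __await__(self):
        if self.sink is None:
            raise ValueError("No sink attached")
        # Delegate to the coroutine's own __await__, which returns an iterator.
        return self._deliver().__await__()


async def handler():
    # Like a route handler: build the object and leave the awaiting to the caller.
    return Greeting("world")


async def main():
    sink = []
    res = await handler()
    res.sink = sink  # the caller attaches the missing piece first ...
    count = await res  # ... and only then awaits the object
    return sink, count


print(asyncio.run(main()))
```

Nothing is sent when the object is constructed; the work only happens once something awaits it, which is what lets a handler return a response without knowing about the connection.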

Routing

A router dispatches requests to different handlers based on the requested URL and HTTP method. Most router implementations also parse parameters in URLs. For example, if we define a route

@router.route('/a/<param_a>/<param_b>')
async def handler(connection, param_a, param_b):
	...

and then tell the router to match the URL /a/foo/bar, it should give us the handler function as well as the parameters param_a and param_b.
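
To illustrate what matching involves, here is a toy matcher built on regular expressions. It is not how werkzeug works internally, and ToyRouter, compile_rule and the two converters are made up for this sketch; it ignores HTTP methods, redirects, and rule priorities.

```python
import re
from typing import Callable, Dict, List, Optional, Pattern, Tuple

# Converter name -> (regex fragment, function that casts the matched text).
CONVERTERS = {"str": (r"[^/]+", str), "int": (r"\d+", int)}
PLACEHOLDER = re.compile(r"<(?:(?P<conv>\w+):)?(?P<name>\w+)>")


def compile_rule(rule: str) -> Tuple[Pattern[str], Dict[str, Callable]]:
    """Turn '/a/<int:x>' into a regex plus a per-parameter cast table."""
    casts: Dict[str, Callable] = {}
    regex, pos = "", 0
    for m in PLACEHOLDER.finditer(rule):
        pattern, cast = CONVERTERS[m.group("conv") or "str"]
        regex += re.escape(rule[pos : m.start()])
        regex += f"(?P<{m.group('name')}>{pattern})"
        casts[m.group("name")] = cast
        pos = m.end()
    regex += re.escape(rule[pos:])
    return re.compile(regex), casts


class ToyRouter:
    def __init__(self):
        self.routes: List[Tuple[Pattern[str], Dict[str, Callable], Callable]] = []

    def route(self, rule: str):
        def decorator(handler):
            self.routes.append((*compile_rule(rule), handler))
            return handler

        return decorator

    def match(self, path: str) -> Optional[Tuple[Callable, dict]]:
        """Return (handler, params) for the first matching rule, else None."""
        for pattern, casts, handler in self.routes:
            m = pattern.fullmatch(path)
            if m:
                return handler, {k: casts[k](v) for k, v in m.groupdict().items()}
        return None


router = ToyRouter()


@router.route("/a/<param_a>/<param_b>")
def handler(param_a, param_b):
    return f"{param_a} and {param_b}"


matched_handler, params = router.match("/a/foo/bar")
print(matched_handler(**params))
```

Even this toy version needs a rule parser, converters, and a lookup loop, and it still leaves out most of what a real router has to handle.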

This is indeed not easy, but luckily werkzeug comes with a routing module which does exactly this and even more, such as automatically redirecting when a trailing slash is missing. With its help, we can implement our routing module in around 60 lines of code.

# routing.py
import functools
from typing import Callable, Iterable, Optional

from werkzeug.routing import Map, MethodNotAllowed, NotFound, RequestRedirect, Rule

from .connection import Connection
from .response import HttpResponse


class Router:
    def __init__(self):
        super().__init__()
        self.url_map = Map()
        self.endpoint_to_handler = {}

    def route(self, rule, methods=None, name=None):
        methods = set(methods) if methods is not None else None
        if methods and not "OPTIONS" in methods:
            methods.add("OPTIONS")

        def decorator(name: Optional[str], handler: Callable):
            self.add_route(
                rule_string=rule, handler=handler, methods=methods, name=name
            )
            return handler

        return functools.partial(decorator, name)

    def add_route(
        self,
        *,
        rule_string: str,
        handler: Callable,
        name: Optional[str] = None,
        methods: Optional[Iterable[str]] = None,
    ):
        if not name:
            name = handler.__name__
        existing_handler = self.endpoint_to_handler.get(name)
        if existing_handler and existing_handler is not handler:
            raise ValueError("Duplicated route name: %s" % (name))
        self.url_map.add(Rule(rule_string, endpoint=name, methods=methods))
        self.endpoint_to_handler[name] = handler

    def get_url_binding_for_connection(self, connection: Connection):
        scope = connection.scope
        return self.url_map.bind(
            connection.req_headers.get("host"),
            path_info=scope.get("path"),
            script_name=scope.get("root_path") or None,
            url_scheme=scope.get("scheme"),
            query_args=scope.get("query_string", b""),
        )

    async def __call__(self, connection: Connection):
        try:
            rule, args = self.get_url_binding_for_connection(connection).match(
                return_rule=True, method=connection.scope.get("method")
            )
        except RequestRedirect as e:
            connection.resp_status_code = 302
            connection.put_resp_header("location", e.new_url)
            return await connection.send(f"redirecting to: {e.new_url}", finish=True)
        except MethodNotAllowed:
            connection.resp_status_code = 405
            return await connection.send(b"", finish=True)
        except NotFound:
            pass
        else:
            handler = self.endpoint_to_handler[rule.endpoint]
            res = await handler(connection, **args)
            if isinstance(res, HttpResponse):
                res.connection = connection
                await res

In the __call__ method, I’m checking whether the value returned from the handler is an HttpResponse. If it is, the router attaches the connection and awaits the response, and the response takes care of sending the headers and body.

More details on werkzeug.routing

Use this simple app to test it:

from aaf.connection import Connection
from aaf.response import JsonResponse
from aaf.routing import Router

router = Router()

@router.route("/hello/<name>")
async def hello(connection, name):
    return JsonResponse({'hello': name})


async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    await router(conn)

Visiting /hello/world should return {"hello": "world"} .

The application gets longer, but hopefully it’s clear that as the number of routes grows, a router makes them a lot easier to manage.

Router to ASGI application

Now, routers are great, but they are still not ASGI applications. Even though we can always write an ASGI application by hand, make a Connection out of it, and then call a router with the connection, it just doesn’t feel like an ASGI framework but more like an ASGI toolbox.

So, let’s write a helper function that turns a list of routers into an ASGI application. The idea of receiving a list of routers instead of just one is to allow a path that one router does not handle to fall through to the next router.

# __init__.py
from typing import List, Callable
from .connection import Connection


def aaf(routers: List[Callable]):
    async def asgi_app(scope, receive, send):
        conn = Connection(scope, send=send, receive=receive)
        for router in routers:
            await router(conn)
            if conn.finished:
                return
        if conn.started:
            await conn.finish()
        else:
            conn.resp_status_code = 404
            await conn.send("Not found", finish=True)

    return asgi_app

It also includes a default 404 response.

With the help of this new function, the previous example can be rewritten as:

from aaf.connection import Connection
from aaf.response import JsonResponse
from aaf.routing import Router
from aaf import aaf

router = Router()

@router.route("/hello/<name>")
async def hello(connection, name):
    return JsonResponse({'hello': name})

app = aaf([router])

In fact, the example defined in the Goal section should also work now. Give it a try!

Next steps

An infinite number of things can be added to this framework. Off the top of my head, there are

Connection

I will write about some of these things in the near future. I hope this post gives readers enough information on where to start implementing them on their own.

