Intro
The first time I used ASGI (Asynchronous Server Gateway Interface) was through Channels 1.0, when the ASGI spec was still a draft. I used it for my first interview project, which helped me get my current job. It felt magical at the time how easy it was to add WebSocket functionality to my Django app, with authentication and other Django-related things handled for me seamlessly.
The ASGI specification is at version 3 at the time of writing, and both ASGI and Channels have become part of the Django Software Foundation. Compared to the draft version, it has matured a lot, with added lifecycle calls, a better application format, and more. Most excitingly, a healthy and fast-growing community is forming, and we are seeing more and more ASGI servers running in production environments. At my company, we are serving a few million requests per day through ASGI running on Daphne. Netflix’s Dispatch is based on FastAPI, a popular ASGI web application framework, and apparently Microsoft is using it too.
I would humbly advise anyone building web services in Python to learn about ASGI. The best way to learn something is to build things with it, so in this blog post I’ll walk through the steps to build a micro web application framework that speaks ASGI. I hope it can help explain how ASGI works.
Setting the goal
Before writing the first line of code, we need to have a basic understanding of what ASGI is and what we are building towards.
How ASGI works
Here’s a simple diagram showing how ASGI works at a high level.
```mermaid
graph TD
    A[Client] -->|HTTP, WebSocket, ...| B(ASGI Server)
    B -->|scope, send, receive| C(ASGI application)
```
Put simply: a browser (client) establishes a connection to the ASGI server with a certain type of request (HTTP or WebSocket). The ASGI server then calls the ASGI application with information about the connection, encapsulated in a Python dictionary called `scope`, and two callbacks, named `send` and `receive`, that the application can use to send and receive messages between server and client.
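To make the calling convention concrete, here is a minimal sketch that plays the server’s role in memory. `hello_app` and `call_app` are hypothetical names invented for this illustration, and the `scope` is trimmed to a handful of keys; a real server passes many more.

```python
import asyncio


async def hello_app(scope, receive, send):
    """Stand-in ASGI application used only for this sketch."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": scope["path"].encode()})


async def call_app(app, path):
    """Play the server's role for one in-memory HTTP request (hypothetical helper)."""
    scope = {
        "type": "http",
        "method": "GET",
        "path": path,
        "query_string": b"",
        "headers": [],
    }
    sent = []

    async def receive():
        # A request with an empty body, delivered in a single message.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # A real server would write to the socket; here we only record messages.
        sent.append(message)

    await app(scope, receive, send)
    return sent


messages = asyncio.run(call_app(hello_app, "/hello/a"))
print([m["type"] for m in messages])
```

Driving an application this way is also a handy way to unit test ASGI code without starting a server.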
Here’s an example HTTP request scope:
```python
{
    "type": "http",
    "http_version": "1.1",
    "server": ("127.0.0.1", 8000),
    "client": ("127.0.0.1", 60457),
    "scheme": "http",
    "method": "GET",
    "root_path": "",
    "path": "/hello/a",
    "raw_path": b"/hello/a",
    "query_string": b"",
    "headers": [
        (b"host", b"localhost:8000"),
        (b"connection", b"keep-alive"),
        (
            b"user-agent",
            b"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36",
        ),
        (
            b"accept",
            b"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        ),
        (b"accept-encoding", b"gzip, deflate, br"),
        (b"accept-language", b"en-US,en;q=0.9"),
        (
            b"cookie",
            b'csrftoken=dDA2IAPrvgPc7hkyBSyctxDk78KmhHAzUqR0LUpjXI3Xgki0QrGEWazE3RGZuLGl',
        ),
    ],
}
```
You might notice that `scope` is not too different from a WSGI `environ`. In fact, the ASGI interface is very similar to the WSGI interface. But instead of getting an `environ` and a `start_response` to send headers, and using the return value of the WSGI application as the response body, an ASGI application interfaces with the connection itself: it can receive and send messages multiple times, asynchronously, during the lifecycle of the connection until the connection is closed. This allows a nice interface for both WebSocket and HTTP.
It’s also totally possible to wrap a WSGI application inside an ASGI application: prepare a WSGI `environ` and `start_response` based on `scope`, `receive`, and `send`, then call the WSGI application, and it will work. If you delegate that call to a thread pool or something similar, you have just made your WSGI application asynchronous. This is roughly how Channels wraps around Django.
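As a rough illustration of that idea, the sketch below wraps a toy WSGI application in an ASGI callable and runs it in the default thread pool. `wsgi_to_asgi`, `wsgi_hello`, and `run_once` are hypothetical names made up for this example. The adapter buffers the whole request and response and leaves out several `environ` keys that PEP 3333 requires (for example `SERVER_NAME`, `SERVER_PORT`, and the special handling of `CONTENT_TYPE` and `CONTENT_LENGTH`), so it is a sketch and not a substitute for a real adapter.

```python
import asyncio
import io


def wsgi_hello(environ, start_response):
    """Stand-in WSGI application used only for this sketch."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello from WSGI at " + environ["PATH_INFO"].encode()]


def wsgi_to_asgi(wsgi_app):
    """Hypothetical adapter: HTTP only, buffers the full request and response."""

    async def asgi_app(scope, receive, send):
        # Read the whole request body before calling the blocking WSGI app.
        body = b""
        while True:
            message = await receive()
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break

        environ = {
            "REQUEST_METHOD": scope["method"],
            "SCRIPT_NAME": scope.get("root_path", ""),
            "PATH_INFO": scope["path"],
            "QUERY_STRING": scope["query_string"].decode("ascii"),
            "SERVER_PROTOCOL": "HTTP/" + scope.get("http_version", "1.1"),
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": scope.get("scheme", "http"),
            "wsgi.input": io.BytesIO(body),
            "wsgi.errors": io.StringIO(),
            "wsgi.multithread": True,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
        }
        for name, value in scope["headers"]:
            key = "HTTP_" + name.decode("latin-1").upper().replace("-", "_")
            environ[key] = value.decode("latin-1")

        response = {}

        def start_response(status, headers, exc_info=None):
            # Remember status and headers; they are sent once the app returns.
            response["status"] = int(status.split(" ", 1)[0])
            response["headers"] = [
                (k.lower().encode("latin-1"), v.encode("latin-1")) for k, v in headers
            ]

        def run_wsgi():
            # Executed in a worker thread so the event loop is not blocked.
            return b"".join(wsgi_app(environ, start_response))

        loop = asyncio.get_running_loop()
        payload = await loop.run_in_executor(None, run_wsgi)

        await send(
            {
                "type": "http.response.start",
                "status": response["status"],
                "headers": response["headers"],
            }
        )
        await send({"type": "http.response.body", "body": payload})

    return asgi_app


async def run_once(app, path):
    """Drive the wrapped app with one in-memory GET request."""
    scope = {"type": "http", "method": "GET", "path": path,
             "query_string": b"", "headers": [(b"host", b"localhost")]}
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


sent = asyncio.run(run_once(wsgi_to_asgi(wsgi_hello), "/x"))
```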
Define ASGI framework
By ASGI framework, I mean a framework that makes building ASGI applications easier; it does not include the ASGI server part. I’m mentioning this because some of the earlier Python asynchronous web frameworks ship their own server implementation, which also takes over tasks such as parsing HTTP requests and handling network connections. We are not doing those in an ASGI web framework. ASGI is a spiritual successor to WSGI, where web servers (such as Gunicorn and uWSGI) and web frameworks (such as Flask and Django) are separated, and ASGI keeps this separation.
So, what does an ASGI application look like?
ASGI Hello World
A simple ASGI hello world application can be written as:
```python
async def application(scope, receive, send):
    name = scope["path"].split("/", 1)[-1] or "world"
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"text/plain"],],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": f"Hello, {name}!".encode(),
            "more_body": False,
        }
    )
```
`http.response.start` starts an HTTP response by sending the status code and response headers. In this example, it responds with the 200 OK status code and has `content-type` set to `text/plain` in the headers. `http.response.body` sends the response body; the `more_body` key tells the server whether more body messages will follow. An ASGI server might use this to know if a connection should be closed, or to decide automatically between a `content-length` header and chunked encoding.
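To show what `more_body` looks like when a response is sent in pieces, here is a small sketch that streams three chunks and marks only the last one as final. `streaming_app` and `collect` are hypothetical names for this example, and `collect` merely records messages in memory where a real server would write them to the network.

```python
import asyncio


async def streaming_app(scope, receive, send):
    """Send the body in three chunks; only the last one has more_body=False."""
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"text/plain"]],
        }
    )
    chunks = [b"one\n", b"two\n", b"three\n"]
    for index, chunk in enumerate(chunks):
        is_last = index == len(chunks) - 1
        await send(
            {
                "type": "http.response.body",
                "body": chunk,
                "more_body": not is_last,
            }
        )


async def collect(app):
    """Run the app once in memory and return every message it sent."""
    scope = {"type": "http", "method": "GET", "path": "/",
             "query_string": b"", "headers": []}
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


sent = asyncio.run(collect(streaming_app))
body_messages = sent[1:]
print([m["more_body"] for m in body_messages])
```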
We can run the application with uvicorn:
```
uvicorn asgi-hello:application
```
You should then be able to visit `http://localhost:8000/` and get `Hello, world!`. Visiting `http://localhost:8000/tom` would get you `Hello, tom!`.
By the way, uvicorn is pretty fast: a simple benchmark with `wrk -d10s http://localhost:8000/hi` on a 2018 lowest-spec MacBook Air yields `Requests/sec: 27857.87`.
Although this approach works for a simple hello world example, it’s not exactly convenient for writing a more complex application. First, it doesn’t do routing: if you want to respond differently for different paths, you’ll probably end up with a huge `if ... elif ... else` clause. Second, having to write every ASGI message by hand as a Python dict is quite arduous. Third, in a complex application it gets harder to track the status of the connection: has the response started, has it ended, should I start the response here, and so on.
Goal
With the new framework, I hope to be able to write an ASGI application like this:
```python
import asyncio

from aaf import aaf  # Another ASGI framework
from aaf.routing import Router
from aaf.response import HttpResponse

router = Router()


@router.route('/')
@router.route('/<name>')
async def hello(connection, name='world'):
    return HttpResponse(f"Hello, {name}")


@router.route('/count')
@router.route('/count/<int:number>')
async def count(connection, number=10):
    for i in range(number):
        await connection.send(f'count {i}\n', finish=False)
        await asyncio.sleep(1)
    await connection.send('', finish=True)


@router.route('/echo')
async def echo(connection):
    body = await connection.body()
    await connection.send(body, finish=True)


app = aaf([router])
```
I hope this snippet of how I want the framework to look is self-explanatory. Here are some of the key things I want to achieve:
- It should be able to handle HTTP response declaratively and imperatively.
- It should support Flask style routing with parameter parsing.
Building the framework
Connection class
The `Connection` class will represent an ASGI HTTP or WebSocket connection. It encapsulates the three basic elements of ASGI, namely `scope`, `send`, and `receive`, and exposes some convenient methods and properties so that users don’t need to write out every ASGI message verbosely or parse everything, such as cookies and headers, from `scope` themselves. But it should still allow users to access the original `scope`, `send`, and `receive` when they want to, so that the composability of ASGI applications is maintained. For example, it should allow a user to delegate certain `connection`s to another ASGI application by calling `another_asgi_app(connection.scope, connection.asgi_receive, connection.asgi_send)`.
Here’s a simple implementation of the `Connection` class.
```python
# connection.py
from enum import Enum
from functools import cached_property  # requires Python 3.8+
from http.cookies import SimpleCookie
from typing import Any, Awaitable, Callable, Optional, Union
from urllib.parse import parse_qsl

from werkzeug.datastructures import Headers, MultiDict

CoroutineFunction = Callable[[Any], Awaitable]


class ConnectionType(Enum):
    HTTP = "HTTP"
    WebSocket = "WebSocket"


class Connection:
    def __init__(
        self, scope: dict, *, send: CoroutineFunction, receive: CoroutineFunction
    ):
        self.scope = scope
        self.asgi_send = send
        self.asgi_receive = receive

        # Response state
        self.started = False
        self.finished = False
        self.resp_headers = Headers()
        self.resp_cookies: SimpleCookie = SimpleCookie()
        self.resp_status_code: Optional[int] = None

        # Request body state
        self.http_body = b""
        self.http_has_more_body = True
        self.http_received_body_length = 0

    @cached_property
    def req_headers(self) -> Headers:
        headers = Headers()
        for (k, v) in self.scope["headers"]:
            headers.add(k.decode("ascii"), v.decode("ascii"))
        return headers

    @cached_property
    def req_cookies(self) -> SimpleCookie:
        cookie = SimpleCookie()
        cookie.load(self.req_headers.get("cookie", {}))
        return cookie

    @cached_property
    def type(self) -> ConnectionType:
        return (
            ConnectionType.WebSocket
            if self.scope.get("type") == "websocket"
            else ConnectionType.HTTP
        )

    @cached_property
    def method(self) -> str:
        return self.scope["method"]

    @cached_property
    def path(self) -> str:
        return self.scope["path"]

    @cached_property
    def query(self) -> MultiDict:
        # parse_qsl already percent-decodes keys and values. Decoding the whole
        # string first would break values that contain an encoded "&" or "=".
        return MultiDict(parse_qsl(self.scope["query_string"].decode()))

    async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
        if self.finished:
            raise ValueError("No message can be sent when connection closed")
        if self.type == ConnectionType.HTTP:
            if isinstance(data, str):
                data = data.encode()
            await self._http_send(data, finish=finish)
        else:
            raise NotImplementedError()

    async def _http_send(self, data: bytes = b"", *, finish: bool = False):
        if not self.started:
            if finish:
                # The whole body is known up front, so declare its length.
                self.put_resp_header("content-length", str(len(data)))
            await self.start_resp()
        await self.asgi_send(
            {"type": "http.response.body", "body": data or b"", "more_body": True}
        )
        if finish:
            await self.finish()

    async def finish(self, close_code: Optional[int] = 1000):
        if self.type == ConnectionType.HTTP:
            if self.finished:
                raise ValueError("Connection already finished")
            if not self.started:
                self.resp_status_code = 204
                await self.start_resp()
            # An empty body message with more_body=False ends the response.
            await self.asgi_send(
                {"type": "http.response.body", "body": b"", "more_body": False}
            )
        else:
            raise NotImplementedError()
            # await self.asgi_send({"type": "websocket.close", "code": close_code})
        self.finished = True

    async def start_resp(self):
        if self.started:
            raise ValueError("resp already started")
        if not self.resp_status_code:
            self.resp_status_code = 200
        # The ASGI spec asks for lowercased header names.
        headers = [
            [k.lower().encode("ascii"), v.encode("ascii")]
            for k, v in self.resp_headers.items()
        ]
        for value in self.resp_cookies.values():
            headers.append([b"set-cookie", value.OutputString().encode("ascii")])
        await self.asgi_send(
            {
                "type": "http.response.start",
                "status": self.resp_status_code,
                "headers": headers,
            }
        )
        self.started = True

    async def body_iter(self):
        if not self.type == ConnectionType.HTTP:
            raise ValueError("connection type is not HTTP")
        if self.http_received_body_length > 0 and self.http_has_more_body:
            raise ValueError("body iter is already started and is not finished")
        if self.http_received_body_length > 0 and not self.http_has_more_body:
            # The body was fully read before; replay the cached copy.
            yield self.http_body
        req_body_length = (
            int(self.req_headers.get("content-length", "0"))
            if not self.req_headers.get("transfer-encoding") == "chunked"
            else None
        )
        while self.http_has_more_body:
            message = await self.asgi_receive()
            message_type = message.get("type")
            if message_type == "http.disconnect":
                raise ValueError("Disconnected")
            if message_type != "http.request":
                continue
            chunk = message.get("body", b"")
            if not isinstance(chunk, bytes):
                raise ValueError("Chunk is not bytes")
            self.http_body += chunk
            self.http_has_more_body = message.get("more_body", False) or False
            self.http_received_body_length += len(chunk)
            # Check after counting the chunk so an oversized final chunk is caught.
            if req_body_length and self.http_received_body_length > req_body_length:
                raise ValueError("body is longer than declared")
            yield chunk

    async def body(self):
        return b"".join([chunks async for chunks in self.body_iter()])

    def put_resp_header(self, key, value):
        self.resp_headers.add(key, value)

    def put_resp_cookie(self, key, value):
        self.resp_cookies[key] = value
```
I hope this code is easy to read. All it does is provide an interface that makes it easier to access information about the request, read the request body, make sure the body is not longer than declared, and send the response back to the client. It also provides some safeguards to ensure that messages of the right type are sent in the right state. For example, it ensures that the `http.response.start` message is always sent before any `http.response.body` message, and that no more messages are sent after the connection is closed. Most of the heavy lifting in parsing headers, cookies, and the query string is done by `werkzeug` and other Python built-in superheroes.
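Stripped of the class and its bookkeeping, reading a request body comes down to calling `receive` until a message arrives with `more_body` set to false. The sketch below shows that loop on its own. `read_body` and `make_receive` are hypothetical helpers for this illustration; `make_receive` replays canned chunks where a real server would deliver them from the network.

```python
import asyncio


async def read_body(receive):
    """Collect an HTTP request body from successive http.request messages."""
    body = b""
    more_body = True
    while more_body:
        message = await receive()
        if message["type"] == "http.disconnect":
            raise ConnectionError("client disconnected before the body was complete")
        if message["type"] != "http.request":
            continue
        body += message.get("body", b"")
        # A missing more_body key means this was the final chunk.
        more_body = message.get("more_body", False)
    return body


def make_receive(chunks):
    """Hypothetical test helper: a receive() that replays the given chunks."""
    queue = [
        {"type": "http.request", "body": chunk, "more_body": i < len(chunks) - 1}
        for i, chunk in enumerate(chunks)
    ]

    async def receive():
        return queue.pop(0)

    return receive


result = asyncio.run(read_body(make_receive([b"hello ", b"world"])))
print(result)
```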
Let’s make a simple ASGI application with the `Connection` class:
```python
# example_1.py
from aaf.connection import Connection


async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    name = conn.query.get("name")
    await conn.send("Hello, " + (name or "world"), finish=True)
```
We can run this example by executing `uvicorn example_1:app`. Requesting `/?name=foo` should correctly return `Hello, foo`.
We now have an ASGI application that’s shorter in code and does more (accessing HTTP query). That’s one step closer to what we want!
HTTP Responses
It’s usually not necessary to have fine control over when to send what in an HTTP request-response cycle. Returning a response object that knows how to set headers and send the body is more convenient and familiar. To do that, we can write a simple `HttpResponse` helper class. A `JsonResponse` is also added while we are at it.
```python
# response.py
import json
from typing import Any, Mapping, Optional, Union

from .connection import Connection


class HttpResponse:
    def __init__(
        self,
        body: Optional[Union[bytes, str]] = b"",
        connection: Optional[Connection] = None,
        *,
        status_code: int = 200,
        headers: Optional[Mapping[str, str]] = None
    ):
        self.body = body
        self.connection = connection
        self.status_code = status_code
        self.headers = headers

    def __await__(self):
        if not self.connection:
            raise ValueError("No connection")
        self.connection.resp_status_code = self.status_code
        if self.headers:
            for k, v in self.headers.items():
                self.connection.put_resp_header(k, v)
        # Awaiting the response sends it through the connection.
        return self.connection.send(self.body, finish=True).__await__()


class JsonResponse(HttpResponse):
    def __init__(
        self, data: Any, connection: Optional[Connection] = None, *args, **kwargs
    ):
        body = json.dumps(data)
        # Copy the caller's headers (if any) and write the result back into
        # kwargs; otherwise the content-type is lost when no headers are passed.
        headers = dict(kwargs.get("headers") or {})
        headers["content-type"] = "application/json"
        kwargs["headers"] = headers
        super().__init__(body, connection, *args, **kwargs)
```
There’s not much going on in the `HttpResponse` class. All it does is provide a familiar interface that lets us pass in a response body, optional headers, and an optional status code, and then calls the underlying methods of the `Connection` class for us. The `JsonResponse` class additionally sets the `content-type` header.
Let’s write another ASGI application to test it:
```python
from aaf.connection import Connection
from aaf.response import JsonResponse


async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    await JsonResponse(conn.query.to_dict(flat=False), conn)
```
This application should return all query parameters in the form of a JSON object when visited.
Great, the application is even shorter!
You might have noticed that this is not exactly how it’s used in the Goal section, where we can just `return` the response object instead of having to `await` it. This is because this example is a plain ASGI application, while the one in the Goal section runs in the context of a `Router`, which calls `await` for us. The `connection` argument is allowed to be `None` in the constructor for the same reason.
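The trick that makes a response object awaitable is the `__await__` method, which only has to return an iterator; delegating to a coroutine’s own `__await__` is the simplest way to get one. Here is a minimal standalone sketch, using a hypothetical `Greeting` class that has nothing to do with the framework’s classes:

```python
import asyncio


class Greeting:
    """Hypothetical awaitable object, unrelated to the framework's classes."""

    def __init__(self, name):
        self.name = name

    async def _render(self):
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        return f"Hello, {self.name}"

    def __await__(self):
        # Delegate to the coroutine's own __await__, which returns an iterator.
        return self._render().__await__()


async def main():
    greeting = Greeting("world")  # constructing the object does no work yet
    return await greeting  # awaiting it runs _render()


result = asyncio.run(main())
print(result)
```

Because nothing happens until the object is awaited, whoever holds it (a router, for instance) can still attach extra state before triggering the work.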
Routing
A router dispatches requests to different handlers based on the requested URL and HTTP method. Most router implementations also parse parameters in URLs. For example, if we define a route
```python
@router.route('/a/<param_a>/<param_b>')
async def handler(connection, param_a, param_b):
    ...
```
and then tell the router to match the URL `/a/foo/bar`, it should give us the `handler` function as well as the parameters `param_a` and `param_b`.
This is indeed not easy, but luckily `werkzeug` comes with a `routing` module which does exactly this and more, such as automatically redirecting when a trailing slash is missing. With its help, we can implement our `routing` module in around 60 lines of code.
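To give a feel for what such a router has to do, here is a deliberately naive, standard-library-only sketch that turns a Flask-style rule into a regular expression and extracts typed parameters. It is not how `werkzeug` implements routing, and `compile_rule`, `match`, and `CONVERTERS` are names invented for this illustration; only a default string converter and an `int` converter are supported.

```python
import re

# Each converter maps to (regex fragment, function that casts the captured text).
CONVERTERS = {"default": (r"[^/]+", str), "int": (r"\d+", int)}
PLACEHOLDER = re.compile(r"<(?:(?P<conv>\w+):)?(?P<name>\w+)>")


def compile_rule(rule):
    """Turn '/count/<int:number>' into a regex plus per-parameter converters."""
    pattern, casts, pos = "", {}, 0
    for m in PLACEHOLDER.finditer(rule):
        regex, cast = CONVERTERS[m.group("conv") or "default"]
        # Escape the literal text before the placeholder, then add a named group.
        pattern += re.escape(rule[pos:m.start()]) + f"(?P<{m.group('name')}>{regex})"
        casts[m.group("name")] = cast
        pos = m.end()
    pattern += re.escape(rule[pos:])
    return re.compile(pattern), casts


def match(rule, path):
    """Return the parsed parameters, or None when the path does not match."""
    regex, casts = compile_rule(rule)
    found = regex.fullmatch(path)
    if found is None:
        return None
    return {name: casts[name](value) for name, value in found.groupdict().items()}


print(match("/a/<param_a>/<param_b>", "/a/foo/bar"))
print(match("/count/<int:number>", "/count/5"))
print(match("/count/<int:number>", "/count/abc"))
```

A real router also has to rank competing rules, handle HTTP methods, and build URLs in reverse, which is where a library earns its keep.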
```python
# routing.py
import functools
from typing import Callable, Iterable, Optional

# The HTTP exceptions are defined in werkzeug.exceptions.
from werkzeug.exceptions import MethodNotAllowed, NotFound
from werkzeug.routing import Map, RequestRedirect, Rule

from .connection import Connection
from .response import HttpResponse


class Router:
    def __init__(self):
        super().__init__()
        self.url_map = Map()
        self.endpoint_to_handler = {}

    def route(self, rule, methods=None, name=None):
        methods = set(methods) if methods is not None else None
        if methods and "OPTIONS" not in methods:
            methods.add("OPTIONS")

        def decorator(name: Optional[str], handler: Callable):
            self.add_route(
                rule_string=rule, handler=handler, methods=methods, name=name
            )
            return handler

        return functools.partial(decorator, name)

    def add_route(
        self,
        *,
        rule_string: str,
        handler: Callable,
        name: Optional[str] = None,
        methods: Optional[Iterable[str]] = None,
    ):
        if not name:
            name = handler.__name__
        # The same handler may be registered under several rules,
        # but two different handlers may not share a name.
        existing_handler = self.endpoint_to_handler.get(name)
        if existing_handler and existing_handler is not handler:
            raise ValueError("Duplicated route name: %s" % (name))
        self.url_map.add(Rule(rule_string, endpoint=name, methods=methods))
        self.endpoint_to_handler[name] = handler

    def get_url_binding_for_connection(self, connection: Connection):
        scope = connection.scope
        return self.url_map.bind(
            connection.req_headers.get("host"),
            path_info=scope.get("path"),
            script_name=scope.get("root_path") or None,
            url_scheme=scope.get("scheme"),
            query_args=scope.get("query_string", b""),
        )

    async def __call__(self, connection: Connection):
        try:
            rule, args = self.get_url_binding_for_connection(connection).match(
                return_rule=True, method=connection.scope.get("method")
            )
        except RequestRedirect as e:
            connection.resp_status_code = 302
            connection.put_resp_header("location", e.new_url)
            return await connection.send(f"redirecting to: {e.new_url}", finish=True)
        except MethodNotAllowed:
            connection.resp_status_code = 405
            return await connection.send(b"", finish=True)
        except NotFound:
            # Leave the connection untouched so another router can try.
            pass
        else:
            handler = self.endpoint_to_handler[rule.endpoint]
            res = await handler(connection, **args)
            if isinstance(res, HttpResponse):
                res.connection = connection
                await res
```
In the `__call__` method, I’m checking whether the value returned from the handler is an `HttpResponse`. If it is, the router can `await` the response, and the response takes care of sending the headers and body.
More details on werkzeug.routing
Use this simple app to test it:
```python
from aaf.connection import Connection
from aaf.response import JsonResponse
from aaf.routing import Router

router = Router()


@router.route("/hello/<name>")
async def hello(connection, name):
    return JsonResponse({'hello': name})


async def app(scope, receive, send):
    conn = Connection(scope, receive=receive, send=send)
    await router(conn)
```
Visiting `/hello/world` should return `{"hello": "world"}`.
The application gets longer, but hopefully it’s clear that as the number of routes grows, a router makes them a lot easier to manage.
Router to ASGI application
Now, routers are great, but they are still not ASGI applications. Even though we can always write an ASGI application by hand, make a `Connection` out of it, and then call a router with the connection, that doesn’t feel like an ASGI framework but more like an ASGI toolbox.
So, let’s write a helper function that turns a list of routers into an ASGI application. The idea of receiving a list of routers instead of just one is to allow an unhandled path to fall through to the next router.
```python
# __init__.py
from typing import List, Callable

from .connection import Connection


def aaf(routers: List[Callable]):
    async def asgi_app(scope, receive, send):
        conn = Connection(scope, send=send, receive=receive)
        for router in routers:
            await router(conn)
            if conn.finished:
                return
        if conn.started:
            await conn.finish()
        else:
            conn.resp_status_code = 404
            await conn.send("Not found", finish=True)

    return asgi_app
```
It also includes a default 404 response.
With the help of this new function, the previous example can be rewritten as:
```python
from aaf.connection import Connection
from aaf.response import JsonResponse
from aaf.routing import Router
from aaf import aaf

router = Router()


@router.route("/hello/<name>")
async def hello(connection, name):
    return JsonResponse({'hello': name})


app = aaf([router])
```
In fact, the example defined in the Goal section should also work now. Give it a try!
Next steps
An infinite number of things can be added to this framework. Off the top of my head, there are
Connection
I will write about some of these in the near future. In the meantime, I hope this post gives readers enough information to start implementing them on their own.