Lifespan provides a poor user experience #2845
Comments
IMHO lifespans in general are a mistake and we should be starting servers by hand. It's not a hard recipe to get right:

import anyio
from fastapi import FastAPI
from psycopg_pool import AsyncConnectionPool

from asapi import FromPath, Injected, serve, bind

app = FastAPI()

@app.get("/hello/{name}")
async def hello(name: FromPath[str], pool: Injected[AsyncConnectionPool]) -> str:
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT '¡Hola ' || %(name)s || '!'", {"name": name})
            res = await cur.fetchone()
            assert res is not None
            return res[0]

async def main() -> None:
    async with AsyncConnectionPool(
        "postgres://postgres:postgres@localhost:5432/postgres"
    ) as pool:
        bind(app, AsyncConnectionPool, pool)
        await serve(app, 8000)

if __name__ == "__main__":
    anyio.run(main)
|
Regarding point 2: I think the on_event API provides a false sense of convenience. Consider a realistic example:

@contextlib.asynccontextmanager
async def lifespan():
    async with some_db.create_pool() as db:
        async with anyio.create_task_group() as short_term_tg:
            async with anyio.create_task_group() as long_term_tg:
                await long_term_tg.start(setup_db_and_poll_db, db)
                yield {
                    "db": db,
                    "short_term_tg": short_term_tg,
                    "long_term_tg": long_term_tg,
                }
                long_term_tg.cancel_scope.cancel()

This is very difficult to achieve with the old on_event API, though it is probably doable using AsyncExitStack. Another issue is that startup hooks make it easy to forget the shutdown step, especially for database connections.
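A minimal sketch of the AsyncExitStack workaround mentioned above, using only the standard library. The `startup` and `shutdown` functions are hypothetical stand-ins for on_event hooks, and `create_pool` is a placeholder for a real connection pool; none of these names come from Starlette:

```python
import asyncio
import contextlib

events: list[str] = []
state: dict[str, str] = {}

# The stack has to live somewhere both hooks can reach it.
stack = contextlib.AsyncExitStack()


@contextlib.asynccontextmanager
async def create_pool():
    # Placeholder resource; a real pool would open connections here.
    events.append("pool opened")
    try:
        yield "pool"
    finally:
        events.append("pool closed")


async def startup() -> None:
    # Stand-in for a startup hook: enter the context and keep it open.
    state["db"] = await stack.enter_async_context(create_pool())


async def shutdown() -> None:
    # Stand-in for a shutdown hook: unwind everything entered at startup.
    await stack.aclose()


async def main() -> None:
    await startup()
    events.append("serving")
    await shutdown()


asyncio.run(main())
```

The sketch only works because someone remembers to register `shutdown`, which is the failure mode described above. It also would not carry over directly to the task groups in the example, since anyio requires a task group to be exited in the same task that entered it.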
|
Regarding point 1: I think we could probably get the types working, though we might need to make Starlette generic over the lifespan param. Here's a draft:

from typing import *
import collections.abc
import contextlib

type Coro[T] = collections.abc.Coroutine[Any, Any, T]

class Request[Lifespan]:
    def __init__(self, app: Starlette[Lifespan]) -> None:
        self.app = app

class Response:
    pass

class Starlette[Lifespan]:
    def __init__(
        self,
        lifespan: Callable[[Self], contextlib.AbstractAsyncContextManager[Lifespan]],
    ):
        self._lifespan = (lifespan,)

    async def run(self: Self) -> None:
        async with self._lifespan[0](self) as state:
            self.state = state

    def route(
        self, path: str
    ) -> Callable[[Callable[[Request[Lifespan]], Coro[Response]]], None]:
        def decorator(fn: Callable[[Request[Lifespan]], Coro[Response]]) -> None:
            pass

        return decorator

class DB:
    async def query(self, code: str) -> list[str]:
        return ["hello"]

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None

@contextlib.asynccontextmanager
async def lifespan(app: Starlette[DB]) -> AsyncGenerator[DB]:
    async with DB() as db:
        yield db

app = Starlette(lifespan=lifespan)

@app.route("/")
async def foo(request: Request[DB]) -> Response:
    results = await request.app.state.query("SELECT world")
    reveal_type(results)
    return Response()
|
And make the Request generic and propagate it all the way down... Ideal but not sure if practical. |
I'll talk from the point of view of FastAPI, some of it might or might not make sense for Starlette. I was thinking I would build the ideas I have on top of the current Starlette interface, but maybe some parts of that can be done on the Starlette side. Anyway, sharing in case some of that makes sense. I'm still not sure about all the parts (e.g. the state types), but some others I have an idea of what I want.

Decorator, multiple lifespans
I want to have a decorator version, this would look a bit more similar to the

from fastapi import FastAPI

app = FastAPI()

@app.lifespan()  # this would handle creating the async context manager underneath
async def some_async_lifespan():  # maybe it could take some params, but not sure, and not sure what for
    # some startup code
    # not sure the right way to save state, maybe yield it the same way, or maybe store it another way
    yield  # maybe yield a value, maybe the state, but not sure
    # exit code

@app.lifespan()
async def second_lifespan():
    # the startup code would be run in order of definition, so the function above would be run first, yield, then this function would be run
    yield
    # exit code would run in reverse order, the same as with contextmanagers, e.g. an exit stack, so, this exit code would run first, and then the exit code above

# the same as with regular router / path operation functions, it would handle regular def / sync functions
# maybe run them on a thread worker, not sure how useful as nothing else can be running concurrently at this point, maybe only to allow them to send async code to be run on the main thread via anyio.from_thread or equivalent
@app.lifespan()
def sync_lifespan():
    yield

State
I still haven't thought thoroughly about how to handle state. I want each function to be able to use it; maybe they would take state in input params and yield the new state, or maybe modify it in place, not sure. Given FastAPI is based on types, maybe it could make sense to request the lifespan state as a param with a type, but I'm not really sure how to do it. Maybe similar to

Lifespans also have some similarities with dependencies: start code, yield, exit code. So maybe I could make some of the same dependencies be "lifespan dependencies". But I'm not sure if the fact that they are quite different underneath could cause problems with the very similar interface... I haven't thought that much about this yet.

Manually handling server
There's an aspect I find interesting from this: I want to allow doing some tricks that would benefit from having some type of external task group or something (e.g. would be useful for batch processing). But I don't want to tell people to run things from their own code, I want to have a sort of default way to run things and start them from the CLI. So, in FastAPI, I wouldn't go for the server started from user code wrapped in user async stuff... but I like the idea of being able to share and use top level resources in some way.
|
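The ordering described in the decorator proposal (startup in definition order, exit in reverse, sync generators accepted too) can be sketched with the standard library alone. `LifespanRegistry` and its methods are hypothetical names for illustration, not a FastAPI or Starlette API, and the sync generator here runs inline on the event loop rather than on a worker thread:

```python
import asyncio
import contextlib
import inspect


class LifespanRegistry:
    """Hypothetical helper: collects lifespan generator functions."""

    def __init__(self) -> None:
        self._functions = []

    def lifespan(self):
        def decorator(fn):
            # Registration order defines startup order.
            self._functions.append(fn)
            return fn

        return decorator

    @contextlib.asynccontextmanager
    async def run(self):
        # The exit stack unwinds in reverse order of entry.
        async with contextlib.AsyncExitStack() as stack:
            for fn in self._functions:
                if inspect.isasyncgenfunction(fn):
                    cm = contextlib.asynccontextmanager(fn)()
                    await stack.enter_async_context(cm)
                else:
                    # Assumption: sync generators are simply run inline.
                    stack.enter_context(contextlib.contextmanager(fn)())
            yield


registry = LifespanRegistry()
order: list[str] = []


@registry.lifespan()
async def first():
    order.append("first start")
    yield
    order.append("first exit")


@registry.lifespan()
def second():
    order.append("second start")
    yield
    order.append("second exit")


async def main() -> None:
    async with registry.run():
        order.append("serving")


asyncio.run(main())
```

How state would be passed between the registered functions is left open here, as it is in the proposal.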
@tiangolo is the concern complexity? What I find enticing is that you don't have to learn much and then you understand how things actually work. Using the Uvicorn CLI you need to figure out how to configure logging and a bunch of other stuff that frankly is neither well documented nor easy to figure out. And even then all you did is learn the quirks of one library / framework, not how things actually work. |
Multiple lifespans
For FastAPI I'd like to see lifespan-scoped dependencies like in expresso. There's a PR to do this already, fastapi/fastapi#12529; currently the scope is specified at the route rather than at the dependency, which I think is incorrect.
I don't think that's an issue, pytest has session and fixture scope and it's fairly comprehensible. That said, I do see beginners getting mixed up and sharing state across every test because they picked the wrong scope.
Manually handling server
I like being able to run from the CLI, passing a string reference to a module exporting an app factory, with reloads and the event loop managed for me. Something like this might be nicer than lifespan in that case: encode/uvicorn#1151. There's also a use case for managing the app synchronously from the main thread, e.g. uvicorn worker processes or multiple worker threads, which should be more reasonable in 3.14t. |
+1 for encode/uvicorn#1151 being better than lifespans |
Agreed 🤔 At the same time, I would like to be able to tell people to run a single command that would work by default without much customization, and that would have sane defaults for most use cases. But if I tell people to run everything, including the server, from inside of their own code, then each app could be different and a single default command wouldn't work. Although it's true that for advanced use cases when people want to tweak a bunch of the internals it probably makes more sense to do it from their own code.
Interesting. I'll take a look. And yep, I've been thinking about something similar for a while.
Fair point, that should be fine. I should probably have this. Although I have to figure out the right way/scopes, this would end up being related to fastapi/fastapi#11143 (dependencies with yield closing before or after the response is sent).
This looks interesting as well. I wonder if there's a way to simplify the exposed usage API, or how to explain it / document it. |
It's just a vague idea, but I haven't figured out how to work with the ASGI Server yet. Maybe we can use contextvars and not rely on app.state. This would also make it easier for FastAPI/Starlette to support lifespan at the route level. |
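A rough sketch of the contextvars idea, under the assumption that request handling runs in a task started from inside the lifespan context, so that it inherits a copy of that context. The names `db_var`, `lifespan`, and `handler` are illustrative only. An ASGI server that runs the lifespan and the requests in unrelated tasks would not see the value, which may be the open question mentioned above:

```python
import asyncio
import contextlib
import contextvars

# Holds the lifespan-scoped resource instead of app.state.
db_var: contextvars.ContextVar[dict[str, str]] = contextvars.ContextVar("db")


@contextlib.asynccontextmanager
async def lifespan():
    token = db_var.set({"greeting": "hello"})
    try:
        yield
    finally:
        # Restore the previous (unset) value on shutdown.
        db_var.reset(token)


async def handler(name: str) -> str:
    # Reads the resource without touching any app or request object.
    return f"{db_var.get()['greeting']} {name}"


async def main() -> str:
    async with lifespan():
        # A task created here copies the current context, including db_var.
        return await asyncio.create_task(handler("world"))


result = asyncio.run(main())
```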
@adriangb Can you explain to me what this example has to do with keeping some global state? |
There's a lot more detail here: https://github.com/adriangb/asapi |
I mean, can you explain how to use your pattern/library to keep objects with a 'static' storage duration, the whole lifespan, accessible from all endpoints (global)? |
Sorry I'm not trying to not answer, I just didn't want to make this thread about my library and such. But I'll respond since it's relevant to the topic at hand. I realized the examples were not great, I updated the last example to be clearer:

from __future__ import annotations

import logging
from typing import Any

import anyio
from fastapi import FastAPI, APIRouter
from psycopg_pool import AsyncConnectionPool

from asapi import FromPath, Injected, serve, bind

router = APIRouter()

@router.get("/hello/{name}")
async def hello(name: FromPath[str], pool: Injected[AsyncConnectionPool]) -> str:
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT '¡Hola ' || %(name)s || '!'", {"name": name})
            res = await cur.fetchone()
            assert res is not None
            return res[0]

def create_app(pool: AsyncConnectionPool[Any]) -> FastAPI:
    app = FastAPI()
    bind(app, AsyncConnectionPool, pool)
    app.include_router(router)
    return app

async def main() -> None:
    logging.basicConfig(level=logging.INFO)
    async with AsyncConnectionPool(
        "postgres://postgres:postgres@localhost:54320/postgres"
    ) as pool:
        app = create_app(pool)
        await serve(app, 9000)

if __name__ == "__main__":
    anyio.run(main)

The point is that you can now call The global state is avoided because you initialize async dependencies in the You also avoid nested

Personally this is all I use. I do not use Gunicorn or multi-process forking; I prefer to let another process manager (like CloudRun, Kubernetes, etc.) manage the processes for me. But if you do need to use Gunicorn, or maybe in the future want to use free-threaded Python, you can combine this pattern with lazgi:

from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

from fastapi import FastAPI, APIRouter
from psycopg_pool import AsyncConnectionPool

from asapi import FromPath, Injected, bind, validate_injections
from lazgi import LazyApp

router = APIRouter()

@router.get("/hello/{name}")
async def hello(name: FromPath[str], pool: Injected[AsyncConnectionPool]) -> str:
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT '¡Hola ' || %(name)s || '!'", {"name": name})
            res = await cur.fetchone()
            assert res is not None
            return res[0]

def create_app(pool: AsyncConnectionPool[Any]) -> FastAPI:
    app = FastAPI()
    bind(app, AsyncConnectionPool, pool)
    app.include_router(router)
    validate_injections(app)
    return app

@asynccontextmanager
async def app_factory() -> AsyncIterator[FastAPI]:
    async with AsyncConnectionPool(
        "postgres://postgres:postgres@localhost:54320/postgres"
    ) as pool:
        yield create_app(pool)

app = LazyApp(app_factory)
|
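To show how a lazily built app can still sit behind a server-driven lifespan, here is a standard-library sketch of a wrapper that enters an app factory when it receives the ASGI `lifespan.startup` message and leaves it on `lifespan.shutdown`. `LazyWrapper` is a hypothetical illustration and makes no claim about how lazgi is implemented; error reporting (`lifespan.startup.failed`) is omitted, and the "server" is simulated with a queue:

```python
import asyncio
import contextlib


class LazyWrapper:
    """Hypothetical ASGI wrapper that builds the real app at lifespan startup."""

    def __init__(self, factory) -> None:
        self._factory = factory
        self._app = None

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] != "lifespan":
            # Regular traffic is forwarded to the app built at startup.
            await self._app(scope, receive, send)
            return
        message = await receive()
        assert message["type"] == "lifespan.startup"
        async with self._factory() as app:
            self._app = app
            await send({"type": "lifespan.startup.complete"})
            message = await receive()
            assert message["type"] == "lifespan.shutdown"
        self._app = None
        await send({"type": "lifespan.shutdown.complete"})


log: list[str] = []


@contextlib.asynccontextmanager
async def app_factory():
    log.append("pool opened")  # placeholder for opening real resources

    async def app(scope, receive, send) -> None:
        await send({"type": "http.response.start", "status": 200})

    try:
        yield app
    finally:
        log.append("pool closed")


async def main() -> list[str]:
    inbox: asyncio.Queue = asyncio.Queue()
    sent: list[str] = []

    async def send(message) -> None:
        sent.append(message["type"])

    lazy = LazyWrapper(app_factory)
    # Simulated server: drive the lifespan scope in a background task.
    task = asyncio.create_task(lazy({"type": "lifespan"}, inbox.get, send))
    await inbox.put({"type": "lifespan.startup"})
    while "lifespan.startup.complete" not in sent:
        await asyncio.sleep(0)
    await lazy({"type": "http"}, inbox.get, send)
    await inbox.put({"type": "lifespan.shutdown"})
    await task
    return sent


sent = asyncio.run(main())
```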
Great, we will use this pattern. |
What I'm using right now: I'm typing the state through a dependency that fetches it from request.state:

from collections.abc import AsyncIterator
from typing import Annotated, cast

from fastapi import Depends, Request
from fastapi.datastructures import State
from sqlalchemy.ext.asyncio import AsyncConnection

from src.database import FlightClient

class AppState(State):
    start_time: float
    flight_client: FlightClient
    async_connection: AsyncConnection

async def get_state(request: Request) -> AsyncIterator[AppState]:
    # cast() takes the target type first, then the value
    yield cast(AppState, request.state)

async def hello_world(state: Annotated[AppState, Depends(get_state)]):
    return state.start_time
|
A small suggestion using type aliases:

from typing import Annotated

from fastapi import Depends
from fastapi.datastructures import State as FastapiState

class AppState(FastapiState): ...

# get_state is the dependency from the previous comment
State = Annotated[AppState, Depends(get_state)]

async def hello_world(state: State):  # endpoints are now less verbose
    ...
|
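One caveat with the cast-based typing in the two comments above: `typing.cast` is only a hint to the type checker and performs no runtime check, so an attribute that the lifespan never set fails only when it is accessed. A small standard-library sketch, where `State` is a stand-in class written for this example rather than the FastAPI one, and `db_url` is a made-up field:

```python
from typing import cast


class State:
    """Stand-in for a framework state object with dynamic attributes."""

    def __init__(self, values: dict[str, object]) -> None:
        self.__dict__.update(values)


class AppState(State):
    start_time: float
    db_url: str  # declared, but never provided below


# What a lifespan actually produced: only start_time.
raw = State({"start_time": 12.5})

# cast() returns the same object unchanged; nothing is validated.
state = cast(AppState, raw)

start = state.start_time
try:
    state.db_url
    missing = False
except AttributeError:
    # The mismatch surfaces here, at access time.
    missing = True
```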
I'm not sure what's the solution here, but something needs to be done.
Problems
1. The `state` of lifespan is not type enforced. For more details, see:
   1.1. https://www.starlette.io/lifespan/#lifespan-state
   1.2. How to convert `TypedDict` to a `dataclass_transform` behavior? python/typing#1457
   1.3. Make request.state generic #2562
2. The current lifespan is not beginner friendly.
   For a beginner, we introduce two new elements here: `@asynccontextmanager`, and `yield`.
   It's far more intuitive to write the following than the current lifespan way:
   See Further develop startup and shutdown events fastapi/fastapi#617 (comment).
3. You could create multiple `on_startup` and `on_shutdown`, now the `lifespan` is a single async context manager.

I wish we could focus on the point 2 above, I'm only mentioning the others as I was thinking about them.
I understand a lot of effort went into providing the most correct, exception- and yield-safe solution, but... We need an API that is more user friendly.
I've created this for discussion purposes. cc @graingert @adriangb @tiangolo @abersheeran @sm-Fifteen
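For point 3, one way to picture the relationship between the two styles is an adapter that folds lists of startup and shutdown hooks into a single async context manager. This is a standard-library sketch; `lifespan_from_hooks` and the hook names are hypothetical and not part of Starlette:

```python
import asyncio
import contextlib
from collections.abc import Awaitable, Callable

Hook = Callable[[], Awaitable[None]]


def lifespan_from_hooks(on_startup: list[Hook], on_shutdown: list[Hook]):
    """Hypothetical adapter: many hooks in, one lifespan out."""

    @contextlib.asynccontextmanager
    async def lifespan(app):
        for hook in on_startup:
            await hook()
        try:
            yield
        finally:
            # Shutdown hooks run even if the body raised.
            for hook in on_shutdown:
                await hook()

    return lifespan


calls: list[str] = []


async def open_db() -> None:
    calls.append("open db")


async def start_poller() -> None:
    calls.append("start poller")


async def close_db() -> None:
    calls.append("close db")


lifespan = lifespan_from_hooks([open_db, start_poller], [close_db])


async def main() -> None:
    async with lifespan(None):  # no real app object is needed for the sketch
        calls.append("serving")


asyncio.run(main())
```

Note that nothing pairs `start_poller` with a matching shutdown hook, which is the forgetfulness risk raised earlier in the thread.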