在上一篇教程中,我们探讨了使用数据库的基本CRUD操作。这一篇,我们将深入了解如何在FastAPI中实现异步数据库操作。使用异步机制的优势在于,它能够在IO密集型任务(如数据库访问)中提高性能,使应用更具响应性。
环境准备 在开始之前,请确保已经安装了以下库:
1 pip install fastapi databases asyncpg uvicorn
databases
是一个支持异步操作的数据库库。
asyncpg
是一个异步PostgreSQL数据库适配器。
uvicorn
是一个用于运行FastAPI的ASGI服务器。
创建数据库模型 我们将使用SQLAlchemy定义数据模型,然后通过asyncpg连接到PostgreSQL。以下是一个简单的用户模型示例:
1 2 3 4 5 6 7 8 9 10 11 from sqlalchemy import Column, Integer, String, create_enginefrom sqlalchemy.ext.declarative import declarative_baseBase = declarative_base() class User (Base ): __tablename__ = "users" id = Column(Integer, primary_key=True , index=True ) name = Column(String, index=True ) email = Column(String, unique=True , index=True )
配置异步数据库连接 接下来,我们需要配置异步数据库连接。使用databases
库,我们可以轻松地定义连接字符串,并且支持异步操作。
1 2 3 4 from databases import DatabaseDATABASE_URL = "postgresql+asyncpg://username:password@localhost/dbname" database = Database(DATABASE_URL)
请根据您的数据库配置替换 username
、password
和 dbname
。
创建和破坏数据库连接 在FastAPI应用程序中,我们需要在启动时连接到数据库,并在关闭时断开连接。可以使用 FastAPI
的事件处理程序实现:
1 2 3 4 5 6 7 8 9 10 11 from fastapi import FastAPIapp = FastAPI() @app.on_event("startup" ) async def startup (): await database.connect() @app.on_event("shutdown" ) async def shutdown (): await database.disconnect()
实现异步CRUD操作 现在我们已经连接了数据库,让我们来实现异步的CRUD操作。我们将创建一些异步函数来处理用户的插入、查询、更新和删除。
创建用户 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from pydantic import BaseModelfrom fastapi import HTTPExceptionclass UserIn (BaseModel ): name: str email: str class UserOut (BaseModel ): id : int name: str email: str @app.post("/users/" , response_model=UserOut ) async def create_user (user: UserIn ): query = User.__table__.insert().values(name=user.name, email=user.email) last_record_id = await database.execute(query) return {**user.dict (), "id" : last_record_id}
查询所有用户 1 2 3 4 @app.get("/users/" , response_model=list [UserOut] ) async def read_users (): query = User.__table__.select() return await database.fetch_all(query)
查询单个用户 1 2 3 4 5 6 7 @app.get("/users/{user_id}" , response_model=UserOut ) async def read_user (user_id: int ): query = User.__table__.select().where(User.id == user_id) user = await database.fetch_one(query) if user is None : raise HTTPException(status_code=404 , detail="User not found" ) return user
更新用户 1 2 3 4 5 @app.put("/users/{user_id}" , response_model=UserOut ) async def update_user (user_id: int , user: UserIn ): query = User.__table__.update().where(User.id == user_id).values(name=user.name, email=user.email) await database.execute(query) return {**user.dict (), "id" : user_id}
删除用户 1 2 3 4 5 @app.delete("/users/{user_id}" , response_model=dict ) async def delete_user (user_id: int ): query = User.__table__.delete().where(User.id == user_id) await database.execute(query) return {"message" : "User deleted successfully" }
测试异步数据库操作 为了确保我们实现的异步数据库操作有效,建议使用FastAPI的测试客户端进行测试。在下一篇教程中,我们将讨论如何编写单元测试,以验证我们创建的API是否按预期工作。
小结 在本篇教程中,我们讲解了如何在FastAPI中实现异步数据库操作。通过使用databases
库,我们能够高效地管理数据库连接并执行异步的CRUD操作。接下来,我们将进入测试阶段,确保我们的API在生产环境中表现良好,这将是我们系列教程的一部分。
后续我们将探讨如何编写单元测试,以确保我们的代码的稳健性。希望这篇教程对您理解FastAPI的异步数据库操作有所帮助!