30 使用数据库的异步数据库操作

在上一篇教程中,我们探讨了使用数据库的基本CRUD操作。这一篇,我们将深入了解如何在FastAPI中实现异步数据库操作。使用异步机制的优势在于,它能够在IO密集型任务(如数据库访问)中提高性能,使应用更具响应性。

环境准备

在开始之前,请确保已经安装了以下库:

1
pip install fastapi databases asyncpg uvicorn
  • databases 是一个支持异步操作的数据库库。
  • asyncpg 是一个异步PostgreSQL数据库适配器。
  • uvicorn 是一个用于运行FastAPI的ASGI服务器。

创建数据库模型

我们将使用SQLAlchemy定义数据模型,然后通过asyncpg连接到PostgreSQL。以下是一个简单的用户模型示例:

1
2
3
4
5
6
7
8
9
10
11
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)

配置异步数据库连接

接下来,我们需要配置异步数据库连接。使用databases库,我们可以轻松地定义连接字符串,并且支持异步操作。

1
2
3
4
from databases import Database

DATABASE_URL = "postgresql+asyncpg://username:password@localhost/dbname"
database = Database(DATABASE_URL)

请根据您的数据库配置替换 usernamepassworddbname

创建和破坏数据库连接

在FastAPI应用程序中,我们需要在启动时连接到数据库,并在关闭时断开连接。可以使用 FastAPI 的事件处理程序实现:

1
2
3
4
5
6
7
8
9
10
11
from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def startup():
await database.connect()

@app.on_event("shutdown")
async def shutdown():
await database.disconnect()

实现异步CRUD操作

现在我们已经连接了数据库,让我们来实现异步的CRUD操作。我们将创建一些异步函数来处理用户的插入、查询、更新和删除。

创建用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pydantic import BaseModel
from fastapi import HTTPException

class UserIn(BaseModel):
name: str
email: str

class UserOut(BaseModel):
id: int
name: str
email: str

@app.post("/users/", response_model=UserOut)
async def create_user(user: UserIn):
query = User.__table__.insert().values(name=user.name, email=user.email)
last_record_id = await database.execute(query)
return {**user.dict(), "id": last_record_id}

查询所有用户

1
2
3
4
@app.get("/users/", response_model=list[UserOut])
async def read_users():
query = User.__table__.select()
return await database.fetch_all(query)

查询单个用户

1
2
3
4
5
6
7
@app.get("/users/{user_id}", response_model=UserOut)
async def read_user(user_id: int):
query = User.__table__.select().where(User.id == user_id)
user = await database.fetch_one(query)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user

更新用户

1
2
3
4
5
@app.put("/users/{user_id}", response_model=UserOut)
async def update_user(user_id: int, user: UserIn):
query = User.__table__.update().where(User.id == user_id).values(name=user.name, email=user.email)
await database.execute(query)
return {**user.dict(), "id": user_id}

删除用户

1
2
3
4
5
@app.delete("/users/{user_id}", response_model=dict)
async def delete_user(user_id: int):
query = User.__table__.delete().where(User.id == user_id)
await database.execute(query)
return {"message": "User deleted successfully"}

测试异步数据库操作

为了确保我们实现的异步数据库操作有效,建议使用FastAPI的测试客户端进行测试。在下一篇教程中,我们将讨论如何编写单元测试,以验证我们创建的API是否按预期工作。

小结

在本篇教程中,我们讲解了如何在FastAPI中实现异步数据库操作。通过使用databases库,我们能够高效地管理数据库连接并执行异步的CRUD操作。接下来,我们将进入测试阶段,确保我们的API在生产环境中表现良好,这将是我们系列教程的一部分。

后续我们将探讨如何编写单元测试,以确保我们的代码的稳健性。希望这篇教程对您理解FastAPI的异步数据库操作有所帮助!

30 使用数据库的异步数据库操作

https://zglg.work/python-fastapi-zero/30/

作者

IT教程网(郭震)

发布于

2024-08-17

更新于

2024-08-18

许可协议

分享转发

交流

更多教程加公众号

更多教程加公众号

加入星球获取PDF

加入星球获取PDF

打卡评论