如何在Fastapi完成Redis和mysql的缓存同步

1.FastAPI 简单介绍

FastAPI(Fast API)是一个用于构建现代、快速(高性能)的Web API的Python框架。它基于标准的Python类型提示,并支持异步编程。以下是关于FastAPI的一些简单介绍:

  1. 性能优越: FastAPI 基于 Starlette 和 Pydantic 构建,充分利用了 Python 3.7+ 中引入的异步(async/await)特性。这使得 FastAPI 在性能方面表现优越,特别是在处理大量并发请求时。
  2. 类型提示: FastAPI 使用标准的 Python 类型提示(type hints)来定义 API 的输入和输出数据。这不仅提供了更好的文档支持,还能在运行时执行数据验证,从而减少了错误。
  3. 自动文档生成: 借助于 Swagger UI 和 ReDoc,FastAPI 自动生成详细、交互式的API文档。这使得开发者能够更轻松地了解和测试API的各种端点。
  4. 依赖注入系统: FastAPI 提供了一个强大的依赖注入系统,可以用于处理诸如数据库连接、身份验证等通用逻辑。这使得代码更易于测试和组织。
  5. WebSocket 支持: FastAPI 支持 WebSocket,允许构建实时应用程序。
  6. 异步/同步兼容: FastAPI 可以同时处理异步和同步的请求。这使得你可以根据需要选择使用异步编程模型,以提高性能。
  7. 安全性: FastAPI 提供了一些内置的安全性特性,例如 OAuth2 和 API 密钥的支持,以确保 API 的安全性。
  8. 兼容性: FastAPI 兼容与其他现代框架(如 Django、Flask)和标准工具(如 SQLAlchemy、OpenAPI)。

以下是一个使用 FastAPI 创建简单 API 的示例:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}

@app.get("/items/{item_id}")
def read_item(item_id: int, query_param: str = None):
    return {"item_id": item_id, "query_param": query_param}

2.如何在Fastapi中完成Redis和Mysql同步

同步操作介绍


在某些特定的功能模块中。我们需要经常对一些热点数据进行实时缓存Redis中。这样做可以减少后端数据库的压力。那么这里就会牵扯到另外以一个问题。那就是我们该如何避免redis和数据库中的消息不一致的情况。本站博客采用的就是将数据库中的热点数据内容进行redis缓存。当数据库内容更新后。redis便会更新数据库中的最新内容


redis实时更新会产生的问题


Redis的工作流程 1.前台发送请求,后台接口去查询。

2.先去查询Redis缓存里面有没有数据,如果有数据,就直接返回数据。

3.如果Redis缓存里面没有数据,就去查询数据库,在数据库中查到数据以后,保存到Redis缓存中,然后在返回前台数据。

4.如果查询数据库都查询不到数据,就直接结束。


如何解决Redis与数据库的数据同步问题?

1.第一种方案:先更新数据库,在删除Redis缓存。 可能会出现的问题:在高并发的情况下,当一个用户A修改数据之后,先去更新数据库,在用户A还没有删除Redis缓存的时候,用户B又进来查询数据,但是用户B查询到的是Redis缓存里面的旧数据,这样明显就会出错,虽然可以通过加锁的机制去解决,但是会大大降低查询性能。

2.第二种方案:先删除Redis缓存,在更新数据库 可能会出现的问题:用户A修改信息之后,先删除Redis里面的缓存,在用户A还没来得及更新数据库的数据的时候,用户B又进来查询数据,发现Redis缓存里面没有数据,然后去查询数据库,然后将查询结果保存到Redis缓存中,但是现在的问题就是用户A还没来得及修改数据库里面的数据,用户B就直接把数据库里面的未修改的数据保存到了Redis缓存中。这样也会造成数据的不同步。


Sqlalchemy异步操作简单介绍

SQLAlchemy作为一款通用的Python Orm工具.他有很多的数据库操作功能.下面的例子利用SQLAlchemy作为演示.

创建一个类用来复用redis链接

###将命中率高的数据同步到redis中
class BlogCache:
    def __init__(self):
        # 创建Redis连接
        self.redis_client = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db, password=db_password)

使用Sqlalchemy异步操作且包含事件监听。监听Mysql的内容变化

# Create event listener to update cache
@event.listens_for(Blog, 'after_insert')
@event.listens_for(Blog, 'after_update')
@event.listens_for(Blog, 'after_delete')
def update_cache(mapper, connection, target):
    redis_key = f"blog_{target.BlogId}"
    data = {
        "BlogId": target.BlogId,
        "title": target.title,
        "content": target.content,
        "author": target.author,
        "BlogIntroductionPicture": target.BlogIntroductionPicture,
        "created_at": target.created_at,
    }
    blog_cache.redis_client.set(redis_key, pickle.dumps([data]))
    blog_cache.redis_client.expire(redis_key, 3600)  # Set expiration time to 1 hour


### 数据库缓存读取判断
@BlogApp.post("/user/Blogid")
async def Blogid(blog_id: int):
    async with db_session() as session:
        redis_key = f"blog_{blog_id}"
        cached_data = blog_cache.redis_client.get(redis_key)
        if cached_data:
            print('从缓存中读取')
            cached_data_obj = pickle.loads(cached_data)
            return cached_data_obj
        else:
            print('从数据库中读取')
            results = await session.execute(select(Blog).filter(Blog.BlogId == blog_id))
            data = results.scalars().all()
            data = [item.to_dict() for item in data]
            blog_cache.redis_client.set(redis_key, pickle.dumps(data))
            blog_cache.redis_client.expire(redis_key, 3600)
            return data

3.结语

在实际应用中,与数据库和缓存的集成是常见的需求。上述示例演示了如何使用 FastAPI、SQLAlchemy 和 Redis 实现一个简单的 API,并将数据同步到 Redis 缓存中,以提高访问速度。通过监听数据库的插入、更新和删除事件,可以保持缓存的同步性。

总体而言,FastAPI 提供了丰富的功能和性能优势,使得构建现代化的 Web API 变得更加简单和高效。

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容