测试平台系列(79) 编写Redis配置功能(下)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 编写Redis配置功能(下)

大家好~我是米洛


我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持。


欢迎关注我的龚仲耗测试开发坑货,获取最新文章教程!

回顾


上一节我们提出了优化Dao逻辑的想法,那今天就试着来兑现之,并运用到Redis配置管理的开发中去。

初步构思list方法


我们在dao/init.py新建类: Mapper,以后所有的dao类都继承自它。

想想list需要什么,一般需要,字段参数是like还是等于这3个重要的信息。

明白这个以后,我们的伪代码就好编写了:


# 1. 获取session
async with async_session() as session:
  condition = [model.deleted_at == 0]
  # 2. 根据参数里的字段,字段值构造查询条件
  DatabaseHelper.where(字段,字段值,condition)
  # 3. 查询出结果,后续与原来的方式一致,就不写了

可以看到,这边除了需要上面的3个信息以外,还需要try去写入日志,也就是说需要我们平时经常创建的log成员变量,还需要model这个类,否则你不知道改的是什么表。

思考


我们平时的参数,都是key-value的形式,一旦value不为空,那么说明我们要根据这个条件来查询。

而model和log,我们可以通过类的装饰器,由子类传递给父类(这里会用到setattr和getattr)


class Mapper(object):
    log = None
    model = None
    @classmethod
    async def list_data(cls, **kwargs):
        try:
            async with async_session() as session:
                # 构造查询条件,默认是数据未被删除
                condition = [getattr(cls.model, "deleted_at") == 0]
                # 遍历参数,当参数不为None的时候传递
                for k, v in kwargs.items():
                    # 判断是否是like的情况 TODO: 这里没支持in查询
                    like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                    # 如果是like模式,则使用Model.字段.like 否则用 Model.字段 等于
                    DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                         condition)
                result = await session.execute(select(cls.model).where(*condition))
                return result.scalars().all()
        except Exception as e:
            # 这边调用cls本身的log参数,写入日志+抛出异常
            cls.log.error(f"获取{cls.model}列表失败, error: {e}")
            raise Exception(f"获取数据失败")

注释写的比较详细,由于现在字段是个变量,所以我们不能用model.字段来取值,所以取而代之的是getattr(model, 字段)。不熟悉的朋友可以去搜索下getattr

这样,一个粗略的list方法就写好了,但是这个是不带分页的,所以我们还需要补充一个分页的模式,其实也很简单。


@classmethod
    async def list_data_with_pagination(cls, page, size, **kwargs):
        try:
            async with async_session() as session:
                condition = [getattr(cls.model, "deleted_at") == 0]
                for k, v in kwargs.items():
                    like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
                    sql = DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                               condition)
                return await DatabaseHelper.pagination(page, size, session, sql)
        except Exception as e:
            cls.log.error(f"获取{cls.model}列表失败, error: {e}")
            raise Exception(f"获取数据失败")

基本上长的差不多,只是最后返回那里,调用了pagination相关方法。

看看dao装饰器


5.jpg

这也是为什么要用classmethod而不是staticmethod的原因

我们很坏,把这个装饰器套到子类上,并把model和log传给父类。毕竟方法都是在调用父类的方法,父类无法直接拿到子类的数据

6.jpg

用的话,就是把model和log传入dao参数

这样就避免了在调用list方法的时候,还需要传入model和log的尴尬情况。

完善其他方法


list搞定以后,其他的还会远吗?但还真的好像还有点问题,因为我们一般会有一些重名判断,但没关系,我们可以编写一个query方法。


@classmethod
    def query_wrapper(cls, **kwargs):
        condition = [getattr(cls.model, "deleted_at") == 0]
        # 遍历参数,当参数不为None的时候传递
        for k, v in kwargs.items():
            # 判断是否是like的情况 TODO: 这里没支持in查询
            like = v is not None and len(v) > 2 and v.startswith("%") and v.endswith("%")
            # 如果是like模式,则使用Model.字段.like 否则用 Model.字段 等于
            DatabaseHelper.where(v, getattr(cls.model, k).like(v) if like else getattr(cls.model, k) == v,
                                 condition)
        return select(cls.model).where(*condition)
    @classmethod
    async def query_record(cls, **kwargs):
        try:
            async with async_session() as session:
                sql = cls.query_wrapper(**kwargs)
                result = await session.execute(sql)
                return result.scalars().first()
        except Exception as e:
            cls.log.error(f"查询{cls.model}失败, error: {e}")
            raise Exception(f"查询数据失败")

由于查询方法太过于通用,所以抽成了query_wrapper方法。

  • 正常编写insert接口


@classmethod
    async def insert_record(cls, model):
        try:
            async with async_session() as session:
                async with session.begin():
                    session.add(model)
                    await session.flush()
                    session.expunge(model)
                    return model
        except Exception as e:
            cls.log.error(f"添加{cls.model}记录失败, error: {e}")
            raise Exception(f"添加记录失败")

这边返回了model,如果不需要也可以不用,但咱还是给返回。

  • insert接口层

7.jpg

image

和以前一样,先查,如果没有再关掉。

但这样会产生2个session,开->关->开->关

但也解耦了查询和插入2个操作。

  • 编写删除和修改方法


@classmethod
    async def update_record_by_id(cls, user, model, not_null=False):
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=model.id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("记录不存在")
                    DatabaseHelper.update_model(original, model, user, not_null)
                    await session.flush()
                    session.expunge(original)
                    return original
        except Exception as e:
            cls.log.error(f"更新{cls.model}记录失败, error: {e}")
            raise Exception(f"更新记录失败")
    @classmethod
    async def delete_record_by_id(cls, user, id):
        """
        逻辑删除
        :param user:
        :param id:
        :return:
        """
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("记录不存在")
                    DatabaseHelper.delete_model(original, user)
        except Exception as e:
            cls.log.error(f"删除{cls.model}记录失败, error: {e}")
            raise Exception(f"删除记录失败")
    @classmethod
    async def delete_by_id(cls, id):
        """
        物理删除
        :param id:
        :return:
        """
        try:
            async with async_session() as session:
                async with session.begin():
                    query = cls.query_wrapper(id=id)
                    result = await session.execute(query)
                    original = result.scalars().first()
                    if original is None:
                        raise Exception("记录不存在")
                    session.delete(original)
        except Exception as e:
            cls.log.error(f"逻辑删除{cls.model}记录失败, error: {e}")
            raise Exception(f"删除记录失败")

删除这边支持了物理删除和逻辑删除,当然我们一般是用软删除

完善其他接口


8.jpg

image

可以看到删除和修改和以前是差不多的(也是内部进行数据是否存在判断),这些步骤做完。redis的管理工作就可以顺利进行了,接着我们需要为之编写页面咯!

由于是很基础的表格页面,所以我们不赘述。下一节我们会利用配置的redis连接编写redisManager,管理我们的连接数据,为之后在线执行redis以及将它作为前置条件打下基础。



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1天前
|
消息中间件 测试技术 Linux
linux实时操作系统xenomai x86平台基准测试(benchmark)
本文是关于Xenomai实时操作系统的基准测试,旨在评估其在低端x86平台上的性能。测试模仿了VxWorks的方法,关注CPU结构、指令集等因素对系统服务耗时的影响。测试项目包括信号量、互斥量、消息队列、任务切换等,通过比较操作前后的时戳来测量耗时,并排除中断和上下文切换的干扰。测试结果显示了各项操作的最小、平均和最大耗时,为程序优化提供参考。注意,所有数据基于特定硬件环境,测试用例使用Alchemy API编写。
8 0
linux实时操作系统xenomai x86平台基准测试(benchmark)
|
1天前
|
传感器 Linux 测试技术
xenomai 在X86平台下中断响应时间测试
该文讨论了实时操作系统中断响应时间的重要性,并介绍了x86中断机制和Xenomai的中断管理,包括硬件中断和虚拟中断的处理。Xenomai通过I-Pipe确保实时性,中断优先级高的Xenomai先处理中断。文中还提到了中断响应时间的测试设计,分别针对I-Pipe内核间虚拟中断和硬件中断进行了测试,并给出了在不同负载下的测试结果。
7 0
xenomai 在X86平台下中断响应时间测试
|
9天前
|
Linux 测试技术 数据安全/隐私保护
CentOS安装MeterSphere并实现无公网IP远程访问本地测试平台
CentOS安装MeterSphere并实现无公网IP远程访问本地测试平台
|
11天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
11天前
|
DataWorks NoSQL 关系型数据库
DataWorks操作报错合集之在使用 DataWorks 进行 MongoDB 同步时遇到了连通性测试失败,实例配置和 MongoDB 白名单配置均正确,且同 VPC 下 MySQL 可以成功连接并同步,但 MongoDB 却无法完成同样的操作如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
29 1
|
15天前
|
存储 监控 NoSQL
|
15天前
|
存储 NoSQL API
Redis入门到通关之GEO实现附近的人功能
Redis入门到通关之GEO实现附近的人功能
|
15天前
|
存储 NoSQL Redis
Redis入门到通关之Set实现点赞功能
Redis入门到通关之Set实现点赞功能
15 0
|
16天前
|
JavaScript API
【vue】分环境构建(开发/测试/生产)配置
【vue】分环境构建(开发/测试/生产)配置
18 1
|
17天前
|
人工智能 监控 数据处理
【AI大模型应用开发】【LangSmith: 生产级AI应用维护平台】1. 快速上手数据集与测试评估过程
【AI大模型应用开发】【LangSmith: 生产级AI应用维护平台】1. 快速上手数据集与测试评估过程
38 0