使用Mysql实现消息队列

简介: 使用数据库实现简单队列,一个好处就是消息都是可见的。 可以直接查数据库。不用加悲观锁,因为update成功后就带锁。其它同版本的更新都会失败。还有个好处就是减少组件依赖。 简单的服务数据库就能搞定。 不用再起个rabbitmq服务啥的。 节约运维成本。

0654ce9563af4a629f4f25576b38391e.gif

消息队列应用场景广泛,使用SQL实现简易队列原理也很简单,实现起来就是 消息状态版本号 字段。

image.png

更新时用 版本号 做乐观锁。操作逻辑就是个状态机。

UPDATE mq SET mq.status=new_status mq.version = mq.version + 1 WHERE mq.version = old_version

实现

mysql mq 表结构设计

CREATE TABLE `mq` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `msg` varchar(1024) DEFAULT NULL,
  `status` varchar(100) DEFAULT 'ready',
  `version` bigint(20) unsigned DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

生产者

测试,向队列插入两条消息。

insert into mq(msg) values('第一条消息 hello world! 1024');
insert into mq(msg) values('第二条消息 欢迎访问 https://spaceack.com');

image.png

消费者

获取队列中的消息, 此时不会改变队列(mq表)中的数据。

select * from mq where status='ready' limit 1;

image.png

消息确认(关键!)

update mq set mq.status = 'ack', mq.version = mq.version + 1 WHERE mq.version = {query_version} and id = {query_id}

确认后的状态:

image.png

再次获取数据仅能获取第二条数据。

image.png

这样的一个好处就是消息都是可见的。可以直接查数据库。

不用加悲观锁,因为update成功后就带锁。其它同版本的更新都会失败。

还有个好处就是减少组件依赖。简单的服务数据库就能搞定。不用再起个rabbitmq等队列服务。节约运维成本。

版本号 另一个小作用:InnoDB 如果更新语句没有改变任何字段值时,影响行数会返回0,那么是没找到记录还是没改变值的0呢?前者是bug, 后者是正常情况但区分不了。加版本号后每次修改+1,影响行数一定不为0。

隐藏福利:生产者自动批量测试脚本

import random
import subprocess

def runcmd(command):
    ret = subprocess.run(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,encoding="utf-8",timeout=1)
    if ret.returncode == 0:
        # print("success:",ret, ret.stdout)
        return ret.stdout
    else:
        print("error:",ret)


def producer():
    sql = "\"insert into mq(msg) values('%s');\"" % (str(random.randint(1,99999)))
    cmd = """mysql -uroot -ppassword test -e %s """ % (sql)
    runcmd(cmd)
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
Prometheus 监控 Cloud Native
手把手教你Prometheus + Granafa实现mysql 性能监测部署
数据库性能监控可以说是十分重要,能否自行搭建环境实现像阿里云或是腾讯云那样直观的展示不同维度数据的功能?答案是肯定的。下面详细说明一下安装部署过程以及过程中出现的问题,希望对你有所帮助!
手把手教你Prometheus + Granafa实现mysql 性能监测部署
|
9月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
452 0
|
9月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
590 0
|
9月前
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
639 0
|
SQL 关系型数据库 MySQL
MySql字符串拆分实现split功能(字段分割转列、转行)
MySql字符串拆分实现split功能(字段分割转列、转行)
MySql字符串拆分实现split功能(字段分割转列、转行)
|
SQL 缓存 关系型数据库
MySQL日志(undo log 和 redo log 实现事务的原子性/持久性/一致性)
MySQL日志(undo log 和 redo log 实现事务的原子性/持久性/一致性)
MySQL日志(undo log 和 redo log 实现事务的原子性/持久性/一致性)
|
SQL 关系型数据库 MySQL
C语言连接并实现对MySQL的增删改查
C语言访问数据库并不如Java、Python那般容易。本文介绍C语言连接并实现对MySQL的增删改查的方法。
436 0
|
canal 关系型数据库 MySQL
基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输
基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输
基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输
|
SQL 前端开发 关系型数据库
mysql实现一次将多条不同sql查询结果并封装到一个结果集
最近遇到一个统计查询需求,要求一次性查询多个统计信息,其中两个查询信息不在一个表中,也没有业务关联,表中也没有做连接处理。不考虑产品设计是否合理,完全是实际需求如此,需要一次性查询出来返回给前端进行展示,对于这种“非常规”的统计查询平常肯定会遇见,感觉有点代表性,所以简单记录一下。希望对有相同需求的同学可以作为参考。
mysql实现一次将多条不同sql查询结果并封装到一个结果集
|
SQL 关系型数据库 MySQL
mysql实现统计同一字段不同状态的总数量之case when
直接进入主题,简单说下case when的语法.