0前言
前情提要:使用Redis发布 token 以及lua脚本来共同完成消息的幂等
目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期)】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
1.我开源项目IM重复的原因
- IM系统中有三个常见的指标。消息可靠(不丢消息)。就是消息不能重复(不重复)。保证消息的时序性(不乱序)。这三个指标非常重要。
- 消息可靠主要通过报文协议等操作来完成。前面视频有一期讲过报文协议。目前主要采取上述方式去保证消息的可靠性。然后再保证消息可靠性的过程中,有一些需要重试的操作。可能会导致数据库多次插入。需要我们来保证一下消息的幂等。通俗的讲就是保证消息的不重复。
1.客户端会重复的发送消息
客户端是一个timer的机制。客户端a发送给b消息的时候,在0.5秒没有收到b的ack的时候会重发消息,重发三次还没有收到ack视为重发失败
//使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数
伪代码如下。用户在线并且不是重试消息的时候,添加到队列里面。
进行重试的js代码
2.mq处理逻辑异常重试机制
mq处理逻辑的时候抛出异常然后重试
参考上述逻辑图,消息落库的时候异步分发到了mq上面。rocketmq有超时重试机制,会自动重试。导致消息被多次消费。需要做幂等
2.如何解决的幂等
为什么要解决幂等,什么情况下出现幂等
对同一个资源进行操作副作用只有一次。
-
insert操作,这种情况下多次请求,可能会产生重复数据。
-
update操作,如果只是单纯的更新数据,比如:update user set status=1 where id=1,是没有问题的。如果还有计算,比如:update user set status=status+1 where id=1,这种情况下多次请求,可能会导致数据错误。
使用redis做的幂等。redis做幂等其实有两种思路。
当客户端请求分布式id的时候将其存入redis。也就是获取一个唯一id。当进行消费消息的时候。先判断唯一id在不在。在的话删除redis中的唯一id并且进行业务操作。不再的话就不能进行业务操作来实现的幂等。
流程代码如下所示:
1.获取token以及存储token到redis中;
在loginUser 用户中心服务中
我们使用了美团的分布式id来生成分布式id。
2.前台发送消息的时候携带上唯一id
这个是发送消息的操作
const no = await getLeaf();这行代码请求后端接口。然后构造消息体。
3.聊天服务(Netty)收到前台消息后 mq异步发送消息
发送给保存的主题
4.业务模块(frist)消费消息
lua表达式
目前使用redis的类型是set,键是yan_leaf