前景回顾
【mq】从零开始实现 mq-01-生产者、消费者启动
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-04-启动检测与实现优化
【mq】从零开始实现 mq-05-实现优雅停机
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载均衡 load balance
【mq】从零开始实现 mq-08-配置优化 fluent
【mq】从零开始实现 mq-09-消费者拉取消息 pull message
【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack
状态回执
大家好,我是老马。
上一节我们只实现了拉取消息的实现,但是缺少了消费状态回执。
这一节我们一起来学习下如何实现状态回执。
代码实现
回执状态的设计
我们规定如下几种回执状态:
package com.github.houbb.mq.common.constant;/** * @author binbin.hou * @since 0.0.3 */public final class MessageStatusConst { private MessageStatusConst(){} /** * 待消费 * ps: 生产者推送到 broker 的初始化状态 */ public static final String WAIT_CONSUMER = "W"; /** * 推送给消费端处理中 * ps: broker 准备推送时,首先将状态更新为 P,等待推送结果 * @since 0.1.0 */ public static final String TO_CONSUMER_PROCESS = "TCP"; /** * 推送给消费端成功 * @since 0.1.0 */ public static final String TO_CONSUMER_SUCCESS = "TCS"; /** * 推送给消费端失败 * @since 0.1.0 */ public static final String TO_CONSUMER_FAILED = "TCF"; /** * 消费完成 */ public static final String CONSUMER_SUCCESS = "CS"; /** * 消费失败 */ public static final String CONSUMER_FAILED = "CF"; /** * 稍后消费 * @since 0.1.0 */ public static final String CONSUMER_LATER = "CL";}
消费者状态回执
我们在消费之后,添加状态回执:
for(MqMessage mqMessage : mqMessageList) { IMqConsumerListenerContext context = new MqConsumerListenerContext(); final String messageId = mqMessage.getTraceId(); ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context); log.info("消息:{} 消费结果 {}", messageId, consumerStatus); // 状态同步更新 MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus); log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));}
回执实现,根据 messageId 更新对应的消息消费状态。
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) { final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq(); req.setMessageId(messageId); req.setMessageStatus(consumerStatus.getCode()); final String traceId = IdHelper.uuid32(); req.setTraceId(traceId); req.setMethodType(MethodType.C_CONSUMER_STATUS); // 重试 return Retryer.<MqCommonResp>newInstance() .maxAttempt(consumerStatusMaxAttempt) .callable(new Callable<MqCommonResp>() { @Override public MqCommonResp call() throws Exception { Channel channel = getChannel(null); MqCommonResp resp = callServer(channel, req, MqCommonResp.class); if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) { throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED); } return resp; } }).retryCall();}
Broker 回执处理
消息分发
// 消费者消费状态 ACKif(MethodType.C_CONSUMER_STATUS.equals(methodType)) { MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class); final String messageId = req.getMessageId(); final String messageStatus = req.getMessageStatus(); return mqBrokerPersist.updateStatus(messageId, messageStatus);}
简单实现
这里是基于本地 map 更新状态的,性能比较差。
后续会以 mysql 实现。
public MqCommonResp updateStatus(String messageId, String status) { // 这里性能比较差,所以不可以用于生产。仅作为测试验证 for(List<MqMessagePersistPut> list : map.values()) { for(MqMessagePersistPut put : list) { MqMessage mqMessage = put.getMqMessage(); if(mqMessage.getTraceId().equals(messageId)) { put.setMessageStatus(status); break; } } } MqCommonResp commonResp = new MqCommonResp(); commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode()); commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg()); return commonResp;}
小结
对于消息状态的细化,更加便于我们后续的管理,和问题的定位。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq
拓展阅读
rpc-从零开始实现 rpc https://github.com/houbb/rpc
原文:https://juejin.cn/post/7096854658995453982