Netty+SpringBoot+FastDFS+Html5实现聊天App,。
Netty+SpringBoot+FastDFS+Html5实现聊天App,。
。
本章主要讲的是聊天App_PigChat中关于聊天功能的实现。
移除方法与处理异常方法的重写
在ChatHandler中重写其移除channel的方法handlerRemoved,以及处理异常的方法exceptionCaught。
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String channelId = ctx.channel().id().asShortText(); System.out.println("客户端被移除,channelId为:" + channelId); // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel users.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 ctx.channel().close(); users.remove(ctx.channel()); }
定义消息的实体类
public class ChatMsg implements Serializable { private static final long serialVersionUID = 3611169682695799175L; private String senderId; // 发送者的用户id private String receiverId; // 接受者的用户id private String msg; // 聊天内容 private String msgId; // 用于消息的签收 public String getSenderId() { return senderId; } public void setSenderId(String senderId) { this.senderId = senderId; } public String getReceiverId() { return receiverId; } public void setReceiverId(String receiverId) { this.receiverId = receiverId; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } }
对实体类再做一层包装
public class DataContent implements Serializable { private static final long serialVersionUID = 8021381444738260454L; private Integer action; // 动作类型 private ChatMsg chatMsg; // 用户的聊天内容entity private String extand; // 扩展字段 public Integer getAction() { return action; } public void setAction(Integer action) { this.action = action; } public ChatMsg getChatMsg() { return chatMsg; } public void setChatMsg(ChatMsg chatMsg) { this.chatMsg = chatMsg; } public String getExtand() { return extand; } public void setExtand(String extand) { this.extand = extand; }}
定义发送消息的动作的枚举类型
public enum MsgActionEnum { CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), KEEPALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); public final Integer type; public final String content; MsgActionEnum(Integer type, String content){ this.type = type; this.content = content; } public Integer getType() { return type; } }
定义记录用户与channel关系的类
/** * @Description: 用户id和channel的关联关系处理 */public class UserChannelRel { private static HashMapmanager = new HashMap<>(); public static void put(String senderId, Channel channel) { manager.put(senderId, channel); } public static Channel get(String senderId) { return manager.get(senderId); } public static void output() { for (HashMap.Entry entry : manager.entrySet()) { System.out.println("UserId: " + entry.getKey() + ", ChannelId: " + entry.getValue().id().asLongText()); } }}
接受与处理消息方法的重写
重写ChatHandler读取消息的channelRead0方法。
具体步骤如下:
(1)获取客户端发来的消息;
(2)判断消息类型,根据不同的类型来处理不同的业务;
(2.1)当websocket 第一次open的时候,初始化channel,把用的channel和userid关联起来;
(2.2)聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收];
然后实现消息的发送,首先从全局用户Channel关系中获取接受方的channel,然后当receiverChannel不为空的时候,从ChannelGroup去查找对应的channel是否存在,若用户在线,则使用writeAndFlush方法向其发送消息;(2.3)签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收];
(2.4)心跳类型的消息
// 用于记录和管理所有客户端的channle public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("read.........."); // 获取客户端传输过来的消息 String content = msg.text(); Channel currentChannel = ctx.channel(); // 1. 获取客户端发来的消息 DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class); Integer action = dataContent.getAction(); // 2. 判断消息类型,根据不同的类型来处理不同的业务 if (action == MsgActionEnum.CONNECT.type) { // 2.1 当websocket 第一次open的时候,初始化channel,把用的channel和userid关联起来 String senderId = dataContent.getChatMsg().getSenderId(); UserChannelRel.put(senderId, currentChannel); // 测试 for (Channel c : users) { System.out.println(c.id().asLongText()); } UserChannelRel.output(); } else if (action == MsgActionEnum.CHAT.type) { // 2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收] ChatMsg chatMsg = dataContent.getChatMsg(); String msgText = chatMsg.getMsg(); String receiverId = chatMsg.getReceiverId(); String senderId = chatMsg.getSenderId(); // 保存消息到数据库,并且标记为 未签收 UserService userService = (UserService)SpringUtil.getBean("userServiceImpl"); String msgId = userService.saveMsg(chatMsg); chatMsg.setMsgId(msgId); DataContent dataContentMsg = new DataContent(); dataContentMsg.setChatMsg(chatMsg); // 发送消息 // 从全局用户Channel关系中获取接受方的channel Channel receiverChannel = UserChannelRel.get(receiverId); if (receiverChannel == null) { // TODO channel为空代表用户离线,推送消息(JPush,个推,小米推送) } else { // 当receiverChannel不为空的时候,从ChannelGroup去查找对应的channel是否存在 Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { // 用户在线 receiverChannel.writeAndFlush( new TextWebSocketFrame( JsonUtils.objectToJson(dataContentMsg))); } else { // 用户离线 TODO 推送消息 } } } else if (action == MsgActionEnum.SIGNED.type) { // 2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收] UserService userService = (UserService)SpringUtil.getBean("userServiceImpl"); // 扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号间隔 String msgIdsStr = dataContent.getExtand(); String msgIds[] = msgIdsStr.split(","); ListmsgIdList = new ArrayList<>(); for (String mid : msgIds) { if (StringUtils.isNotBlank(mid)) { msgIdList.add(mid); } } System.out.println(msgIdList.toString()); if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) { // 批量签收 userService.updateMsgSigned(msgIdList); } } else if (action == MsgActionEnum.KEEPALIVE.type) { // 2.4 心跳类型的消息 System.out.println("收到来自channel为[" + currentChannel + "]的心跳包..."); } }
获取未签收的消息列表的接口
在controller中添加获取未签收的消息列表的接口getUnReadMsgList。
/** * * @Description: 用户手机端获取未签收的消息列表 */ @PostMapping("/getUnReadMsgList") public IMoocJSONResult getUnReadMsgList(String acceptUserId) { // 0. userId 判断不能为空 if (StringUtils.isBlank(acceptUserId)) { return IMoocJSONResult.errorMsg(""); } // 查询列表 ListunreadMsgList = userService.getUnReadMsgList(acceptUserId); return IMoocJSONResult.ok(unreadMsgList); }
测试