09-09-netty发送系统消息
This commit is contained in:
@ -80,6 +80,7 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理用户认证事件
|
// 处理用户认证事件
|
||||||
|
@Transactional
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
|
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
|
||||||
@ -117,46 +118,49 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
channelGroup.add(channel);
|
channelGroup.add(channel);
|
||||||
|
|
||||||
//构建推送消息
|
//构建推送消息
|
||||||
List<Long> userIds = new ArrayList<>();
|
// List<Long> userIds = new ArrayList<>();
|
||||||
//类型转换
|
// //类型转换
|
||||||
for (String s : onlineUserList) {
|
// for (String s : onlineUserList) {
|
||||||
userIds.add(Long.parseLong(s));
|
// userIds.add(Long.parseLong(s));
|
||||||
}
|
// }
|
||||||
// List<UserDTO> userDTOS = sysUserService.selectListByIds(userIds);
|
// List<UserDTO> userDTOS = sysUserService.selectListByIds(userIds);
|
||||||
|
|
||||||
//构建各个聊天房间未读 数量
|
//构建各个聊天房间未读 数量
|
||||||
LambdaQueryWrapper<ChatGroup> queryWrapper = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<ChatGroup> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
queryWrapper.like(ChatGroup::getMembers, loginUser.getUserId()+",").or().like(ChatGroup::getMembers,loginUser.getUserId()+"]");
|
queryWrapper.like(ChatGroup::getMembers, loginUser.getUserId()+",").or().like(ChatGroup::getMembers,loginUser.getUserId()+"]");
|
||||||
//拿到该用户的房间列表
|
//拿到该用户所参与的房间列表
|
||||||
List<ChatGroup> chatGroups = chatGroupService.list(queryWrapper);
|
List<ChatGroup> chatGroups = chatGroupService.list(queryWrapper);
|
||||||
|
boolean isHaveSystemRoom = false;
|
||||||
if (chatGroups != null && !chatGroups.isEmpty()) {
|
if (chatGroups != null && !chatGroups.isEmpty()) {
|
||||||
// List<HashMap<String, Object>> roomCounts = new ArrayList<>();
|
|
||||||
HashMap<String, Object> roomCounts = new HashMap<>();
|
HashMap<String, Object> roomCounts = new HashMap<>();
|
||||||
for (ChatGroup chatGroup : chatGroups) {
|
for (ChatGroup chatGroup : chatGroups) {
|
||||||
LambdaQueryWrapper<ChatHistory> queryWrapper1 = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<ChatHistory> historyLambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
queryWrapper1.eq(ChatHistory::getGeterId, chatGroup.getId());
|
historyLambdaQueryWrapper.eq(ChatHistory::getGeterId, chatGroup.getId());
|
||||||
queryWrapper1.ne(ChatHistory::getSenderId, loginUser.getUserId());
|
historyLambdaQueryWrapper.ne(ChatHistory::getSenderId, loginUser.getUserId());
|
||||||
queryWrapper1.eq(ChatHistory::getIsRead, "1");
|
historyLambdaQueryWrapper.eq(ChatHistory::getIsRead, "1");
|
||||||
List<ChatHistory> list = chatHistoryService.list(queryWrapper1);
|
List<ChatHistory> list = chatHistoryService.list(historyLambdaQueryWrapper);
|
||||||
if (list != null && !list.isEmpty()) {
|
if (list != null && !list.isEmpty()) {
|
||||||
// HashMap<String, Object> map = new HashMap<>();
|
|
||||||
roomCounts.put(chatGroup.getId().toString(), list.size());
|
roomCounts.put(chatGroup.getId().toString(), list.size());
|
||||||
// roomCounts.add(map);
|
}
|
||||||
|
//在遍历的同时寻找是否有系统消息房间
|
||||||
|
if (chatGroup.getMembers().contains("system")){
|
||||||
|
isHaveSystemRoom = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
JSONObject message = new JSONObject();
|
JSONObject message = new JSONObject();
|
||||||
// message.put("type", "2");
|
|
||||||
// message.put("onLineUser", userDTOS);
|
|
||||||
message.put("type", "3");
|
message.put("type", "3");
|
||||||
message.put("unReadCount", roomCounts);
|
message.put("unReadCount", roomCounts);
|
||||||
log.info("发送所有未读消息:{}",message);
|
log.info("发送所有未读消息:{}",message);
|
||||||
// channelGroup.writeAndFlush(new TextWebSocketFrame(message.toJSONString()));
|
|
||||||
|
|
||||||
sendMessage(ctx, message.toJSONString());
|
sendMessage(ctx, message.toJSONString());
|
||||||
}
|
|
||||||
|
|
||||||
|
//认证完成后开始构建系统消息房间 判断是否有系统消息房间 没有则增加
|
||||||
|
if (!isHaveSystemRoom){
|
||||||
|
ChatGroup chatGroup = new ChatGroup();
|
||||||
|
chatGroup.setType(String.valueOf(0));
|
||||||
|
chatGroup.setMembers("[system, "+loginUser.getUserId()+"]");
|
||||||
|
chatGroupService.save(chatGroup);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
super.userEventTriggered(ctx, evt);
|
super.userEventTriggered(ctx, evt);
|
||||||
}
|
}
|
||||||
@ -249,16 +253,16 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
//要从IDS中去掉自己ID防止发送自己消息
|
//要从IDS中去掉自己ID防止发送自己消息
|
||||||
ids.remove(Long.valueOf(channelUserMap.get(ctx)));
|
ids.remove(Long.valueOf(channelUserMap.get(ctx)));
|
||||||
for (Long id : ids) {
|
for (Long id : ids) {
|
||||||
|
//只要发送一条数据,就要给接收方推送所有未读消息
|
||||||
|
if (!userRoomCountMap.containsKey(id + "+" + RoomId)) {
|
||||||
|
userRoomCountMap.put(id + "+" + RoomId, 1);
|
||||||
|
}else {
|
||||||
|
userRoomCountMap.put(id + "+" + RoomId, userRoomCountMap.get(id + "+" + RoomId) + 1);
|
||||||
|
}
|
||||||
//通过每个用户ID拿到该用户所有通道实例
|
//通过每个用户ID拿到该用户所有通道实例
|
||||||
List<ChannelHandlerContext> channelHandlerContexts = userChannelMap.get(id.toString());
|
List<ChannelHandlerContext> channelHandlerContexts = userChannelMap.get(id.toString());
|
||||||
//如果满足则说明用户在线
|
//如果满足则说明用户在线
|
||||||
if (channelHandlerContexts != null && !channelHandlerContexts.isEmpty()) {
|
if (channelHandlerContexts != null && !channelHandlerContexts.isEmpty()) {
|
||||||
//只要发送一条数据,就要给接收方推送所有未读消息
|
|
||||||
if (!userRoomCountMap.containsKey(id + "+" + RoomId)) {
|
|
||||||
userRoomCountMap.put(id + "+" + RoomId, 1);
|
|
||||||
}else {
|
|
||||||
userRoomCountMap.put(id + "+" + RoomId, userRoomCountMap.get(id + "+" + RoomId) + 1);
|
|
||||||
}
|
|
||||||
//所有房间的未读消息数
|
//所有房间的未读消息数
|
||||||
jsonObject.put("countValue", JsonUtils.toJsonString(userRoomCountMap));
|
jsonObject.put("countValue", JsonUtils.toJsonString(userRoomCountMap));
|
||||||
//给每个通道发送对应消息
|
//给每个通道发送对应消息
|
||||||
@ -285,12 +289,12 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
}
|
}
|
||||||
}else if ("1".equals(type)){
|
}else if ("1".equals(type)){
|
||||||
log.info("收到客户端确认消息:{}", jsonObject);
|
log.info("收到客户端确认消息:{}", jsonObject);
|
||||||
//将数据库中该消息的已读状态改为已读
|
//前端接收方收到消息返回1 说明此时处于聊天框内 可以清空该房间的所有未读消息 将此房间聊天记录全部已读
|
||||||
LambdaQueryWrapper<ChatHistory> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<ChatHistory> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
lambdaQueryWrapper
|
lambdaQueryWrapper
|
||||||
.eq(ChatHistory::getSenderId,jsonObject.get("from"))
|
.eq(ChatHistory::getSenderId,jsonObject.get("from"))
|
||||||
.eq(ChatHistory::getGeterId,jsonObject.get("roomId"))
|
.eq(ChatHistory::getGeterId,jsonObject.get("roomId"))
|
||||||
.eq(ChatHistory::getMessage,jsonObject.get("message"))
|
// .eq(ChatHistory::getMessage,jsonObject.get("message"))
|
||||||
.eq(ChatHistory::getIsRead,"1");
|
.eq(ChatHistory::getIsRead,"1");
|
||||||
List<ChatHistory> list = chatHistoryService.list(lambdaQueryWrapper);
|
List<ChatHistory> list = chatHistoryService.list(lambdaQueryWrapper);
|
||||||
if (list != null && !list.isEmpty()){
|
if (list != null && !list.isEmpty()){
|
||||||
@ -300,13 +304,19 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
}
|
}
|
||||||
chatHistoryService.updateBatchById( list);
|
chatHistoryService.updateBatchById( list);
|
||||||
|
|
||||||
//将未读消息数减一
|
//将该房间未读消息 清空
|
||||||
if(userRoomCountMap.get(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId")) > 0) {
|
userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), 0);
|
||||||
userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), userRoomCountMap.get(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId")) - 1);
|
// if(userRoomCountMap.get(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId")) != null && userRoomCountMap.get(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId")) > 0) {
|
||||||
}else {
|
// userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), userRoomCountMap.get(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId")) - 1);
|
||||||
userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), 0);
|
// }else {
|
||||||
}
|
// userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), 0);
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
// else if ("2".equals(type)){
|
||||||
|
// //前端点击某个聊天房间进入时 仅需发送type2 清空该房间的未读消息数
|
||||||
|
// userRoomCountMap.put(channelUserMap.get(ctx) + "+" + jsonObject.get("roomId"), 0);
|
||||||
|
// //将该房间对应消息都置为已读
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理异常
|
// 处理异常
|
||||||
@ -330,7 +340,52 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
|
|||||||
|
|
||||||
|
|
||||||
//通过userId进行发送消息
|
//通过userId进行发送消息
|
||||||
private void sendMessageByUserId(Long userId,String message){
|
@Transactional
|
||||||
|
public void sendSystemMessageToUser(Long userId, String message){
|
||||||
|
// /现在已用type:0 1 3 4用于服务端发送消息到系统消息房间
|
||||||
|
// {
|
||||||
|
// "type":"4",
|
||||||
|
// "message": "testing"
|
||||||
|
// }
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
jsonObject.put("type", "4");
|
||||||
|
jsonObject.put("message", message);
|
||||||
|
//通过userId拿到该用户所有通道实例
|
||||||
|
List<ChannelHandlerContext> channelHandlerContexts = userChannelMap.get(userId.toString());
|
||||||
|
if (channelHandlerContexts != null && !channelHandlerContexts.isEmpty()) {
|
||||||
|
//给每个通道发送对应消息
|
||||||
|
for (ChannelHandlerContext handlerContext : channelHandlerContexts) {
|
||||||
|
sendMessage(handlerContext, jsonObject.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//生成系统消息的聊天记录
|
||||||
|
LambdaQueryWrapper<ChatGroup> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
lambdaQueryWrapper.eq(ChatGroup::getMembers,"[system, "+userId+"]");
|
||||||
|
ChatGroup groupServiceOne = chatGroupService.getOne(lambdaQueryWrapper);
|
||||||
|
//如果给成员发送系统消息 而该成员还没有建立过连接从而没有群聊房间 则创建该成员系统消息房间
|
||||||
|
if (groupServiceOne == null){
|
||||||
|
groupServiceOne = new ChatGroup();
|
||||||
|
groupServiceOne.setType(String.valueOf(0));
|
||||||
|
groupServiceOne.setMembers("[system, "+userId+"]");
|
||||||
|
chatGroupService.save(groupServiceOne);
|
||||||
|
}
|
||||||
|
ChatHistory chatHistory = new ChatHistory();
|
||||||
|
//发送方设置为0 表示系统消息
|
||||||
|
chatHistory.setSenderId(0L);
|
||||||
|
chatHistory.setGeterId(groupServiceOne.getId());
|
||||||
|
chatHistory.setMessage(message);
|
||||||
|
chatHistory.setMessageDate(new Date());
|
||||||
|
chatHistory.setIsRead("1");//未读
|
||||||
|
chatHistoryService.save(chatHistory);
|
||||||
|
|
||||||
|
//发送消息后 将该房间未读消息数加1
|
||||||
|
if (userRoomCountMap.containsKey(userId+"+"+groupServiceOne.getId())){
|
||||||
|
//该房间未读消息数加1
|
||||||
|
userRoomCountMap.put(userId+"+"+groupServiceOne.getId(), userRoomCountMap.get(userId+"+"+groupServiceOne.getId())+1);
|
||||||
|
}else{
|
||||||
|
//将该房间未读消息数设置为1
|
||||||
|
userRoomCountMap.put(userId+"+"+groupServiceOne.getId(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,13 +149,22 @@ public class ChatGroupController {
|
|||||||
List<Long> list = JSONObject.parseArray(members, Long.class);
|
List<Long> list = JSONObject.parseArray(members, Long.class);
|
||||||
list.remove(userId);
|
list.remove(userId);
|
||||||
//如果单聊 则集合只剩有一人
|
//如果单聊 则集合只剩有一人
|
||||||
SysUserVo sysUserVo = sysUserService.selectUserById(list.get(0));
|
SysUserVo sysUserVo = sysUserService.selectUserById(list.getFirst());
|
||||||
byId.setName(sysUserVo.getNickName());
|
if (sysUserVo != null) {
|
||||||
byId.setAvatar(sysUserVo.getAvatar());
|
byId.setName(sysUserVo.getNickName());
|
||||||
|
byId.setAvatar(sysUserVo.getAvatar());
|
||||||
|
}else{
|
||||||
|
byId.setName("此用户已注销");
|
||||||
|
byId.setAvatar(null);
|
||||||
|
}
|
||||||
}else {
|
}else {
|
||||||
//群聊 则只需要将群主头像赋值给群聊头像
|
//群聊 则只需要将群主头像赋值给群聊头像
|
||||||
SysUserVo sysUserVo = sysUserService.selectUserById(byId.getOwerId());
|
SysUserVo sysUserVo = sysUserService.selectUserById(byId.getOwerId());
|
||||||
byId.setAvatar(sysUserVo.getAvatar());
|
if (sysUserVo != null){
|
||||||
|
byId.setAvatar(sysUserVo.getAvatar());
|
||||||
|
}else {
|
||||||
|
byId.setAvatar(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user