标签:
这周项目要做一个在线聊天系统,感觉不是特别困难,原理也很简单,分享给大家。
技术:
Java(Spring)+Mysql+MemCache
Spring做的是事件驱动模型,所有DB,更新缓存操作改成异步的。
MemCache存放缓存,每个用户的聊天记录缓存,好友关系维护。
需求:
用户分为虚拟用户,普通用户,高级用户(在线经理人),管理员用户(客服)。
虚拟,普通用户有一个好友列表,好友列表保存着用户的好友,对于虚拟,普通用户来说,他们的好友列表只有高级用户+管理员用户。
高级用户,管理员用户来说只要是用户给我发过消息,我都能看到,并且回复。
后台提供的接口列表:
|--聊天列表
|--普通用户获取动态聊天列表,目前固定是三位,客服+经理2
|--特殊用户获取用户对自己提问的列表
|--聊天回复
|--直接发送消息到后台
|--获取聊天数据
|--获取该用户跟某用户的聊天记录,带分页
|--定时检查接口
|--检测此用户是否有新消息提示
?
提供接口控制器的源码:
@Controller public class CommunicateCtrl extends BaseController { @RequestMapping("/communicate/ask") @ResponseBody public void doAsk(@RequestAttr ResultData resultData, Communicate model, HttpServletRequest request) throws Exception { model.checkChatIdEmpty("聊天对象Id不能为空"); model.checkContentEmpty("聊天内容不能为空"); model.checkContentIllegal("您的聊天内容带有敏感词"); UserInfo userInfo = getUserInfo(); if (null != userInfo) { // 如果聊天者已经登录 model.setUserId(String.valueOf(userInfo.getUserId())); model.setMobile(userInfo.getMobile()); model.setName(StringUtil.isNullOrEmpty(userInfo.getUserName()) ? "" : userInfo.getUserName()); } else { model.setUserId(getUserId()); if (Str.isEmpty(model.getUserId())) // 当传过来的cookie为空,则生成一个cookie,并使用虚拟userId generateVirUserInfoWhenUserIdEmpty(model); } model.setStatus(1); // 未回复 model.setUserType(1); // 普通用户 model.setBuildTime(new Date()); communicateService.save(model); // 保存DB对象 resultData.setData(model); putEvent(model); } @RequestMapping("/communicate/friends") @ResponseBody public void doFriendList(@RequestAttr ResultData resultData, HttpServletRequest request) throws Exception { // 如果为普通用户 if (null == getUserInfo() || getUserInfo().getType() != 3) { List<UserInfo> userInfos = userInfoService.getList(" and type = 3 "); // 加载特殊角色,提供在线聊天功能 for (UserInfo userInfo : userInfos) userInfo.getDicMap().put("userType", 2); // userType 0 虚拟用户 1普通用户 2经纪人 resultData.setData(userInfos); return; } // 特殊用户获取好友列表 List<String> friendList = communicateHandle.getFriendListCache(getUserId()); // 获取好友列表 List<Object> list = new ArrayList<Object>(friendList.size()); for (String userId : friendList) { if (Str.isEmpty(userId)) continue; Object o = null; if (ZhengzeValidate.isInteger(userId)) { // 普通用户Id UserInfo userInfo = userInfoService.getById(Integer.parseInt(userId)); if (null != (o = userInfo)) { userInfo.setHeadImg(Str.isEmpty(userInfo.getHeadImg()) ? defaultImg : userInfo.getHeadImg()); userInfo.getDicMap().put("userType", 1); // userType 0 虚拟用户 1普通用户 2经纪人 } } else { // user.dicMap.userType // userType 0 虚拟用户 1普通用户 2经纪人 o = MapBean.getNew().set("userId", userId).set("headImg", defaultImg).set("dicMap", MapBean.getNew("userType", 0)); } list.add(o); } resultData.setData(list); } @RequestMapping("/communicate/check") @ResponseBody public void doCheck(@RequestAttr ResultData resultData, String updateStatusUserId) throws Exception { String userId = getUserId(); List<MapBean> dataMapList = new ArrayList<MapBean>(); // 用户是否有新消息列表 List<String> friendList = communicateHandle.getFriendListCache(userId); // 获取好友列表 List<Communicate> chatsList = null; List<Communicate> unReaderList = null; // 未读消息列表,提供给前端 // 循环所有好友的聊天数据,检测是否有新数据 for (String friendUserId : friendList) { // 110&8_chart_list // 8&110_chart_list chatsList = communicateHandle.getChatsCache(friendUserId, userId); // 取得与每个好友的聊天记录,注意与生成key的顺序区别 if (!chatsList.isEmpty()) {// 如果存在聊天数据 int size = 0; String lastMsg = null; if (Str.isNotEmpty(updateStatusUserId)) unReaderList = new ArrayList<Communicate>(); for (Communicate communicate : chatsList) { lastMsg = communicate.getContent(); if (!communicate.getUserId().equals(userId) && communicate.getStatus() == 1) { // 只查询我未读的消息,过滤我的消息 size += 1; if (communicate.getUserId().equals(updateStatusUserId)) { // 如果聊天对象一致,则更新状态,并返回未读消息列表 communicate.setStatus(2);// 内存与db一致 Communicate communicateDB = communicateService.getById(communicate.getId()); if (communicateDB.getStatus() == 2) // 如果其他线程已更新状态,这里则不返回 continue; communicateDB.setStatus(communicate.getStatus()); communicateService.updateById(communicateDB); unReaderList.add(communicate); } // // 如果需要更新状态 --- 性能更好的一种批量更新方式 // if (Str.isNotEmpty(isUpdateStatus)) { // communicateService.updateStatus(list.get(0), " and user_id = ‘" + userId + "‘ and status = " + Communicate.STATUS_MGR_REPLY); // } } } MapBean dataMap = MapBean.getNew("userId", friendUserId, "oper", "normal"); // 返回最后的页数,操作为正常(没有新消息) // 返回新消息 if (size > 0) { if (null != unReaderList && !unReaderList.isEmpty()) communicateHandle.updateChatsCache(updateStatusUserId, userId, chatsList); // 更新缓存 dataMap.set("oper", "new").set("msg", tl("你有:0条未读消息", size)); dataMap.set("lastMsg", lastMsg).set("unReadMsgCount", size); dataMap.set("unReaderList", unReaderList); } dataMapList.add(dataMap); } } resultData.setData(dataMapList); // 设置与所有用户聊天数据 // 如果出现某一个用户的聊天数据,则返回该用户的聊天数据 if (Str.isNotEmpty(updateStatusUserId)) { for (MapBean dataMap : dataMapList) { if (dataMap.getString("userId").equals(updateStatusUserId)) { resultData.setData(dataMap); break; } } } } @RequestMapping("/communicate/chats") @ResponseBody public void doChats(@RequestAttr ResultData resultData, String chatId) throws Exception { if (Str.isEmpty(chatId)) throw new RuntimeException("聊天对象Id不能为空"); int pageSize = Tool.convertInt(RequestTool.getParameter("pageSize"), 10); int msgId = Tool.convertInt(RequestTool.getParameter("msgId"), 0); List<Communicate> chatsList = communicateHandle.getChatsCache(chatId, getUserId()); // 取得与每个好友的聊天记录 int size = chatsList.size(); int lastIndex = size - 1; // List索引可能出现 1-1=0的情况,if中做兼容 if (lastIndex >= 0) { // 如果存在聊天数据 if (msgId <= 0) { // 如果消息Id为空,则取最后数据N条 if (size <= pageSize) resultData.setData(chatsList); else resultData.setData(chatsList.subList(size - pageSize, size)); // 倒序,取最后一节数据 return; } // 根据msgId来取数据 int msgIdIndex = binarySearch(chatsList, msgId); if (msgIdIndex == -1 || msgIdIndex == 0) // -1则表示此msgId不存在,0则表示在它之前已经没有了任何数据 return; int subIndex = msgIdIndex - pageSize; resultData.setData(chatsList.subList(subIndex < 0 ? 0 : subIndex, msgIdIndex)); // 取出比msgId小的Id // msgIdIndex += 1;// +1 过滤掉自己 // int subSize = msgIdIndex + pageSize; // resultData.setData(chatsList.subList(msgIdIndex, subSize > size ? size : subSize)); //取出比msgId大的Id } } // 二分法查找,查找线性表必须是有序列表 int binarySearch(List<Communicate> chatsList, int key) { int low = 0, high = chatsList.size() - 1, mid; while (low <= high) { mid = (low + high) >>> 1; if (key == chatsList.get(mid).getId()) { return mid; } else if (key < chatsList.get(mid).getId()) { high = mid - 1; } else { low = mid + 1; } } return -1; } void generateVirUserInfoWhenUserIdEmpty(Communicate communicate) { communicate.setUserId(UUID.randomUUID().toString().replace("-", "")); // 生成虚拟UUID HttpUtils.addCookie(RequestTool.getResponse(), Constants.VIR_USER_ID, communicate.getUserId(), 24 * 60 * 60 * 1000 * 7); // 保存cookie一周 } String getUserId() { String userId = null; if (null != getUserInfo()) { userId = getUserInfo().getUserId() + ""; } else { Cookie cookie = getCookieByName(Constants.VIR_USER_ID); if (null != cookie) userId = cookie.getValue(); } // debug模式可以传入用户id String id = RequestTool.getParameter("id"); return isDebug() && Str.isNotEmpty(id) ? id : userId; } @SuppressWarnings("unchecked") Map<Class<?>, Set<String>> setResultJsonFilter(Class<?> clazz, Set<String> set) { Map<Class<?>, Set<String>> includeMap = (Map<Class<?>, Set<String>>) RequestTool.getRequest().getAttribute("includeMap"); if (null == includeMap) RequestTool.getRequest().setAttribute("includeMap", includeMap = new HashMap<Class<?>, Set<String>>()); includeMap.put(clazz, set); RequestTool.getRequest().setAttribute("jsonFilter", new ComplexPropertyPreFilter(includeMap)); return includeMap; } void putEvent(Communicate model) { SpringContextUtil.getApplicationContext().publishEvent(new CommunicateEvent(model)); } @Autowired CommunicateService communicateService; @Autowired CommunicateHandle communicateHandle; @Autowired UserInfoService userInfoService; String defaultImg = ConfigLoader.loader.getString("user_default_img"); }
Spring异步观察者事件处理:
@Component @SuppressWarnings("unchecked") public class CommunicateHandle implements ApplicationListener<CommunicateEvent> { static final String chartsKey = "_chart_list"; static final String friendsKey = "_friend_list"; @Override public void onApplicationEvent(CommunicateEvent event) { Communicate model = (Communicate) event.getSource(); if (null == model.getId()) communicateService.save(model); // 保存DB对象 // 查询并更新自己的好友列表 getAndAddFriendList(model); // 查询并更新聊天对象的好友列表 getAndAddFriendList(model, "friend"); // 查询并添加自己与聊天对象的记录列表 getAndAddChats(model); } List<Communicate> getAndAddChats(Communicate model) { List<Communicate> list = null; // 用户所有的聊天记录 try { list = getChatsCache(model.getUserId(), model.getChatId()); list.add(model); Collections.sort(list); // 排序此用户的消息队列 updateChatsCache(model.getUserId(), model.getChatId(), list);// 保存至缓存 } catch (Exception e) { e.printStackTrace(); } return list; } List<String> getAndAddFriendList(Communicate model, String... friends) { List<String> list = null; // 所有用户的好友 try { list = friends.length == 0 ? getFriendListCache(model.getUserId()) : getFriendListCache(model.getChatId()); if (friends.length == 0 ? !list.contains(model.getChatId()) && list.add(model.getChatId()) : !list.contains(model.getUserId()) && list.add(model.getUserId())) setCache(getKey(model, friends) + friendsKey, list); // 自动追加为好友 } catch (Exception e) { e.printStackTrace(); } return list; } String getKey(Communicate model, String... friends) { String key = model.getUserId(); if (friends.length > 0) key = model.getChatId(); return key; } public List<String> getFriendListCache(Object userId) throws Exception { List<String> list = (List<String>) MemCacheClient.get(userId + friendsKey); if (Str.isNull(list)) list = new ArrayList<String>(); return list; } public List<Communicate> getChatsCache(Object userId, Object chatsUserId) throws Exception { List<Communicate> list = (List<Communicate>) MemCacheClient.get(userId + "&" + chatsUserId + chartsKey); if (Str.isNull(list)) list = new ArrayList<Communicate>(); return list; } public boolean updateChatsCache(Object userId, Object chatsUserId, Object o) throws Exception { setCache(chatsUserId + "&" + userId + chartsKey, o); // 1296000秒 = 15天 setCache(userId + "&" + chatsUserId + chartsKey, o); // 1296000秒 = 15天 return true; } boolean setCache(String key, Object o) throws Exception { return MemCacheClient.set(key, 1296000, o); // 1296000秒 = 15天 } @Autowired CommunicateService communicateService; }
标签:
原文地址:http://my.oschina.net/linapex/blog/518651