标签:jdk   异常   logger   信息   function   roo   匿名函数   factor   ref   
https://blog.csdn.net/evil_lrn/article/details/105808364
开始前,首先先学习一下概念room和namespace
官方地址链接 地址
简单的来说,socket会属于某一个room,如果没有指定那么就socket就会归属默认的room,每个room又会属于某一namespace下,默认namespace是/。
客户端连接时可以指定自己的socket的归属哪个namespace ,(这里有个坑,自己定义的namespace 一定是/xiuweiSapce,注意是/xiuweiSapce不是xiuweiSapce)比如使用
var socket = io.connect("http://localhost:9099/xiuweiSpace?room=F006");
至于归属那个room就是服务端自己设置的了。
接着就是Netty-socketIo的api了,这里不多做叙述。分享一篇博客,链接 ,文章内容排版都比我写的好。
剩下的言归正传,这是使用redis并且使用Java客户端redission作为集群推送的方案原因是因为netty-soketio与redisson是同一个作者。(这里再次膜拜大神)。这里使用redis发布订阅功能,需要服务端主动推送消息的时候,不管消息发动到哪个实例都会通过发布订阅推送到其他实例上,最总获取到所有的socketClient推送消息。
接着一言不合上代码 
引入所需要的pom
- 
  
- 
  
<groupId>com.corundumstudio.socketio</groupId> 
 
- 
  
<artifactId>netty-socketio</artifactId> 
 
- 
  
<version>1.7.17</version> 
 
- 
  
- 
    
- 
  
- 
  
<groupId>org.redisson</groupId> 
 
- 
  
<artifactId>redisson</artifactId> 
 
- 
  
<version>3.11.0</version> 
 
- 
  
相关的配置文件与配置类
- 
  
- 
  
- 
  
lsmdjsj.websocket.host=socketio.host=localhost 
 
- 
  
lsmdjsj.websocket.socket-port=9099 
 
- 
  
- 
  
lsmdjsj.websocket.maxFramePayloadLength=1048576 
 
- 
  
- 
  
lsmdjsj.websocket.maxHttpContentLength=1048576 
 
- 
  
- 
  
lsmdjsj.websocket.upgradeTimeout=1000000 
 
- 
  
- 
  
lsmdjsj.websocket.pingTimeout=6000000 
 
- 
  
- 
  
lsmdjsj.websocket.pingInterval=25000 
 
- 
    
- 
    
- 
  
- 
  
lsmdjsj.redisson.address=redis://127.0.0.1:6379 
 
- 
  
lsmdjsj.redisson.password= 
 
- 
  
lsmdjsj.redisson.database=5 
 
- 
  
- 
  
- 
  
@ConfigurationProperties(prefix = "lsmdjsj.websocket") 
 
- 
  
public class WebSocketProperties { 
 
- 
    
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
private Integer socketPort; 
 
- 
  
- 
  
- 
  
- 
  
private String maxFramePayloadLength; 
 
- 
  
- 
  
- 
  
- 
  
private String maxHttpContentLength; 
 
- 
    
- 
    
- 
  
- 
  
- 
  
- 
  
private Integer pingInterval; 
 
- 
  
- 
  
- 
  
- 
  
private Integer pingTimeout; 
 
- 
  
- 
  
- 
  
- 
  
private Integer upgradeTimeout; 
 
- 
  
- 
  
- 
  
@ConfigurationProperties(prefix = "lsmdjsj.redisson") 
 
- 
  
public class RedissonProperty { 
 
- 
  
private int timeout = 3000; 
 
- 
    
- 
  
- 
    
- 
  
- 
    
- 
  
private int connectionPoolSize = 5; 
 
- 
    
- 
  
private int connectionMinimumIdleSize=2; 
 
- 
    
- 
  
private int slaveConnectionPoolSize = 250; 
 
- 
    
- 
  
private int masterConnectionPoolSize = 250; 
 
- 
    
- 
  
private String[] sentinelAddresses; 
 
- 
    
- 
  
private String masterName; 
 
- 
  
private int database = 1; 
 
- 
    
- 
  
初始化netty-socketio的服务端和redisson
- 
  
- 
  
@EnableConfigurationProperties(RedissonProperty.class) 
 
- 
  
public class RedissonConfig { 
 
- 
    
- 
  
- 
  
private RedissonProperty conf; 
 
- 
    
- 
  
@Bean(name="redission",destroyMethod="shutdown") 
 
- 
  
public RedissonClient redission() { 
 
- 
  
Config config = new Config(); 
 
- 
  
config.setCodec(new org.redisson.client.codec.StringCodec()); 
 
- 
  
if(conf.getSentinelAddresses()!=null && conf.getSentinelAddresses().length>0){ 
 
- 
  
config.useSentinelServers() 
 
- 
  
.setMasterName(conf.getMasterName()).addSentinelAddress(conf.getSentinelAddresses()) 
 
- 
  
.setPassword(conf.getPassword()).setDatabase(conf.getDatabase()); 
 
- 
  
- 
  
SingleServerConfig serverConfig = config.useSingleServer() 
 
- 
  
.setAddress(conf.getAddress()) 
 
- 
  
.setTimeout(conf.getTimeout()) 
 
- 
  
.setConnectionPoolSize(conf.getConnectionPoolSize()) 
 
- 
  
.setConnectionMinimumIdleSize(conf.getConnectionMinimumIdleSize()) 
 
- 
  
.setDatabase(conf.getDatabase()); 
 
- 
  
if(StringUtils.isNotBlank(conf.getPassword())) { 
 
- 
  
serverConfig.setPassword(conf.getPassword()); 
 
- 
  
- 
  
- 
  
return Redisson.create(config); 
 
- 
  
- 
  
初始化netty-socketio
- 
  
- 
  
public class SocketIoConfig { 
 
- 
    
- 
  
private static Logger logger = LoggerFactory.getLogger(SocketIoConfig.class); 
 
- 
    
- 
  
- 
  
private RedissonClient redisson; 
 
- 
    
- 
  
- 
  
private WebSocketProperties webSocketProperties; 
 
- 
    
- 
  
- 
  
private NettyExceptionListener nettyExceptionListener; 
 
- 
    
- 
  
- 
  
- 
  
- 
  
- 
  
private RedissonStoreFactory createRedissonStoreFactory(){ 
 
- 
  
logger.info("创建 RedissonStoreFactory 开始"); 
 
- 
  
RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory(redisson); 
 
- 
  
logger.info("创建 RedissonStoreFactory 结束"); 
 
- 
  
return redissonStoreFactory; 
 
- 
  
- 
    
- 
    
- 
    
- 
  
- 
  
public SocketIOServer getSocketIOServer(){ 
 
- 
  
logger.info("创建 SocketIOServer 开始"); 
 
- 
  
- 
  
SocketConfig socketConfig = new SocketConfig(); 
 
- 
    
- 
  
socketConfig.setTcpNoDelay(true); 
 
- 
  
- 
  
- 
  
socketConfig.setSoLinger(0); 
 
- 
  
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); 
 
- 
  
- 
  
config.setPort(webSocketProperties.getSocketPort()); 
 
- 
  
- 
  
config.setUpgradeTimeout(webSocketProperties.getUpgradeTimeout()); 
 
- 
  
- 
  
config.setPingInterval(webSocketProperties.getPingInterval()); 
 
- 
  
- 
  
config.setPingTimeout(webSocketProperties.getPingTimeout()); 
 
- 
  
- 
  
config.setStoreFactory(createRedissonStoreFactory()); 
 
- 
  
- 
  
config.setExceptionListener(nettyExceptionListener); 
 
- 
  
- 
  
config.setAckMode(AckMode.MANUAL); 
 
- 
  
- 
  
config.setAuthorizationListener(data -> { 
 
- 
  
- 
  
- 
  
- 
  
- 
  
config.setSocketConfig(socketConfig); 
 
- 
  
logger.info("创建 SocketIOServer 结束"); 
 
- 
  
return new SocketIOServer(config); 
 
- 
  
- 
    
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { 
 
- 
  
return new SpringAnnotationScanner(socketServer); 
 
- 
  
- 
    
- 
  
- 
  
public PubSubStore pubSubStore(SocketIOServer socketServer) { 
 
- 
  
return socketServer.getConfiguration().getStoreFactory().pubSubStore(); 
 
- 
  
- 
  
启动socket服务,并且订阅事件,也可以自定义设置相应的namespace的相关事件
- 
  
- 
  
- 
  
public class SocketServerRunner implements CommandLineRunner { 
 
- 
    
- 
  
private static Logger logger = LoggerFactory.getLogger(SocketServerRunner.class); 
 
- 
    
- 
  
- 
  
private SocketIOServer socketIOServer; 
 
- 
    
- 
  
- 
  
private PubSubStore pubSubStore; 
 
- 
    
- 
  
- 
  
private RedissonClient redisson; 
 
- 
    
- 
  
- 
  
public void run(String... args) throws Exception { 
 
- 
  
logger.info("socketIOServer 启动"); 
 
- 
  
- 
    
- 
  
- 
  
pubSubStore.subscribe(PubSubType.DISPATCH,data -> { 
 
- 
  
Collection<SocketIOClient> clients = null; 
 
- 
  
String room = data.getRoom(); 
 
- 
  
String namespace = data.getNamespace(); 
 
- 
  
Packet packet = data.getPacket(); 
 
- 
  
String jsonData = packet.getData(); 
 
- 
  
if(!StringUtils.isBlank(namespace)){ 
 
- 
  
SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace); 
 
- 
  
if(StringUtils.isBlank(room)){ 
 
- 
  
clients = socketIONamespace.getRoomOperations(room).getClients(); 
 
- 
  
- 
  
- 
  
clients = socketIOServer.getBroadcastOperations().getClients(); 
 
- 
  
- 
  
if(!CollectionUtils.isEmpty(clients)){ 
 
- 
  
for (SocketIOClient client : clients) { 
 
- 
  
client.sendEvent(Constants.PUSH_MSG,jsonData); 
 
- 
  
- 
  
- 
  
},DispatchMessage.class); 
 
- 
  
addNameSpace(socketIOServer); 
 
- 
  
- 
    
- 
    
- 
  
private void addNameSpace(SocketIOServer socketIOServer){ 
 
- 
  
SocketIONamespace xiuweiSpace = socketIOServer.addNamespace(Constants.XIU_WEI_NAME_SPACE); 
 
- 
  
xiuweiSpace.addConnectListener(client -> { 
 
- 
  
Map<String,Object> clientMap = new HashMap<>(16); 
 
- 
  
String nameSpace = client.getNamespace().getName(); 
 
- 
  
String room = client.getHandshakeData().getSingleUrlParam("room"); 
 
- 
  
String sessionId = client.getSessionId().toString(); 
 
- 
  
logger.info("xiuweiSpace连接成功, room={},nameSpace={}, sessionId={}", room, nameSpace,sessionId); 
 
- 
  
if(StringUtils.isNotBlank(room)){ 
 
- 
  
- 
  
clientMap.put("rooms",room); 
 
- 
  
- 
  
clientMap.put("createTime", LocalDateTime.now().toString()); 
 
- 
  
redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).trySet(clientMap); 
 
- 
  
- 
  
xiuweiSpace.addDisconnectListener(client -> { 
 
- 
  
logger.info("客户端:" + client.getSessionId() + "断开连接"); 
 
- 
  
String sessionId = client.getSessionId().toString(); 
 
- 
  
redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).delete(); 
 
- 
  
- 
  
xiuweiSpace.addEventListener(Constants.XIU_WEI_EVINT,String.class,(client, data, ackSender) -> { 
 
- 
  
client.sendEvent(Constants.XIU_WEI_EVINT,data); 
 
- 
  
if (ackSender.isAckRequested()) { 
 
- 
  
logger.info("xiuwei接受到的消息. message={}", data); 
 
- 
  
- 
  
- 
  
- 
  
默认处理未指定namesapce的socketClient 这里使用标签
- 
  
- 
  
public class MessageEventHandler { 
 
- 
    
- 
  
private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class); 
 
- 
    
- 
  
public static ConcurrentMap<String, SocketIOClient> socketIOClientMap = new ConcurrentHashMap<>(); 
 
- 
    
- 
    
- 
  
- 
  
private RedissonClient redisson; 
 
- 
    
- 
  
- 
  
private SocketIOServer socketIOServer; 
 
- 
    
- 
  
- 
  
public void onConnect(SocketIOClient client){ 
 
- 
  
Map<String,Object> clientMap = new HashMap<>(16); 
 
- 
  
- 
  
String room = client.getHandshakeData().getSingleUrlParam("room"); 
 
- 
  
String nameSpace = client.getNamespace().getName(); 
 
- 
  
String sessionId = client.getSessionId().toString(); 
 
- 
  
logger.info("socket连接成功, room={}, sessionId={},namespace={}",room,sessionId,nameSpace); 
 
- 
  
if(StringUtils.isNotBlank(room)){ 
 
- 
  
- 
  
clientMap.put("rooms",room); 
 
- 
  
- 
  
clientMap.put("createTime", LocalDateTime.now().toString()); 
 
- 
  
redisson.getBucket(Constants.KEY_ROOM_PREFIX+sessionId).trySet(clientMap); 
 
- 
  
- 
  
- 
    
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
public void onDisconnect(SocketIOClient client) { 
 
- 
  
logger.info("客户端:" + client.getSessionId() + "断开连接"); 
 
- 
  
- 
    
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
@OnEvent(value = "messageevent") 
 
- 
  
public void onEvent(SocketIOClient client, AckRequest request, String msg) { 
 
- 
  
logger.info("发来消息:" + msg); 
 
- 
  
- 
  
JSONObject jsonObject = JSON.parseObject(msg); 
 
- 
  
String message = jsonObject.getString("message"); 
 
- 
  
Collection<SocketIOClient> clients = socketIOServer.getBroadcastOperations().getClients(); 
 
- 
  
for (SocketIOClient clientByRoom : clients) { 
 
- 
  
clientByRoom.sendEvent("messageevent", client.getSessionId().toString()+": "+message); 
 
- 
  
- 
  
- 
    
- 
  
最后主动推送客户端的controller
- 
  
- 
  
- 
  
public SinoHttpResponse<Boolean> pushMsgByService(@RequestBody ChatMessage chatMessage){ 
 
- 
  
SocketIONamespace namespace = socketIOServer.getNamespace(chatMessage.getNamespace()); 
 
- 
  
Collection<SocketIOClient> allClients = namespace.getAllClients(); 
 
- 
  
for (SocketIOClient client : allClients) { 
 
- 
  
client.sendEvent(chatMessage.getEventName(),chatMessage.getMessage()); 
 
- 
  
- 
  
return SinoHttpResponse.success(true); 
 
- 
  
最后贴一下可以测试的html ,本人前端技术巨渣,这里使用jquery写的demo
- 
  
- 
  
<html lang="en" xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org"> 
 
- 
  
- 
  
<title>webSocket测试</title> 
 
- 
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.3.0/socket.io.js"></script> 
 
- 
  
- 
  
<link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap.min.css"> 
 
- 
  
- 
  
<link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap-theme.min.css"> 
 
- 
  
- 
  
<script src="//cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script> 
 
- 
  
- 
  
- 
  
<script src="//cdn.bootcss.com/bootstrap/3.3.5/js/bootstrap.min.js"></script> 
 
- 
  
<script type="text/javascript"> 
 
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
var socket = io.connect("http://localhost:9099?room=F006"); 
 
- 
  
- 
  
- 
  
- 
  
- 
  
socket.on(‘connect‘, function(){ 
 
- 
  
$("#tou").html("链接服务器成功!"); 
 
- 
  
- 
  
- 
  
socket.on(‘disconnect‘, function(){ 
 
- 
  
$("#tou").html("与服务器断开了链接!"); 
 
- 
  
- 
  
- 
  
socket.on(‘messageevent‘, function(data) { 
 
- 
  
$("#msg").html($("#msg").html() + "<br/>" + data); 
 
- 
  
- 
  
- 
  
- 
  
socket.socket.reconnect(); 
 
- 
  
- 
    
- 
    
- 
    
- 
    
- 
    
- 
  
$(‘#send‘).bind(‘click‘, function() { 
 
- 
  
- 
  
- 
    
- 
    
- 
    
- 
  
- 
  
- 
  
- 
  
var message = document.getElementById(‘message‘).value; 
 
- 
  
- 
  
var obj = {message:message,title:title}; 
 
- 
  
var str = JSON.stringify(obj); 
 
- 
  
socket.emit("messageevent",str); 
 
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
    
- 
  
- 
  
- 
  
<div class="page-header" id="tou"> 
 
- 
  
- 
  
- 
  
<div class="well" id="msg"> 
 
- 
  
- 
  
- 
  
<div class="input-group"> 
 
- 
  
<input type="text" class="form-control" placeholder="发送信息..." id="message"> 
 
- 
  
<span class="input-group-btn"> 
 
- 
  
<button class="btn btn-default" type="button" id="send" >发送</button> 
 
- 
  
- 
  
- 
  
- 
  
- 
  
后台主动发送消息:<button class="btn btn-default" type="button" onclick="mess()">发送</button> 
 
- 
  
- 
  
<a href="/user/index">back to index</a> 
 
- 
  
<script src="http://js.biocloud.cn/jquery/1.11.3/jquery.min.js"></script> 
 
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
url: "/websocket/auditing", 
 
- 
  
success: function (data) { 
 
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
- 
  
最后感谢大家的观看,这只是一个demo,会存在各种各样的问题,欢迎大家指正
Netty-socketio集成redis,服务端集群推送消息
标签:jdk   异常   logger   信息   function   roo   匿名函数   factor   ref   
原文地址:https://www.cnblogs.com/xiami2046/p/13899962.html