标签:jdk 异常 logger 信息 function roo 匿名函数 factor ref
https://blog.csdn.net/evil_lrn/article/details/105808364
开始前,首先学习一下room和namespace这两个概念
官方地址链接 地址
简单来说,socket会属于某一个room,如果没有指定,那么socket就会归属默认的room;每个room又会属于某一个namespace,默认的namespace是/。
客户端连接时可以指定自己的socket归属哪个namespace(这里有个坑:自己定义的namespace一定要以/开头,例如/xiuweiSpace,注意是/xiuweiSpace而不是xiuweiSpace),比如使用
var socket = io.connect("http://localhost:9099/xiuweiSpace?room=F006");
至于归属哪个room,就是由服务端自己设置的了。
接着就是Netty-socketIo的api了,这里不多做叙述。分享一篇博客,链接 ,文章内容排版都比我写的好。
言归正传,这是使用redis并以Java客户端redisson作为集群推送的方案,原因是netty-socketio与redisson是同一个作者(这里再次膜拜大神)。这里使用redis的发布订阅功能:需要服务端主动推送消息的时候,不管消息发送到哪个实例,都会通过发布订阅推送到其他实例上,最终获取到所有的socketClient并推送消息。
接着一言不合上代码
引入所需要的pom
-
-
<groupId>com.corundumstudio.socketio</groupId>
-
<artifactId>netty-socketio</artifactId>
-
<version>1.7.17</version>
-
-
-
-
<groupId>org.redisson</groupId>
-
<artifactId>redisson</artifactId>
-
<version>3.11.0</version>
-
相关的配置文件与配置类
-
-
-
lsmdjsj.websocket.host=localhost
-
lsmdjsj.websocket.socket-port=9099
-
-
lsmdjsj.websocket.maxFramePayloadLength=1048576
-
-
lsmdjsj.websocket.maxHttpContentLength=1048576
-
-
lsmdjsj.websocket.upgradeTimeout=1000000
-
-
lsmdjsj.websocket.pingTimeout=6000000
-
-
lsmdjsj.websocket.pingInterval=25000
-
-
-
-
lsmdjsj.redisson.address=redis://127.0.0.1:6379
-
lsmdjsj.redisson.password=
-
lsmdjsj.redisson.database=5
-
-
-
@ConfigurationProperties(prefix = "lsmdjsj.websocket")
-
public class WebSocketProperties {
-
-
-
-
-
-
-
-
-
private Integer socketPort;
-
-
-
-
private String maxFramePayloadLength;
-
-
-
-
private String maxHttpContentLength;
-
-
-
-
-
-
private Integer pingInterval;
-
-
-
-
private Integer pingTimeout;
-
-
-
-
private Integer upgradeTimeout;
-
-
-
@ConfigurationProperties(prefix = "lsmdjsj.redisson")
-
public class RedissonProperty {
-
private int timeout = 3000;
-
-
-
-
-
-
private int connectionPoolSize = 5;
-
-
private int connectionMinimumIdleSize=2;
-
-
private int slaveConnectionPoolSize = 250;
-
-
private int masterConnectionPoolSize = 250;
-
-
private String[] sentinelAddresses;
-
-
private String masterName;
-
private int database = 1;
-
-
初始化netty-socketio的服务端和redisson
-
-
@EnableConfigurationProperties(RedissonProperty.class)
-
public class RedissonConfig {
-
-
-
private RedissonProperty conf;
-
-
@Bean(name="redission",destroyMethod="shutdown")
-
public RedissonClient redission() {
-
Config config = new Config();
-
config.setCodec(new org.redisson.client.codec.StringCodec());
-
if(conf.getSentinelAddresses()!=null && conf.getSentinelAddresses().length>0){
-
config.useSentinelServers()
-
.setMasterName(conf.getMasterName()).addSentinelAddress(conf.getSentinelAddresses())
-
.setPassword(conf.getPassword()).setDatabase(conf.getDatabase());
-
-
SingleServerConfig serverConfig = config.useSingleServer()
-
.setAddress(conf.getAddress())
-
.setTimeout(conf.getTimeout())
-
.setConnectionPoolSize(conf.getConnectionPoolSize())
-
.setConnectionMinimumIdleSize(conf.getConnectionMinimumIdleSize())
-
.setDatabase(conf.getDatabase());
-
if(StringUtils.isNotBlank(conf.getPassword())) {
-
serverConfig.setPassword(conf.getPassword());
-
-
-
return Redisson.create(config);
-
-
初始化netty-socketio
-
-
public class SocketIoConfig {
-
-
private static Logger logger = LoggerFactory.getLogger(SocketIoConfig.class);
-
-
-
private RedissonClient redisson;
-
-
-
private WebSocketProperties webSocketProperties;
-
-
-
private NettyExceptionListener nettyExceptionListener;
-
-
-
-
-
-
private RedissonStoreFactory createRedissonStoreFactory(){
-
logger.info("创建 RedissonStoreFactory 开始");
-
RedissonStoreFactory redissonStoreFactory = new RedissonStoreFactory(redisson);
-
logger.info("创建 RedissonStoreFactory 结束");
-
return redissonStoreFactory;
-
-
-
-
-
-
public SocketIOServer getSocketIOServer(){
-
logger.info("创建 SocketIOServer 开始");
-
-
SocketConfig socketConfig = new SocketConfig();
-
-
socketConfig.setTcpNoDelay(true);
-
-
-
socketConfig.setSoLinger(0);
-
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
-
-
config.setPort(webSocketProperties.getSocketPort());
-
-
config.setUpgradeTimeout(webSocketProperties.getUpgradeTimeout());
-
-
config.setPingInterval(webSocketProperties.getPingInterval());
-
-
config.setPingTimeout(webSocketProperties.getPingTimeout());
-
-
config.setStoreFactory(createRedissonStoreFactory());
-
-
config.setExceptionListener(nettyExceptionListener);
-
-
config.setAckMode(AckMode.MANUAL);
-
-
config.setAuthorizationListener(data -> {
-
-
-
-
-
config.setSocketConfig(socketConfig);
-
logger.info("创建 SocketIOServer 结束");
-
return new SocketIOServer(config);
-
-
-
-
-
-
-
-
-
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
-
return new SpringAnnotationScanner(socketServer);
-
-
-
-
public PubSubStore pubSubStore(SocketIOServer socketServer) {
-
return socketServer.getConfiguration().getStoreFactory().pubSubStore();
-
-
启动socket服务,并且订阅事件,也可以自定义设置相应的namespace的相关事件
-
-
-
public class SocketServerRunner implements CommandLineRunner {
-
-
private static Logger logger = LoggerFactory.getLogger(SocketServerRunner.class);
-
-
-
private SocketIOServer socketIOServer;
-
-
-
private PubSubStore pubSubStore;
-
-
-
private RedissonClient redisson;
-
-
-
public void run(String... args) throws Exception {
-
logger.info("socketIOServer 启动");
-
-
-
-
pubSubStore.subscribe(PubSubType.DISPATCH,data -> {
-
Collection<SocketIOClient> clients = null;
-
String room = data.getRoom();
-
String namespace = data.getNamespace();
-
Packet packet = data.getPacket();
-
String jsonData = packet.getData();
-
if(!StringUtils.isBlank(namespace)){
-
SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);
-
if(StringUtils.isBlank(room)){
-
clients = socketIONamespace.getRoomOperations(room).getClients();
-
-
-
clients = socketIOServer.getBroadcastOperations().getClients();
-
-
if(!CollectionUtils.isEmpty(clients)){
-
for (SocketIOClient client : clients) {
-
client.sendEvent(Constants.PUSH_MSG,jsonData);
-
-
-
},DispatchMessage.class);
-
addNameSpace(socketIOServer);
-
-
-
-
private void addNameSpace(SocketIOServer socketIOServer){
-
SocketIONamespace xiuweiSpace = socketIOServer.addNamespace(Constants.XIU_WEI_NAME_SPACE);
-
xiuweiSpace.addConnectListener(client -> {
-
Map<String,Object> clientMap = new HashMap<>(16);
-
String nameSpace = client.getNamespace().getName();
-
String room = client.getHandshakeData().getSingleUrlParam("room");
-
String sessionId = client.getSessionId().toString();
-
logger.info("xiuweiSpace连接成功, room={},nameSpace={}, sessionId={}", room, nameSpace,sessionId);
-
if(StringUtils.isNotBlank(room)){
-
-
clientMap.put("rooms",room);
-
-
clientMap.put("createTime", LocalDateTime.now().toString());
-
redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).trySet(clientMap);
-
-
xiuweiSpace.addDisconnectListener(client -> {
-
logger.info("客户端:" + client.getSessionId() + "断开连接");
-
String sessionId = client.getSessionId().toString();
-
redisson.getBucket(Constants.KEY_ROOM_PREFIX+Constants.XIU_WEI_NAME_SPACE+sessionId).delete();
-
-
xiuweiSpace.addEventListener(Constants.XIU_WEI_EVINT,String.class,(client, data, ackSender) -> {
-
client.sendEvent(Constants.XIU_WEI_EVINT,data);
-
if (ackSender.isAckRequested()) {
-
logger.info("xiuwei接受到的消息. message={}", data);
-
-
-
-
默认处理未指定namespace的socketClient,这里使用注解(如@OnEvent)
-
-
public class MessageEventHandler {
-
-
private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
-
-
public static ConcurrentMap<String, SocketIOClient> socketIOClientMap = new ConcurrentHashMap<>();
-
-
-
-
private RedissonClient redisson;
-
-
-
private SocketIOServer socketIOServer;
-
-
-
public void onConnect(SocketIOClient client){
-
Map<String,Object> clientMap = new HashMap<>(16);
-
-
String room = client.getHandshakeData().getSingleUrlParam("room");
-
String nameSpace = client.getNamespace().getName();
-
String sessionId = client.getSessionId().toString();
-
logger.info("socket连接成功, room={}, sessionId={},namespace={}",room,sessionId,nameSpace);
-
if(StringUtils.isNotBlank(room)){
-
-
clientMap.put("rooms",room);
-
-
clientMap.put("createTime", LocalDateTime.now().toString());
-
redisson.getBucket(Constants.KEY_ROOM_PREFIX+sessionId).trySet(clientMap);
-
-
-
-
-
-
-
-
-
-
public void onDisconnect(SocketIOClient client) {
-
logger.info("客户端:" + client.getSessionId() + "断开连接");
-
-
-
-
-
-
-
-
-
-
@OnEvent(value = "messageevent")
-
public void onEvent(SocketIOClient client, AckRequest request, String msg) {
-
logger.info("发来消息:" + msg);
-
-
JSONObject jsonObject = JSON.parseObject(msg);
-
String message = jsonObject.getString("message");
-
Collection<SocketIOClient> clients = socketIOServer.getBroadcastOperations().getClients();
-
for (SocketIOClient clientByRoom : clients) {
-
clientByRoom.sendEvent("messageevent", client.getSessionId().toString()+": "+message);
-
-
-
-
最后主动推送客户端的controller
-
-
-
public SinoHttpResponse<Boolean> pushMsgByService(@RequestBody ChatMessage chatMessage){
-
SocketIONamespace namespace = socketIOServer.getNamespace(chatMessage.getNamespace());
-
Collection<SocketIOClient> allClients = namespace.getAllClients();
-
for (SocketIOClient client : allClients) {
-
client.sendEvent(chatMessage.getEventName(),chatMessage.getMessage());
-
-
return SinoHttpResponse.success(true);
-
最后贴一下可以测试的html ,本人前端技术巨渣,这里使用jquery写的demo
-
-
<html lang="en" xmlns="http://www.w3.org/1999/xhtml" xmlns:th="http://www.thymeleaf.org">
-
-
<title>webSocket测试</title>
-
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.3.0/socket.io.js"></script>
-
-
<link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap.min.css">
-
-
<link rel="stylesheet" href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap-theme.min.css">
-
-
<script src="//cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script>
-
-
-
<script src="//cdn.bootcss.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
-
<script type="text/javascript">
-
-
-
-
-
-
var socket = io.connect("http://localhost:9099?room=F006");
-
-
-
-
-
socket.on('connect', function(){
-
$("#tou").html("链接服务器成功!");
-
-
-
socket.on('disconnect', function(){
-
$("#tou").html("与服务器断开了链接!");
-
-
-
socket.on('messageevent', function(data) {
-
$("#msg").html($("#msg").html() + "<br/>" + data);
-
-
-
-
socket.socket.reconnect();
-
-
-
-
-
-
-
$('#send').bind('click', function() {
-
-
-
-
-
-
-
-
-
var message = document.getElementById('message').value;
-
-
var obj = {message:message,title:title};
-
var str = JSON.stringify(obj);
-
socket.emit("messageevent",str);
-
-
-
-
-
-
-
-
-
-
<div class="page-header" id="tou">
-
-
-
<div class="well" id="msg">
-
-
-
<div class="input-group">
-
<input type="text" class="form-control" placeholder="发送信息..." id="message">
-
<span class="input-group-btn">
-
<button class="btn btn-default" type="button" id="send" >发送</button>
-
-
-
-
-
后台主动发送消息:<button class="btn btn-default" type="button" onclick="mess()">发送</button>
-
-
<a href="/user/index">back to index</a>
-
<script src="http://js.biocloud.cn/jquery/1.11.3/jquery.min.js"></script>
-
-
-
-
-
-
url: "/websocket/auditing",
-
success: function (data) {
-
-
-
-
-
-
-
最后感谢大家的观看,这只是一个demo,会存在各种各样的问题,欢迎大家指正
Netty-socketio集成redis,服务端集群推送消息
标签:jdk 异常 logger 信息 function roo 匿名函数 factor ref
原文地址:https://www.cnblogs.com/xiami2046/p/13899962.html