标签:
1项目架构
基本框架5个服务器:
2游戏服线程
main-server and logic server配置
main-server
ExecutorService bossExecutor = Executors.newCachedThreadPool(); ExecutorService workerExecutor = Executors.newCachedThreadPool(); ChannelFactory channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(32, 0, 0, 15, TimeUnit.MINUTES, new ThreadFactory() { private AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "server-biz-pool-" + count.getAndAdd(1)); return t; } }));//定义名字 bootstrap.setPipelineFactory(new GameServerPipelineFactory(listenercon, executionHandler));//GameServerPipelineFactory 加载 规定的Handler.并且会将ExecutionHandler 加在lengthFieldDecoder之后 // 禁用Nagle 算法,适用于交互性强的客户端 bootstrap.setOption("child.tcpNoDelay", true); // 设置keepalive bootstrap.setOption("child.keepAlive", listenerConfiguration.isKeepalive()); bootstrap.bind(new InetSocketAddress(listenerConfiguration.getIp(), listenerConfiguration.getPort()));
logic-server 两种 type connection[对于mainserve来说],每种 挂了8个.共有16个连接
push[负责消息推送]
logic[负责业务处理]
pipelineFactory配置了相应的filter and Handler:
项目的Handler有这几个:
lengthFieldDecoder
ExecutionHandler[执行 ChannelHandler 链的整个过程是同步的,如果业务逻辑的耗时较长,会将导致Work线程长时间被占用得不到释放,从而影响了整个服务器的并发处理能力。
所 以,为了提高并发数,一般通过ExecutionHandler线程池来异步处理ChannelHandler链(worker线程在经过 ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收)。]
jsonDecoder
requestHandler[客户端传过来的action是string,回filter,like is login,通过规定的bean name和method 利用invoke执行,request num使用atomInteger record.]
lengthFieldPrepender
jsonEncoder
对于ExecutionHandler需要的线程池模型,Netty提供了两种可选:
1) MemoryAwareThreadPoolExecutor 通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误;
2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类。除了MemoryAwareThreadPoolExecutor 的功能之外,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一 Channel中的事件都在一个线程中执行(通常也没必要)。
例如:
Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->
\ /
X
/ \
Thread Y: --- Channel B (Event B1) --‘ ‘-- Channel A (Event A2) --- Channel A (Event A3) --->
上图表达的意思有几个:
(1)对整个线程池而言,处理同一个Channel的事件,必须是按照顺序来处理的。例如,必须先处理完Channel A (Event A1) ,再处理Channel A (Event A2)、Channel A (Event A3)
(2)同一个Channel的多个事件,会分布到线程池的多个线程中去处理。
(3)不同Channel的事件可以同时处理(分担到多个线程),互不影响。
OrderedMemoryAwareThreadPoolExecutor 的这种事件处理有序性是有意义的,因为通常情况下,请求发送端希望服务器能够按照顺序处理自己的请求,特别是需要多次握手的应用层协议。例如:XMPP协议。
(ExecutionHandler参考:http://www.cnblogs.com/TeaMax/archive/2013/04/03/2997874.html)
//项目中使用的 ExecutionHandler exeHandler = new ExecutionHandler(new MemoryAwareThreadPoolExecutor(8, 0, 0));
3项目消息队列
推送队列
原理:将需要的推送信息,按照优先级存进redis相应的队列中,然后按照优先级取出来.
//异步消息处理 //主要线程 private void commandProcess() { try { while (true) { final Command command = commandDao.get(Priority.HIGH, Priority.NORMAL, Priority.LOWER, Priority.MESSAGE);//阻塞 if (command != null) { final CommandHandler commandHandler = CommandHandlerFactory.getInstance().getHandler(command.getCommand()); Runnable task = new Runnable() { @Override public void run() { commandHandler.handle(command);//将处理好的消息通过上面的push connection传输 } }; GameTaskProcessor.ins().general(task);//线程池exec } else { break; } } } catch (Exception e) { LOG.error(e.getMessage(), e); try { Thread.sleep(1000 * 5); } catch (InterruptedException ex) { LOG.error(ex.getMessage(), ex); } } } //缓存Handler.也可以使用spring getbeanbyname代替. public class CommandHandlerFactory { private static final Logger LOG = LoggerFactory.getLogger(CommandHandlerFactory.class); private Map<Integer, CommandHandler> commandHandlerMap = new HashMap<Integer, CommandHandler>(); private static CommandHandlerFactory instance = null; private CommandHandlerFactory() { } public static CommandHandlerFactory getInstance() { if (instance == null) { instance = new CommandHandlerFactory(); } return instance; } public void register(int command, CommandHandler handler) {//spring 实例化相应的Handler时,init method会调用 LOG.info("command handler register.command[" + command + "]"); commandHandlerMap.put(command, handler); } public CommandHandler getHandler(int command) { int key = command / 10000; if (commandHandlerMap.containsKey(key)) { return commandHandlerMap.get(key); } else { LOG.warn("命令处理器不存在.command[" + command + "], key[" + key + "]"); } return null; } } //put/get command public class CommandDaoRedisImpl implements CommandDao { private static final int TIME_OUT = 2; private String getKey(int priority) { return RedisKey.getCommandKey(priority); } @Override public boolean add(Command command) { String key = getKey(command.getPriority()); JedisUtils.pushMsg(key, Json.toJson(command)); return true; } @Override public Command get(Integer... proritys) { String[] keys = new String[proritys.length]; for (int i = 0; i < proritys.length; i++) { String key = getKey(proritys[i]); keys[i] = key; } String json = JedisUtils.blockPopMsg(TIME_OUT, keys);//阻塞方法 if (json != null) { return Json.toObject(json, Command.class); } return null; } }
最后利用push connection 随机一个connection,交给netty处理.
logic队列
利用netty channel link+java reflection.
标签:
原文地址:http://www.cnblogs.com/ephuizi/p/4348461.html