码迷,mamicode.com
首页 > 编程语言 > 详细

JAVA NIO udp 实现 群转发

时间:2019-10-24 15:45:19      阅读:86      评论:0      收藏:0      [点我收藏+]

标签:selector   rgs   java nio   oca   nts   exe   current   stat   system   

场景很简单,就是多个客户端通过udp,连接到服务器(其实是无连接的,就是服务器保存了客户端的ip信息)。然后通过udp协议先服务器发送消息,然后服务器在通过udp转发在各个客服端。
这个是不是 观察者模式
server代码

public class Server {
    private static LinkedList<SocketAddress> list=new LinkedList<SocketAddress>();
    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);//这里用多线程去转发,可以提高效率。emmm,udp没有3次握手,速度应该很快,多不多线程应该差不了太多
    private static DatagramChannel server=null;
    private static Selector selector=null;
    static {
        try{
        server=DatagramChannel.open().bind(new InetSocketAddress(8889));
        server.configureBlocking(false);
        selector=Selector.open();

        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args)throws Exception {

        server.register(selector, SelectionKey.OP_READ);
        while (true){
            if (selector.select()>0){
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isReadable()){
                    String msg = readMsg(selectionKey);
                    broadcast(msg);
                }
                iterator.remove();
            }}
        }
    }
    public static void saveIP(SocketAddress address)throws Exception{
        if (!list.contains(address)){
            list.add(address);
            System.out.println("新增ip:"+address);
        }
        System.out.println("当前udp 保存的ip数量:"+list.size());
    }
    public static void broadcast(String msg) throws Exception{

        //DatagramPacket packet=new DatagramPacket(msg.getBytes(),0,msg.getBytes().length);
        for (SocketAddress address:list
             ) {
            ByteBuffer byteBuffer=ByteBuffer.wrap(msg.getBytes());
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try{
                        server.send(byteBuffer, address);
                        System.out.println(Thread.currentThread().getName()+":"+address+"发送成功");
                    }catch (Exception e){
                        System.out.println(Thread.currentThread().getName()+":"+address+"发送失败");
                    }
                }
            });
        }
    }
    public static String readMsg(SelectionKey selectionKey)throws Exception{
        DatagramChannel channel=(DatagramChannel)selectionKey.channel();
        System.out.println(channel.getRemoteAddress()+":"+channel.getLocalAddress());
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        SocketAddress receive = channel.receive(byteBuffer);
        saveIP(receive);
        byteBuffer.flip();
        String msg=new String(byteBuffer.array(),"utf-8");
        System.out.println("收到消息:"+msg);
        return msg;
    }
}

client 代码

public class Client {
    public static DatagramSocket socket=null;
    public static void main(String[] args) throws Exception{
        try {
            socket=new DatagramSocket();
            Scanner scanner=new Scanner(System.in);
            String msg;
            new Thread(()->{
                    try{
                        while (true){
                            byte[] arr=new byte[1024];
                            DatagramPacket packet=new DatagramPacket(arr,arr.length);
                            socket.receive(packet);
                            String m1=new String(packet.getData(),"utf-8");
                            System.out.println("受到服务器消息:"+m1);
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }
            }).start();
            while (!(msg=scanner.nextLine()).equals("exit")){
                DatagramPacket packet=new DatagramPacket(msg.getBytes(),msg.getBytes().length,new InetSocketAddress("127.0.0.1",8889));
                socket.send(packet);
            }
        }finally {
            socket.close();
        }
    }
}

JAVA NIO udp 实现 群转发

标签:selector   rgs   java nio   oca   nts   exe   current   stat   system   

原文地址:https://www.cnblogs.com/duangL/p/11732168.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!