标签:selector rgs java nio oca nts exe current stat system
场景很简单:多个客户端通过UDP“连接”到服务器(UDP其实是无连接的,只是服务器保存了客户端的IP和端口信息)。客户端先通过UDP协议向服务器发送消息,然后服务器再通过UDP把消息转发给各个客户端。
这种设计是不是观察者模式?
server代码
/**
 * Minimal UDP relay server built on NIO.
 *
 * <p>Every datagram received on port 8889 is decoded as UTF-8 text, the sender's
 * address is remembered, and the text is then re-sent to every remembered address
 * (including the original sender). UDP is connectionless, so a "client" here is
 * simply an address that has sent at least one datagram.
 *
 * <p>The address list is not synchronized. Within this class it is only touched by
 * the thread that calls {@link #saveIP} and {@link #broadcast}, i.e. the selector
 * loop in {@link #main}; the worker pool never reads it.
 */
public class Server {
    /** Addresses of all clients seen so far, in order of first arrival. */
    private static LinkedList<SocketAddress> list = new LinkedList<SocketAddress>();
    /** Pool that performs the outgoing sends so the selector loop is not held up. */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);
    private static DatagramChannel server = null;
    private static Selector selector = null;

    static {
        try {
            server = DatagramChannel.open().bind(new InetSocketAddress(8889));
            server.configureBlocking(false);
            selector = Selector.open();
        } catch (Exception e) {
            // Left non-fatal on purpose so the class can still be loaded;
            // main() checks for the failed state before using the channel.
            e.printStackTrace();
        }
    }

    /**
     * Runs the receive/broadcast loop forever.
     *
     * @param args unused
     * @throws IllegalStateException if the channel or selector could not be created
     *         by the static initialiser (e.g. port 8889 already in use)
     * @throws Exception on any I/O failure while selecting or receiving
     */
    public static void main(String[] args) throws Exception {
        if (server == null || selector == null) {
            throw new IllegalStateException(
                    "UDP server failed to initialise on port 8889; see the earlier stack trace");
        }
        server.register(selector, SelectionKey.OP_READ);
        while (true) {
            if (selector.select() > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        String msg = readMsg(selectionKey);
                        // null means the channel had nothing to deliver after all.
                        if (msg != null) {
                            broadcast(msg);
                        }
                    }
                    // Selected keys must be removed by hand or they are reported again.
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Remembers a client address if it has not been seen before.
     *
     * @param address the sender's address; {@code null} is ignored
     * @throws Exception never thrown by this implementation; kept for interface compatibility
     */
    public static void saveIP(SocketAddress address) throws Exception {
        if (address != null && !list.contains(address)) {
            list.add(address);
            System.out.println("新增ip:" + address);
        }
        System.out.println("当前udp 保存的ip数量:" + list.size());
    }

    /**
     * Sends {@code msg}, encoded as UTF-8, to every remembered client address.
     * Each send runs on the worker pool; a failed send is logged and does not
     * affect the other recipients.
     *
     * @param msg the text to forward
     * @throws Exception if a task cannot be submitted to the pool
     */
    public static void broadcast(String msg) throws Exception {
        // Encode once, with the same charset readMsg() decodes with.
        final byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        for (final SocketAddress address : list) {
            // Each task needs its own buffer because send() advances the position;
            // the backing array is shared but only ever read.
            final ByteBuffer byteBuffer = ByteBuffer.wrap(payload);
            executorService.submit(() -> {
                try {
                    server.send(byteBuffer, address);
                    System.out.println(Thread.currentThread().getName() + ":" + address + "发送成功");
                } catch (Exception e) {
                    // Best effort: report the cause but keep serving the other clients.
                    System.out.println(Thread.currentThread().getName() + ":" + address + "发送失败" + ": " + e);
                }
            });
        }
    }

    /**
     * Receives one datagram from the key's channel, records the sender and returns
     * the payload decoded as UTF-8.
     *
     * <p>Only the bytes actually received are decoded. Payloads longer than 1024
     * bytes are truncated to the buffer size.
     *
     * @param selectionKey a key whose channel is a {@link DatagramChannel}
     * @return the decoded text, or {@code null} if no datagram was available
     * @throws Exception on any I/O failure
     */
    public static String readMsg(SelectionKey selectionKey) throws Exception {
        DatagramChannel channel = (DatagramChannel) selectionKey.channel();
        System.out.println(channel.getRemoteAddress() + ":" + channel.getLocalAddress());
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        SocketAddress receive = channel.receive(byteBuffer);
        if (receive == null) {
            // A non-blocking receive returns null when nothing is pending.
            return null;
        }
        saveIP(receive);
        byteBuffer.flip();
        // After flip(), position..limit delimits exactly the received bytes.
        String msg = new String(
                byteBuffer.array(),
                byteBuffer.arrayOffset() + byteBuffer.position(),
                byteBuffer.remaining(),
                java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息:" + msg);
        return msg;
    }
}
client 代码
/**
 * Console client for the UDP relay server.
 *
 * <p>Each line typed on standard input is sent as one UTF-8 datagram to
 * 127.0.0.1:8889. A background thread prints every datagram that arrives on the
 * same socket. Typing {@code exit}, or reaching end of input, closes the socket
 * and ends the program.
 */
public class Client {
    /** Socket shared by the sending loop and the receiving thread. */
    public static DatagramSocket socket = null;

    /**
     * Starts the receiver thread and runs the send loop until {@code exit} or end of input.
     *
     * @param args unused
     * @throws Exception if the socket cannot be opened or a send fails
     */
    public static void main(String[] args) throws Exception {
        // Open the socket before the try block: if this throws there is nothing to close,
        // and the finally clause must not hit a null socket and mask the real error.
        socket = new DatagramSocket();
        try {
            Scanner scanner = new Scanner(System.in);
            InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 8889);

            new Thread(() -> {
                try {
                    while (true) {
                        byte[] arr = new byte[1024];
                        DatagramPacket packet = new DatagramPacket(arr, arr.length);
                        socket.receive(packet);
                        // Decode only the bytes that were received, not the whole 1024-byte buffer.
                        String m1 = new String(
                                packet.getData(),
                                packet.getOffset(),
                                packet.getLength(),
                                java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("受到服务器消息:" + m1);
                    }
                } catch (Exception e) {
                    // Closing the socket on exit is what unblocks receive(); that is not an error.
                    if (!socket.isClosed()) {
                        e.printStackTrace();
                    }
                }
            }).start();

            // hasNextLine() makes end of input a clean exit instead of NoSuchElementException.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if (msg.equals("exit")) {
                    break;
                }
                // Encode once, with the charset the server decodes with.
                byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                socket.send(new DatagramPacket(payload, payload.length, serverAddress));
            }
        } finally {
            socket.close();
        }
    }
}
标签:selector rgs java nio oca nts exe current stat system
原文地址:https://www.cnblogs.com/duangL/p/11732168.html