码迷,mamicode.com
首页 > 其他好文 > 详细

SocketChannel 例子(转)

时间:2015-12-14 18:30:37      阅读:120      评论:0      收藏:0      [点我收藏+]

标签:

Socket通信比较常见的问题有如下几种: 
1、设置收发超时; 
2、正确的每一个bit的收发; 
3、物理线路故障的保护; 
4、始终能正常工作; 
5、尽量少占系统资源; 
n、…… 
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通信 
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。 
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话…… 
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动) 

Java代码 
  1. // Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)  
  2.   
  3. package asynchronizedchannel;  
  4.   
  5. import java.io.*;  
  6. import java.net.*;  
  7. import java.nio.*;  
  8. import java.nio.channels.*;  
  9.   
  10. final class TcpChannel  
  11. {  
  12.     private long endTime;  
  13.     private SelectionKey key;  
  14.   
  15.     public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException  
  16.     {  
  17.         boolean done = false;  
  18.         Selector selector = null;  
  19.         this.endTime = endTime;  
  20.         try {  
  21.             selector = Selector.open();  
  22.             channel.configureBlocking(false);  
  23.             key = channel.register(selector, op);  
  24.             done = true;  
  25.         } finally {  
  26.             if (!done && selector != null) {  
  27.                 selector.close();  
  28.             }  
  29.             if (!done) {  
  30.                 channel.close();  
  31.             }  
  32.         }  
  33.     }  
  34.   
  35.     static void blockUntil(SelectionKey key, long endTime) throws IOException  
  36.     {  
  37.         long timeout = endTime - System.currentTimeMillis();  
  38.         int nkeys = 0;  
  39.         if (timeout > 0) {  
  40.             nkeys = key.selector().select(timeout);  
  41.         } else if (timeout == 0) {  
  42.             nkeys = key.selector().selectNow();  
  43.         }  
  44.         if (nkeys == 0) {  
  45.             throw new SocketTimeoutException();  
  46.         }  
  47.     }  
  48.   
  49.     void cleanup()  
  50.     {  
  51.         try {  
  52.             key.selector().close();  
  53.             key.channel().close();  
  54.         } catch (IOException ex) {  
  55.             ex.printStackTrace();  
  56.         }  
  57.     }  
  58.   
  59.     void bind(SocketAddress addr) throws IOException  
  60.     {  
  61.         SocketChannel channel = (SocketChannel) key.channel();  
  62.         channel.socket().bind(addr);  
  63.     }  
  64.   
  65.     void connect(SocketAddress addr) throws IOException  
  66.     {  
  67.         SocketChannel channel = (SocketChannel) key.channel();  
  68.         if (channel.connect(addr))  
  69.             return;  
  70.         key.interestOps(SelectionKey.OP_CONNECT);  
  71.         try {  
  72.             while (!channel.finishConnect()) {  
  73.                 if (!key.isConnectable()) {  
  74.                     blockUntil(key, endTime);  
  75.                 }  
  76.             }  
  77.         } finally {  
  78.             if (key.isValid()) {  
  79.                 key.interestOps(0);  
  80.             }  
  81.         }  
  82.     }  
  83.   
  84.     void send(ByteBuffer buffer) throws IOException  
  85.     {  
  86.         Send.operate(key, buffer, endTime);  
  87.     }  
  88.   
  89.     void recv(ByteBuffer buffer) throws IOException  
  90.     {  
  91.         Recv.operate(key, buffer, endTime);  
  92.     }  
  93. }  
  94.   
  95. interface Operator  
  96. {  
  97.     class Operation  
  98.     {  
  99.         static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException  
  100.         {  
  101.             final SocketChannel channel = (SocketChannel) key.channel();  
  102.             final int total = buffer.capacity();  
  103.             key.interestOps(op);  
  104.             try {  
  105.                 while (buffer.position() < total) {  
  106.                     if (System.currentTimeMillis() > endTime) {  
  107.                         throw new SocketTimeoutException();  
  108.                     }  
  109.                     if ((key.readyOps() & op) != 0) {  
  110.                         if (optr.io(channel, buffer) < 0) {  
  111.                             throw new EOFException();  
  112.                         }  
  113.                     } else {  
  114.                         TcpChannel.blockUntil(key, endTime);  
  115.                     }  
  116.                 }  
  117.             } finally {  
  118.                 if (key.isValid()) {  
  119.                     key.interestOps(0);  
  120.                 }  
  121.             }  
  122.         }  
  123.     }  
  124.   
  125.     int io(SocketChannel channel, ByteBuffer buffer) throws IOException;  
  126. }  
  127. class Send implements Operator  
  128. {  
  129.     public int io(SocketChannel channel, ByteBuffer buffer) throws IOException  
  130.     {  
  131.         return channel.write(buffer);  
  132.     }  
  133.     public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException  
  134.     {  
  135.         Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);  
  136.     }  
  137.     public static final Send operator = new Send();  
  138. }  
  139.   
  140. class Recv implements Operator  
  141. {  
  142.     public int io(SocketChannel channel, ByteBuffer buffer) throws IOException  
  143.     {  
  144.         return channel.read(buffer);  
  145.     }  
  146.       
  147.     public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException  
  148.     {  
  149.         Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);  
  150.     }  
  151.     public static final Recv operator = new Recv();  
  152. }  


使用演示见以下代码。 
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。 
正式应用中可以再设置tryout尝试n次。 
Server端,代码演示: 

Java代码 
  1. package asynchronizedchannel;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.Selector;  
  8. import java.nio.channels.ServerSocketChannel;  
  9. import java.nio.channels.SocketChannel;  
  10. import java.security.MessageDigest;  
  11. import java.util.Iterator;  
  12.   
  13. public class Server  
  14. {  
  15.   
  16.     /** 
  17.      * 服务端通信范例程序主函数 
  18.      *  
  19.      * @param args 
  20.      * @throws IOException 
  21.      */  
  22.     public static void main(String[] args) throws IOException  
  23.     {  
  24.         // Create the selector  
  25.         final Selector selector = Selector.open();  
  26.         final ServerSocketChannel server = ServerSocketChannel.open();  
  27.         server.configureBlocking(false);  
  28.         server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5);  
  29.         // Register both channels with selector  
  30.         server.register(selector, SelectionKey.OP_ACCEPT);  
  31.         new Thread(new Daemon(selector)).start();  
  32.     }  
  33. }  
  34.   
  35. class Daemon implements Runnable  
  36. {  
  37.     private final Selector selector;  
  38.   
  39.     Daemon(Selector selector)  
  40.     {  
  41.         this.selector = selector;  
  42.     }  
  43.   
  44.     public void run()  
  45.     {  
  46.         while (true) {  
  47.             try {  
  48.                 // Wait for an event  
  49.                 selector.select();  
  50.   
  51.                 // Get list of selection keys with pending events  
  52.                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  53.   
  54.                 // Process each key  
  55.                 while (it.hasNext()) {  
  56.                     // Get the selection key  
  57.                     SelectionKey selKey = it.next();  
  58.   
  59.                     // Remove it from the list to indicate that it is being processed  
  60.                     it.remove();  
  61.   
  62.                     // Check if it‘s a connection request  
  63.                     if (selKey.isAcceptable()) {  
  64.                         // Get channel with connection request  
  65.                         ServerSocketChannel server = (ServerSocketChannel) selKey.channel();  
  66.                         // Accept the connection request.  
  67.                         // If serverSocketChannel is blocking, this method blocks.  
  68.                         // The returned channel is in blocking mode.  
  69.                         SocketChannel channel = server.accept();  
  70.   
  71.                         // If serverSocketChannel is non-blocking, sChannel may be null  
  72.                         if (channel != null) {  
  73.                             // Use the socket channel to communicate with the client  
  74.                             new Thread(new ServerHandler(channel)).start();  
  75.                         } else {  
  76.                             System.out.println("---No Connection---");  
  77.                             // There were no pending connection requests; try again later.  
  78.                             // To be notified of connection requests,  
  79.                         }  
  80.                     }  
  81.                 }  
  82.             } catch (Exception ex) {  
  83.                 ex.printStackTrace();  
  84.             }  
  85.         }  
  86.     }  
  87. }  
  88.   
  89. class ServerHandler implements Runnable  
  90. {  
  91.     private static final long timeout = 30 * 1000; // 设置超时时间为30秒  
  92.     private static int counter = 0;  
  93.     private final TcpChannel channel;  
  94.     private final MessageDigest md;  
  95.   
  96.     ServerHandler(SocketChannel channel) throws Exception  
  97.     {  
  98.         this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ);  
  99.         md = MessageDigest.getInstance("md5");  
  100.     }  
  101.   
  102.     public void run()  
  103.     {  
  104.         try {  
  105.             while (true) {  
  106.                 work();  
  107.                 synchronized (ServerHandler.class) {  
  108.                     if ((++counter & 65535) == 0) {  
  109.                         System.out.println(counter);  
  110.                     }  
  111.                 }  
  112.             }  
  113.         } catch (Exception e) {  
  114.             e.printStackTrace();  
  115.         } finally {  
  116.             channel.cleanup();  
  117.         }  
  118.     }  
  119.   
  120.     private void work() throws IOException  
  121.     { // 模拟工作流程  
  122.         byte[] cache = new byte[256], reply = new byte[5];  
  123.         read(cache, reply);  
  124.     }  
  125.   
  126.     private void read(byte[] cache, byte[] reply) throws IOException  
  127.     { // 从套接字读入数据  
  128.         channel.recv(ByteBuffer.wrap(cache));  
  129.         md.reset();  
  130.         md.update(cache, 0, 240);  
  131.         byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码  
  132.         if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较  
  133.             reply[0] = ‘?‘;  
  134.             System.out.println("MISMATCH!");  
  135.         } else {  
  136.             reply[0] = ‘.‘;  
  137.         }  
  138.         channel.send(ByteBuffer.wrap(reply)); // 返回接收结果  
  139.     }  
  140. }  
  141.   
  142. final class ExtArrays  
  143. {  
  144.     private ExtArrays()  
  145.     {  
  146.     }  
  147.   
  148.     public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)  
  149.     { // 字节数组的部分比较  
  150.         if (a == null || b == null) {  
  151.             return false;  
  152.         }  
  153.         if (offset_a + len > a.length || offset_b + len > b.length) {  
  154.             return false;  
  155.         }  
  156.         for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) {  
  157.             if (a[i] != b[j]) {  
  158.                 return false;  
  159.             }  
  160.         }  
  161.         return true;  
  162.     }  
  163. }  


Client端,代码演示: 

Java代码 
  1. package asynchronizedchannel;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.SocketChannel;  
  8. import java.security.DigestException;  
  9. import java.security.MessageDigest;  
  10. import java.util.Random;  
  11.   
  12. public class Client  
  13. {  
  14.     private static int id = 0;  
  15.     /** 
  16.      * 客户端通信范例程序主函数 
  17.      *  
  18.      * @param args 
  19.      * @throws Exception 
  20.      */  
  21.     public static void main(String[] args) throws Exception  
  22.     {  
  23.         new Thread(new ClientHandler(id++)).start();  
  24.         new Thread(new ClientHandler(id++)).start();  
  25.         new Thread(new ClientHandler(id++)).start();  
  26.         new Thread(new ClientHandler(id++)).start();  
  27.         new Thread(new ClientHandler(id++)).start();  
  28.     }  
  29.   
  30. }  
  31.   
  32. class ClientHandler implements Runnable  
  33. {  
  34.     private static final long timeout = 30 * 1000; // 设置超时时间为30秒  
  35.     private final TcpChannel channel;  
  36.       
  37.     private final int id;  
  38.   
  39.     private final MessageDigest md;  
  40.     private final Random rand;  
  41.   
  42.     ClientHandler(int id) throws Exception  
  43.     {  
  44.         this.id = id;  
  45.         channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE);  
  46.         md = MessageDigest.getInstance("md5");  
  47.         rand = new Random();  
  48.     }  
  49.   
  50.     @Override  
  51.     public void run()  
  52.     {  
  53.         try {  
  54.             channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656));  
  55.             int i = 0;  
  56.             while (true) {  
  57.                 work();  
  58.                 if ((++i & 16383) == 0) {  
  59.                     System.out.println(String.format("client(%1$d): %2$d", id, i));  
  60.                 }  
  61.                 Thread.yield();  
  62.             }  
  63.         } catch (Exception e) {  
  64.             e.printStackTrace();  
  65.         } finally {  
  66.             channel.cleanup();  
  67.         }  
  68.     }  
  69.   
  70.     private void work() throws IOException, DigestException  
  71.     {  
  72.         byte[] cache = new byte[256], reply = new byte[5];  
  73.         write(cache, reply);  
  74.     }  
  75.   
  76.     private void write(byte[] cache, byte[] reply) throws DigestException, IOException  
  77.     {  
  78.         rand.nextBytes(cache); // 只用前面的240字节  
  79.         md.reset();  
  80.         md.update(cache, 0, 240);  
  81.         md.digest(cache, 240, 16); // MD5校验码占后面16字节  
  82.         ByteBuffer buffer = ByteBuffer.wrap(cache);  
  83.         channel.send(buffer);  
  84.         buffer = ByteBuffer.wrap(reply);  
  85.         channel.recv(buffer);  
  86.         if (reply[0] != ‘.‘) { // 若接收的结果不正确,可以考虑尝试再次发送  
  87.             System.out.println("MISMATCH!");  
  88.         }  
  89.     }  
  90. }  


重点说明: 

发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。

SocketChannel 例子(转)

标签:

原文地址:http://www.cnblogs.com/yucongblog/p/5045671.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!